From 7e947cb841154d338b9540f5da3e710c207be966 Mon Sep 17 00:00:00 2001 From: David Smiley Date: Tue, 9 Jun 2026 09:34:54 -0400 Subject: [PATCH 1/7] Refactor CLI StreamTool SolrClientCache usage * don't call SolrClientCache.setBasicAuthCredentials * insist on a StreamContext, even if "remote mode" --- .../java/org/apache/solr/cli/CLIUtils.java | 4 +- .../java/org/apache/solr/cli/StreamTool.java | 94 +++++++++---------- 2 files changed, 46 insertions(+), 52 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java index 9e8cd9aa8cb6..1a8f76c0db8b 100644 --- a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java +++ b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java @@ -180,7 +180,7 @@ public static SolrClient getSolrClient(CommandLine cli) throws Exception { * is used, and warns those users. In the future we'll have urls ending with /api as well. * * @param solrUrl The user supplied url to Solr. - * @return the solrUrl in the format that Solr expects to see internally. + * @return a URL without any path, e.g. {@code http://localhost:8983} */ public static String normalizeSolrUrl(String solrUrl) { return normalizeSolrUrl(solrUrl, true); @@ -192,7 +192,7 @@ public static String normalizeSolrUrl(String solrUrl) { * * @param solrUrl The user supplied url to Solr. * @param logUrlFormatWarning If a warning message should be logged about the url format - * @return the solrUrl in the format that Solr expects to see internally. + * @return a URL without any path, e.g. {@code http://localhost:8983} */ public static String normalizeSolrUrl(String solrUrl, boolean logUrlFormatWarning) { if (solrUrl != null) { diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 203291c53f07..4083aca508bb 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -38,7 +38,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.solr.client.solrj.io.Lang; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -49,8 +48,8 @@ import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.handler.CatStream; @@ -62,8 +61,6 @@ public StreamTool(ToolRuntime runtime) { super(runtime); } - private final SolrClientCache solrClientCache = new SolrClientCache(); - @Override public String getName() { return "stream"; @@ -166,14 +163,17 @@ public void runImpl(CommandLine cli) throws Exception { } } + // a stream needs a context + StreamContext streamContext = createStreamContext(cli); + // create the stream PushBackStream pushBackStream; if (execution.equalsIgnoreCase("local")) { - pushBackStream = doLocalMode(cli, expr); + pushBackStream = doLocalMode(expr, streamContext.getStreamFactory()); } else { - pushBackStream = doRemoteMode(cli, expr); + pushBackStream = doRemoteMode(expr, cli); } - try { + pushBackStream.setStreamContext(streamContext); pushBackStream.open(); if (outputHeaders == null) { @@ -227,35 +227,52 @@ public void runImpl(CommandLine cli) throws Exception { } } finally { pushBackStream.close(); - solrClientCache.close(); + streamContext.getSolrClientCache().close(); } echoIfVerbose("StreamTool -- Done."); } + private StreamContext createStreamContext(CommandLine cli) throws Exception { + var jettyClientBuilder = new HttpJettySolrClient.Builder(); + String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); + if (credentials != null) { + String[] userPass = credentials.split(":"); + jettyClientBuilder.withBasicAuthCredentials(userPass[0], userPass[1]); + } + HttpJettySolrClient client = jettyClientBuilder.build(); + + // subclass so we can ensure our client is closed when the cache is closed + var solrClientCache = + new SolrClientCache(client) { + @Override + public synchronized void close() { + super.close(); + client.close(); + } + }; + + var solrConnection = CLIUtils.getSolrConnection(cli); + echoIfVerbose("Connecting to Solr at " + solrConnection); + + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + streamContext.getStreamFactory().withDefaultSolrConnection(solrConnection); + return streamContext; + } + /** * Runs a streaming expression in the local process of the CLI. * *

Running locally means that parallelization support or those expressions requiring access to * internal Solr capabilities will not function. * - * @param cli The CLI invoking the call * @param expr The streaming expression to be parsed and in the context of the CLI process + * @param streamFactory * @return A connection to the streaming expression that receives Tuples as they are emitted * locally. */ - private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { - var solrConnection = CLIUtils.getSolrConnection(cli); - echoIfVerbose("Connecting to Solr at " + solrConnection.toString()); - solrClientCache.setBasicAuthCredentials( - cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); - solrClientCache.getCloudSolrClient(solrConnection); - - TupleStream stream; - PushBackStream pushBackStream; - - StreamExpression streamExpression = StreamExpressionParser.parse(expr); - StreamFactory streamFactory = new StreamFactory(); + private PushBackStream doLocalMode(String expr, StreamFactory streamFactory) throws Exception { // stdin is ONLY available in the local mode, not in the remote mode as it // requires access to System.in @@ -265,23 +282,7 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio // logic about where to read data from. streamFactory.withFunctionName("cat", LocalCatStream.class); - streamFactory.withDefaultSolrConnection(solrConnection); - - Lang.register(streamFactory); - - assert streamExpression != null; - stream = streamFactory.constructStream(streamExpression); - - pushBackStream = new PushBackStream(stream); - - // Now we can run the stream and return the results. - StreamContext streamContext = new StreamContext(); - streamContext.setSolrClientCache(solrClientCache); - - // Output the headers - pushBackStream.setStreamContext(streamContext); - - return pushBackStream; + return new PushBackStream(streamFactory.constructStream(expr)); } /** @@ -291,14 +292,15 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio *

Running remotely allows you to use all the standard Streaming Expression capabilities as the * expression is running in a Solr environment. * - * @param cli The CLI invoking the call * @param expr The streaming expression to be parsed and run remotely + * @param cli The CLI invoking the call * @return A connection to the streaming expression that receives Tuples as they are emitted from * Solr /stream. */ - private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + private PushBackStream doRemoteMode(String expr, CommandLine cli) throws Exception { String solrUrl = CLIUtils.normalizeSolrUrl(cli); + if (!cli.hasOption(COLLECTION_OPTION)) { throw new IllegalStateException( "You must provide --name COLLECTION with --execution remote parameter."); @@ -310,16 +312,8 @@ private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Excepti "The stdin() expression is only usable with --worker local set up."); } - final SolrStream solrStream = - new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); - - String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); - if (credentials != null) { - String username = credentials.split(":")[0]; - String password = credentials.split(":")[1]; - solrStream.setCredentials(username, password); - } - return new PushBackStream(solrStream); + return new PushBackStream( + new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr))); } private static ModifiableSolrParams params(String... params) { From 604eb3ff6558b6050da9bf7c72e2684bf1b6a66c Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Wed, 10 Jun 2026 15:48:38 -0400 Subject: [PATCH 2/7] fix precommit Signed-off-by: Eric Pugh --- solr/core/src/java/org/apache/solr/cli/StreamTool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 4083aca508bb..c01775ccddec 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -267,8 +267,8 @@ public synchronized void close() { *

Running locally means that parallelization support or those expressions requiring access to * internal Solr capabilities will not function. * - * @param expr The streaming expression to be parsed and in the context of the CLI process - * @param streamFactory + * @param expr The streaming expression to be parsed and run in the context of the CLI process + * @param streamFactory The factory used to construct the streaming expression * @return A connection to the streaming expression that receives Tuples as they are emitted * locally. */ From 9a82cb7f53dd0ca633422f18627ef9e513a1fcc7 Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Sat, 13 Jun 2026 11:03:33 -0400 Subject: [PATCH 3/7] Move up validation to before creating remote connection. Fix tests. --- .../java/org/apache/solr/cli/StreamTool.java | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index c01775ccddec..8af535df58da 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -38,6 +38,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -163,16 +164,29 @@ public void runImpl(CommandLine cli) throws Exception { } } + // Validate inputs before opening any connection to Solr. + boolean local = execution.equalsIgnoreCase("local"); + if (!local) { + if (!cli.hasOption(COLLECTION_OPTION)) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --execution remote parameter."); + } + if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { + throw new IllegalStateException( + "The stdin() expression is only usable with --worker local set up."); + } + } + // a stream needs a context StreamContext streamContext = createStreamContext(cli); // create the stream - PushBackStream pushBackStream; - if (execution.equalsIgnoreCase("local")) { - pushBackStream = doLocalMode(expr, streamContext.getStreamFactory()); - } else { - pushBackStream = doRemoteMode(expr, cli); - } + PushBackStream pushBackStream = null; try { + if (local) { + pushBackStream = doLocalMode(expr, streamContext.getStreamFactory()); + } else { + pushBackStream = doRemoteMode(expr, cli); + } pushBackStream.setStreamContext(streamContext); pushBackStream.open(); @@ -226,7 +240,9 @@ public void runImpl(CommandLine cli) throws Exception { } } } finally { - pushBackStream.close(); + if (pushBackStream != null) { + pushBackStream.close(); + } streamContext.getSolrClientCache().close(); } @@ -257,7 +273,11 @@ public synchronized void close() { StreamContext streamContext = new StreamContext(); streamContext.setSolrClientCache(solrClientCache); - streamContext.getStreamFactory().withDefaultSolrConnection(solrConnection); + + StreamFactory streamFactory = new StreamFactory(); + Lang.register(streamFactory); + streamFactory.withDefaultSolrConnection(solrConnection); + streamContext.setStreamFactory(streamFactory); return streamContext; } @@ -300,18 +320,8 @@ private PushBackStream doLocalMode(String expr, StreamFactory streamFactory) thr private PushBackStream doRemoteMode(String expr, CommandLine cli) throws Exception { String solrUrl = CLIUtils.normalizeSolrUrl(cli); - - if (!cli.hasOption(COLLECTION_OPTION)) { - throw new IllegalStateException( - "You must provide --name COLLECTION with --execution remote parameter."); - } String collection = cli.getOptionValue(COLLECTION_OPTION); - if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { - throw new IllegalStateException( - "The stdin() expression is only usable with --worker local set up."); - } - return new PushBackStream( new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr))); } From a0624a1964210d1413dda21162099887b49a39eb Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Thu, 18 Jun 2026 10:03:48 -0400 Subject: [PATCH 4/7] tidy refactor --- solr/core/src/java/org/apache/solr/cli/StreamTool.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 8af535df58da..07ce211521f9 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -38,7 +38,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.solr.client.solrj.io.Lang; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -46,6 +45,7 @@ import org.apache.solr.client.solrj.io.stream.SolrStream; import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory; import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; @@ -274,8 +274,7 @@ public synchronized void close() { StreamContext streamContext = new StreamContext(); streamContext.setSolrClientCache(solrClientCache); - StreamFactory streamFactory = new StreamFactory(); - Lang.register(streamFactory); + StreamFactory streamFactory = new DefaultStreamFactory(); streamFactory.withDefaultSolrConnection(solrConnection); streamContext.setStreamFactory(streamFactory); return streamContext; From a78dbc8544dd140cb4a643632448a32fb5201092 Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Sun, 21 Jun 2026 14:19:57 -0400 Subject: [PATCH 5/7] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../java/org/apache/solr/cli/StreamTool.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 07ce211521f9..d69d6abb4617 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -268,16 +268,21 @@ public synchronized void close() { } }; - var solrConnection = CLIUtils.getSolrConnection(cli); - echoIfVerbose("Connecting to Solr at " + solrConnection); - - StreamContext streamContext = new StreamContext(); - streamContext.setSolrClientCache(solrClientCache); - - StreamFactory streamFactory = new DefaultStreamFactory(); - streamFactory.withDefaultSolrConnection(solrConnection); - streamContext.setStreamFactory(streamFactory); - return streamContext; + try { + var solrConnection = CLIUtils.getSolrConnection(cli); + echoIfVerbose("Connecting to Solr at " + solrConnection); + + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + StreamFactory streamFactory = new DefaultStreamFactory(); + streamFactory.withDefaultSolrConnection(solrConnection); + streamContext.setStreamFactory(streamFactory); + return streamContext; + } catch (Exception e) { + org.apache.solr.common.util.IOUtils.closeQuietly(solrClientCache); + throw e; + } } /** From 137cee02cc60a25231edaca9a710d09a5f4690ab Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Sun, 21 Jun 2026 14:20:46 -0400 Subject: [PATCH 6/7] respond to copilot suggestions --- solr/core/src/java/org/apache/solr/cli/StreamTool.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 07ce211521f9..5da3f694b08b 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -173,7 +173,7 @@ public void runImpl(CommandLine cli) throws Exception { } if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { throw new IllegalStateException( - "The stdin() expression is only usable with --worker local set up."); + "The stdin() expression is only usable with --execution local."); } } @@ -252,10 +252,7 @@ public void runImpl(CommandLine cli) throws Exception { private StreamContext createStreamContext(CommandLine cli) throws Exception { var jettyClientBuilder = new HttpJettySolrClient.Builder(); String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); - if (credentials != null) { - String[] userPass = credentials.split(":"); - jettyClientBuilder.withBasicAuthCredentials(userPass[0], userPass[1]); - } + jettyClientBuilder.withOptionalBasicAuthCredentials(credentials); HttpJettySolrClient client = jettyClientBuilder.build(); // subclass so we can ensure our client is closed when the cache is closed From 0fa897078184e670d0329426bc2878e1433776d2 Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Sun, 21 Jun 2026 14:32:07 -0400 Subject: [PATCH 7/7] check fix --- solr/core/src/java/org/apache/solr/cli/StreamTool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 391f5334069c..31070e7e27b0 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.IOUtils; import org.apache.solr.handler.CatStream; /** Supports stream command in the bin/solr script. */ @@ -277,7 +278,7 @@ public synchronized void close() { streamContext.setStreamFactory(streamFactory); return streamContext; } catch (Exception e) { - org.apache.solr.common.util.IOUtils.closeQuietly(solrClientCache); + IOUtils.closeQuietly(solrClientCache); throw e; } }