From 245956c6efa8a8073784e67e7b6aa06469c1e297 Mon Sep 17 00:00:00 2001
From: Jason Gerlowski <gerlowskija@apache.org>
Date: Fri, 6 Sep 2024 09:01:40 -0700
Subject: [PATCH] SOLR-17419: Introduce ParallelHttpShardHandler (#2681)

The default ShardHandler implementation, HttpShardHandler, sends all
shard-requests serially, only parallelizing the waiting and parsing of
responses.  This works great for collections with few shards, but as the
number of shards increases the serialized sending of shard-requests adds
a larger and larger overhead to the overall request (especially when
auth and PKI are done at request-sending time).

This commit fixes this by introducing an alternate ShardHandler
implementation, geared towards collections with many shards.  This
ShardHandler uses an executor to parallelize both request sending and
response waiting/parsing.  This consumes more CPU, but greatly
reduces the latency/QTime observed by users querying many-shard
collections.
---
 solr/CHANGES.txt                              |   7 +-
 .../handler/component/HttpShardHandler.java   |  90 +++++++++------
 .../component/HttpShardHandlerFactory.java    |  28 ++++-
 .../component/ParallelHttpShardHandler.java   |  97 ++++++++++++++++
 .../ParallelHttpShardHandlerFactory.java      |  26 +++++
 .../solr/handler/component/ShardHandler.java  |  38 ++++++
 ...solr-shardhandler-loadBalancerRequests.xml |   2 +-
 .../solr/core/MockShardHandlerFactory.java    |   4 +
 .../solr/core/TestShardHandlerFactory.java    |  63 ----------
 ...tory.java => TestShardHandlerFactory.java} |  71 ++++++++++--
 solr/server/solr/solr.xml                     |   3 +-
 .../pages/configuring-solr-xml.adoc           |  15 ++-
 .../requesthandlers-searchcomponents.adoc     |   3 +-
 .../pages/solrcloud-distributed-requests.adoc | 108 +-----------------
 14 files changed, 329 insertions(+), 226 deletions(-)
 create mode 100644 solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
 create mode 100644 solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandlerFactory.java
 delete mode 100644 solr/core/src/test/org/apache/solr/core/TestShardHandlerFactory.java
 rename solr/core/src/test/org/apache/solr/handler/component/{TestHttpShardHandlerFactory.java => TestShardHandlerFactory.java} (66%)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 735d80caa85..3d63679bf9f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -12,7 +12,12 @@ New Features
 Improvements
 ---------------------
 * SOLR-17397: SkipExistingDocumentsProcessor now functions correctly with child documents.  (Tim Owens via Eric Pugh)
-* SOLR-17180: Deprecate snapshotscli.sh in favour of bin/solr snapshot sub commands.  Now able to manage Snapshots from the CLI.  HDFS module specific snapshot script now ships as part of that module in the modules/hdfs/bin directory. (Eric Pugh)  
+
+* SOLR-17180: Deprecate snapshotscli.sh in favour of bin/solr snapshot sub commands.  Now able to manage Snapshots from the CLI.  HDFS module specific snapshot script now ships as part of that module in the modules/hdfs/bin directory. (Eric Pugh)
+
+* SOLR-17419: An alternate ShardHandlerFactory is now available, ParallelHttpShardHandlerFactory,
+  which may help reduce distributed-search latency in collections with many shards, especially
+  when PKI is used between nodes. (Jason Gerlowski)
 
 Optimizations
 ---------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 5074d7b36e8..3bc1c542906 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -47,6 +47,17 @@
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.security.AllowListUrlChecker;
 
+/**
+ * Solr's default {@link ShardHandler} implementation; uses Jetty's async HTTP Client APIs for
+ * sending requests.
+ *
+ * <p>Shard-requests triggered by {@link #submit(ShardRequest, String, ModifiableSolrParams)} will
+ * be sent synchronously (i.e. before 'submit' returns to the caller). Response waiting and parsing
+ * happens asynchronously via {@link HttpShardHandlerFactory#commExecutor}. See {@link
+ * HttpShardHandlerFactory} for details on configuring this executor.
+ *
+ * <p>The ideal choice for collections with modest or moderate sharding.
+ */
 @NotThreadSafe
 public class HttpShardHandler extends ShardHandler {
   /**
@@ -59,12 +70,12 @@ public class HttpShardHandler extends ShardHandler {
    */
   public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
 
-  private HttpShardHandlerFactory httpShardHandlerFactory;
-  private Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
-  private BlockingQueue<ShardResponse> responses;
-  private AtomicInteger pending;
-  private Map<String, List<String>> shardToURLs;
-  private LBHttp2SolrClient lbClient;
+  protected HttpShardHandlerFactory httpShardHandlerFactory;
+  protected Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
+  protected BlockingQueue<ShardResponse> responses;
+  protected AtomicInteger pending;
+  protected Map<String, List<String>> shardToURLs;
+  protected LBHttp2SolrClient lbClient;
 
   public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
     this.httpShardHandlerFactory = httpShardHandlerFactory;
@@ -80,7 +91,7 @@ public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
     shardToURLs = new HashMap<>();
   }
 
-  private static class SimpleSolrResponse extends SolrResponse {
+  public static class SimpleSolrResponse extends SolrResponse {
 
     volatile long elapsedTime;
 
@@ -109,7 +120,7 @@ public void setElapsedTime(long elapsedTime) {
 
   // Not thread safe... don't use in Callable.
   // Don't modify the returned URL list.
-  private List<String> getURLs(String shard) {
+  protected List<String> getURLs(String shard) {
     List<String> urls = shardToURLs.get(shard);
     if (urls == null) {
       urls = httpShardHandlerFactory.buildURLList(shard);
@@ -118,47 +129,58 @@ private List<String> getURLs(String shard) {
     return urls;
   }
 
-  @Override
-  public void submit(
-      final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
-    // do this outside of the callable for thread safety reasons
-    final List<String> urls = getURLs(shard);
+  protected LBSolrClient.Req prepareLBRequest(
+      ShardRequest sreq, String shard, ModifiableSolrParams params, List<String> urls) {
     params.remove(CommonParams.WT); // use default (currently javabin)
     params.remove(CommonParams.VERSION);
     QueryRequest req = makeQueryRequest(sreq, params, shard);
     req.setMethod(SolrRequest.METHOD.POST);
+    SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+    if (requestInfo != null) {
+      req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+    }
 
-    LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
+    return httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
+  }
 
+  protected ShardResponse prepareShardResponse(ShardRequest sreq, String shard) {
     ShardResponse srsp = new ShardResponse();
     if (sreq.nodeName != null) {
       srsp.setNodeName(sreq.nodeName);
     }
     srsp.setShardRequest(sreq);
     srsp.setShard(shard);
-    SimpleSolrResponse ssr = new SimpleSolrResponse();
-    srsp.setSolrResponse(ssr);
 
+    return srsp;
+  }
+
+  protected void recordNoUrlShardResponse(ShardResponse srsp, String shard) {
+    // TODO: what's the right error code here? We should use the same thing when
+    // all of the servers for a shard are down.
+    SolrException exception =
+        new SolrException(
+            SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+    srsp.setException(exception);
+    srsp.setResponseCode(exception.code());
+    responses.add(srsp);
+  }
+
+  @Override
+  public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
+    // do this outside of the callable for thread safety reasons
+    final List<String> urls = getURLs(shard);
+    final var lbReq = prepareLBRequest(sreq, shard, params, urls);
+    final var srsp = prepareShardResponse(sreq, shard);
+    final var ssr = new SimpleSolrResponse();
+    srsp.setSolrResponse(ssr);
     pending.incrementAndGet();
-    // if there are no shards available for a slice, urls.size()==0
+
     if (urls.isEmpty()) {
-      // TODO: what's the right error code here? We should use the same thing when
-      // all of the servers for a shard are down.
-      SolrException exception =
-          new SolrException(
-              SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
-      srsp.setException(exception);
-      srsp.setResponseCode(exception.code());
-      responses.add(srsp);
+      recordNoUrlShardResponse(srsp, shard);
       return;
     }
 
     long startTime = System.nanoTime();
-    SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
-    if (requestInfo != null) {
-      req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
-    }
-
     CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
     future.whenComplete(
         (rsp, throwable) -> {
@@ -195,19 +217,11 @@ protected ShardResponse transfomResponse(
     return rsp;
   }
 
-  /**
-   * returns a ShardResponse of the last response correlated with a ShardRequest. This won't return
-   * early if it runs into an error.
-   */
   @Override
   public ShardResponse takeCompletedIncludingErrors() {
     return take(false);
   }
 
-  /**
-   * returns a ShardResponse of the last response correlated with a ShardRequest, or immediately
-   * returns a ShardResponse if there was an error detected
-   */
   @Override
   public ShardResponse takeCompletedOrError() {
     return take(true);
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 79e0ad48681..990e3312deb 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** Creates {@link HttpShardHandler} instances */
 public class HttpShardHandlerFactory extends ShardHandlerFactory
     implements org.apache.solr.util.plugin.PluginInfoInitialized, SolrMetricProducer {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -78,7 +79,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory
   // requests at some point (or should we simply return failure?)
   //
   // This executor is initialized in the init method
-  private ExecutorService commExecutor;
+  protected ExecutorService commExecutor;
 
   protected volatile Http2SolrClient defaultClient;
   protected InstrumentedHttpListenerFactory httpListenerFactory;
@@ -194,6 +195,31 @@ private void initReplicaListTransformers(NamedList<?> routingConfig) {
         new RequestReplicaListTransformerGenerator(defaultRltFactory, stableRltFactory);
   }
 
+  /**
+   * Customizes {@link HttpShardHandler} instances that will be produced by this factory.
+   *
+   * <p>Supports the following parameters in {@code info}:
+   *
+   * <ul>
+   *   <li>socketTimeout - read timeout for requests, in milliseconds.
+   *   <li>connTimeout - connection timeout for requests, in milliseconds.
+   *   <li>urlScheme - "http" or "https"
+   *   <li>maxConnectionsPerHost - caps the number of concurrent connections per host
+   *   <li>corePoolSize - the initial size of the thread pool used to service requests
+   *   <li>maximumPoolSize - the maximum size of the thread pool used to service requests.
+   *   <li>maxThreadIdleTime - the amount of time (in seconds) that thread pool entries may sit idle
+   *       before being killed
+   *   <li>sizeOfQueue - the size of the queue (if any) used by the thread pool that services
+   *       shard-handler requests
+   *   <li>fairnessPolicy - true if the thread pool should prioritize fairness over throughput,
+   *       false otherwise
+   *   <li>replicaRouting - a NamedList of preferences used to select the order in which replicas
+   *       for a shard will be used by created ShardHandlers
+   * </ul>
+   *
+   * @param info configuration for the created factory, typically reflecting the contents of a
+   *     &lt;shardHandlerFactory&gt; XML tag from solr.xml or solrconfig.xml
+   */
   @Override
   public void init(PluginInfo info) {
     StringBuilder sb = new StringBuilder();
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
new file mode 100644
index 00000000000..4a8dd5e3267
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A version of {@link HttpShardHandler} optimized for massively-sharded collections.
+ *
+ * <p>Uses a {@link HttpShardHandlerFactory#commExecutor} thread for all work related to outgoing
+ * requests, allowing {@link #submit(ShardRequest, String, ModifiableSolrParams)} to return more
+ * quickly. (See {@link HttpShardHandler} for comparison.)
+ *
+ * <p>The additional focus on parallelization makes this an ideal implementation for collections
+ * with many shards.
+ */
+@NotThreadSafe
+public class ParallelHttpShardHandler extends HttpShardHandler {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final ExecutorService commExecutor;
+
+  /**
+   * Creates a handler that sends its shard-requests on the factory's {@code commExecutor}.
+   *
+   * @param httpShardHandlerFactory factory whose {@code commExecutor} is used to send requests
+   */
+  public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandlerFactory) {
+    super(httpShardHandlerFactory);
+    this.commExecutor = httpShardHandlerFactory.commExecutor;
+    // Futures are registered from commExecutor threads (see 'submit'), possibly several at once,
+    // so the tracking map must tolerate concurrent writers.
+    this.responseFutureMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Hands the shard-request to {@code commExecutor} for sending and returns without waiting for
+   * it to be sent.
+   *
+   * <p>If sending fails, the failure is recorded on the request's {@link ShardResponse} and
+   * added to {@code responses}, so it reaches the caller instead of being lost with the
+   * executor task.
+   */
+  @Override
+  public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
+    // do this outside of the callable for thread safety reasons
+    final List<String> urls = getURLs(shard);
+    final var lbReq = prepareLBRequest(sreq, shard, params, urls);
+    final var srsp = prepareShardResponse(sreq, shard);
+    final var ssr = new SimpleSolrResponse();
+    srsp.setSolrResponse(ssr);
+    pending.incrementAndGet();
+
+    if (urls.isEmpty()) {
+      recordNoUrlShardResponse(srsp, shard);
+      return;
+    }
+
+    long startTime = System.nanoTime();
+    final Runnable executeRequestRunnable =
+        () -> {
+          final CompletableFuture<LBSolrClient.Rsp> future;
+          try {
+            future = this.lbClient.requestAsync(lbReq);
+          } catch (Throwable t) {
+            // Nothing observes the task handed to commExecutor, so without this the failure
+            // would vanish while 'pending' still counts the request.
+            recordFailure(srsp, ssr, startTime, t);
+            return;
+          }
+          future.whenComplete(
+              (rsp, throwable) -> {
+                if (rsp != null) {
+                  ssr.nl = rsp.getResponse();
+                  srsp.setShardAddress(rsp.getServer());
+                  ssr.elapsedTime = elapsedMillisSince(startTime);
+                  responses.add(srsp);
+                } else if (throwable != null) {
+                  recordFailure(srsp, ssr, startTime, throwable);
+                }
+              });
+          responseFutureMap.put(srsp, future);
+        };
+
+    CompletableFuture.runAsync(executeRequestRunnable, commExecutor);
+  }
+
+  /** Marks {@code srsp} as failed with {@code failure} and adds it to {@code responses}. */
+  private void recordFailure(
+      ShardResponse srsp, SimpleSolrResponse ssr, long startTime, Throwable failure) {
+    ssr.elapsedTime = elapsedMillisSince(startTime);
+    srsp.setException(failure);
+    if (failure instanceof SolrException) {
+      srsp.setResponseCode(((SolrException) failure).code());
+    }
+    responses.add(srsp);
+  }
+
+  /** Milliseconds elapsed since {@code startNanos} (a {@code System.nanoTime()} reading). */
+  private static long elapsedMillisSince(long startNanos) {
+    return TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandlerFactory.java
new file mode 100644
index 00000000000..38b2cb9a974
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandlerFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+/** Creates {@link ParallelHttpShardHandler} instances; all else is inherited from the parent. */
+public class ParallelHttpShardHandlerFactory extends HttpShardHandlerFactory {
+
+  @Override
+  public ShardHandler getShardHandler() {
+    return new ParallelHttpShardHandler(this);
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
index b63613f91ae..2717bc47845 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
@@ -23,15 +23,53 @@
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 
+/**
+ * Executes, tracks, and awaits all shard-requests made in the course of a distributed request.
+ *
+ * <p>New ShardHandler instances are created for each individual distributed request, and should not
+ * be assumed to be thread-safe.
+ */
 public abstract class ShardHandler {
+
+  /**
+   * Bootstraps any data structures needed by the ShardHandler to execute or track outgoing
+   * requests.
+   *
+   * @param rb provides access to request and response state.
+   */
   public abstract void prepDistributed(ResponseBuilder rb);
 
+  /**
+   * Sends a request (represented by <code>sreq</code>) to the specified shard.
+   *
+   * <p>The outgoing request may be sent asynchronously. Callers must invoke {@link
+   * #takeCompletedIncludingErrors()} or {@link #takeCompletedOrError()} to inspect the success or
+   * failure of requests.
+   *
+   * @param sreq metadata about the series of sub-requests that the outgoing request belongs to and
+   *     should be tracked with.
+   * @param shard URLs for replicas of the receiving shard, delimited by '|' (e.g.
+   *     "http://solr1:8983/solr/foo1|http://solr2:7574/solr/foo2")
+   * @param params query-parameters set on the outgoing request
+   */
   public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params);
 
+  /**
+   * returns a ShardResponse of the last response correlated with a ShardRequest. This won't return
+   * early if it runs into an error.
+   */
   public abstract ShardResponse takeCompletedIncludingErrors();
 
+  // TODO - Shouldn't this method be taking in a ShardRequest?  Does ShardHandler not really
+  // distinguish between different ShardRequest objects as it seems to advertise? What's going on
+  // here?
+  /**
+   * returns a ShardResponse of the last response correlated with a ShardRequest, or immediately
+   * returns a ShardResponse if there was an error detected
+   */
   public abstract ShardResponse takeCompletedOrError();
 
+  /** Cancels all uncompleted requests managed by this instance */
   public abstract void cancelAll();
 
   public abstract ShardHandlerFactory getShardHandlerFactory();
diff --git a/solr/core/src/test-files/solr/solr-shardhandler-loadBalancerRequests.xml b/solr/core/src/test-files/solr/solr-shardhandler-loadBalancerRequests.xml
index 92339d9befb..473a8b3c274 100644
--- a/solr/core/src/test-files/solr/solr-shardhandler-loadBalancerRequests.xml
+++ b/solr/core/src/test-files/solr/solr-shardhandler-loadBalancerRequests.xml
@@ -16,7 +16,7 @@
  limitations under the License.
 -->
 <solr>
-  <shardHandlerFactory name="shardHandlerFactory" class="solr.HttpShardHandlerFactory">
+  <shardHandlerFactory name="shardHandlerFactory" class="${solr.tests.defaultShardHandlerFactory:solr.HttpShardHandlerFactory}">
     <int   name="loadBalancerRequestsMinimumAbsolute">${solr.tests.loadBalancerRequestsMinimumAbsolute:0}</int>
     <float name="loadBalancerRequestsMaximumFraction">${solr.tests.loadBalancerRequestsMaximumFraction:1.0}</float>
   </shardHandlerFactory>
diff --git a/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java
index 299f751393e..e18865b18e3 100644
--- a/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java
+++ b/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java
@@ -29,6 +29,10 @@
 public class MockShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized {
   NamedList<?> args;
 
+  public NamedList<?> getArgs() {
+    return args;
+  }
+
   @Override
   public void init(PluginInfo info) {
     args = info.initArgs;
diff --git a/solr/core/src/test/org/apache/solr/core/TestShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/core/TestShardHandlerFactory.java
deleted file mode 100644
index 7c9f5ef0702..00000000000
--- a/solr/core/src/test/org/apache/solr/core/TestShardHandlerFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.core;
-
-import java.nio.file.Path;
-import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-
-/** Tests specifying a custom ShardHandlerFactory */
-public class TestShardHandlerFactory extends SolrTestCaseJ4 {
-
-  public void testXML() {
-    Path home = TEST_PATH();
-    CoreContainer cc = CoreContainer.createAndLoad(home, home.resolve("solr-shardhandler.xml"));
-    ShardHandlerFactory factory = cc.getShardHandlerFactory();
-    assertTrue(factory instanceof MockShardHandlerFactory);
-    NamedList<?> args = ((MockShardHandlerFactory) factory).args;
-    assertEquals("myMagicRequiredValue", args.get("myMagicRequiredParameter"));
-    factory.close();
-    cc.shutdown();
-  }
-
-  /** Test {@link ShardHandler#setShardAttributesToParams} */
-  public void testSetShardAttributesToParams() {
-    // NOTE: the value of this test is really questionable; we should feel free to remove it
-    ModifiableSolrParams modifiable = new ModifiableSolrParams();
-    var dummyIndent = "Dummy-Indent";
-
-    modifiable.set(ShardParams.SHARDS, "dummyValue");
-    modifiable.set(CommonParams.HEADER_ECHO_PARAMS, "dummyValue");
-    modifiable.set(CommonParams.INDENT, dummyIndent);
-
-    ShardHandler.setShardAttributesToParams(modifiable, 2);
-
-    assertEquals(Boolean.FALSE.toString(), modifiable.get(CommonParams.DISTRIB));
-    assertEquals("2", modifiable.get(ShardParams.SHARDS_PURPOSE));
-    assertEquals(Boolean.FALSE.toString(), modifiable.get(CommonParams.OMIT_HEADER));
-    assertEquals(Boolean.TRUE.toString(), modifiable.get(ShardParams.IS_SHARD));
-
-    assertNull(modifiable.get(CommonParams.HEADER_ECHO_PARAMS));
-    assertNull(modifiable.get(ShardParams.SHARDS));
-    assertNull(modifiable.get(CommonParams.INDENT));
-  }
-}
diff --git a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/handler/component/TestShardHandlerFactory.java
similarity index 66%
rename from solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
rename to solr/core/src/test/org/apache/solr/handler/component/TestShardHandlerFactory.java
index c17a7de012e..cbee25f24a6 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/TestShardHandlerFactory.java
@@ -18,6 +18,7 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 
 import java.nio.file.Path;
@@ -31,41 +32,52 @@
 import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
-import org.junit.AfterClass;
+import org.apache.solr.core.MockShardHandlerFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-/** Tests specifying a custom ShardHandlerFactory */
-public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
+/** Tests exercising Solr's two "out-of-the-box" ShardHandlerFactory implementations */
+public class TestShardHandlerFactory extends SolrTestCaseJ4 {
+
+  private static final String[] SHARD_HANDLER_FACTORY_IMPLEMENTATIONS =
+      new String[] {
+        HttpShardHandlerFactory.class.getName(), ParallelHttpShardHandlerFactory.class.getName()
+      };
 
   private static final String LOAD_BALANCER_REQUESTS_MIN_ABSOLUTE =
       "solr.tests.loadBalancerRequestsMinimumAbsolute";
   private static final String LOAD_BALANCER_REQUESTS_MAX_FRACTION =
       "solr.tests.loadBalancerRequestsMaximumFraction";
+  private static final String SHARD_HANDLER_FACTORY_PROPERTY =
+      "solr.tests.defaultShardHandlerFactory";
 
   private static int expectedLoadBalancerRequestsMinimumAbsolute = 0;
   private static float expectedLoadBalancerRequestsMaximumFraction = 1.0f;
+  private static String expectedShardHandlerFactory;
 
   @BeforeClass
   public static void beforeTests() {
     expectedLoadBalancerRequestsMinimumAbsolute = random().nextInt(3); // 0 .. 2
     expectedLoadBalancerRequestsMaximumFraction = (1 + random().nextInt(10)) / 10f; // 0.1 .. 1.0
+    expectedShardHandlerFactory =
+        SHARD_HANDLER_FACTORY_IMPLEMENTATIONS[
+            random().nextInt(SHARD_HANDLER_FACTORY_IMPLEMENTATIONS.length)];
     System.setProperty(
         LOAD_BALANCER_REQUESTS_MIN_ABSOLUTE,
         Integer.toString(expectedLoadBalancerRequestsMinimumAbsolute));
     System.setProperty(
         LOAD_BALANCER_REQUESTS_MAX_FRACTION,
         Float.toString(expectedLoadBalancerRequestsMaximumFraction));
+    System.setProperty(SHARD_HANDLER_FACTORY_PROPERTY, expectedShardHandlerFactory);
   }
 
-  @AfterClass
-  public static void afterTests() {
-    System.clearProperty(LOAD_BALANCER_REQUESTS_MIN_ABSOLUTE);
-    System.clearProperty(LOAD_BALANCER_REQUESTS_MAX_FRACTION);
-  }
-
-  public void testLoadBalancerRequestsMinMax() {
+  @Test
+  public void testLoadBalancerRequestsMinMax() throws ClassNotFoundException {
     final Path home = TEST_PATH();
     CoreContainer cc = null;
     ShardHandlerFactory factory = null;
@@ -75,8 +87,8 @@ public void testLoadBalancerRequestsMinMax() {
               home, home.resolve("solr-shardhandler-loadBalancerRequests.xml"));
       factory = cc.getShardHandlerFactory();
 
-      // test that factory is HttpShardHandlerFactory with expected url reserve fraction
-      assertTrue(factory instanceof HttpShardHandlerFactory);
+      assertThat(factory, instanceOf(Class.forName(expectedShardHandlerFactory)));
+      // All SHFs currently extend HttpShardHandlerFactory, so this cast is safe
       @SuppressWarnings("resource")
       final HttpShardHandlerFactory httpShardHandlerFactory = ((HttpShardHandlerFactory) factory);
       assertEquals(
@@ -154,4 +166,39 @@ public void testLiveNodesToHostUrl() {
     assertThat(hostSet, hasItem("1.2.3.4:9000"));
     assertThat(hostSet, hasItem("1.2.3.4:9001"));
   }
+
+  @Test
+  public void testXML() {
+    Path home = TEST_PATH();
+    CoreContainer cc = CoreContainer.createAndLoad(home, home.resolve("solr-shardhandler.xml"));
+    ShardHandlerFactory factory = cc.getShardHandlerFactory();
+    assertTrue(factory instanceof MockShardHandlerFactory);
+    NamedList<?> args = ((MockShardHandlerFactory) factory).getArgs();
+    assertEquals("myMagicRequiredValue", args.get("myMagicRequiredParameter"));
+    factory.close();
+    cc.shutdown();
+  }
+
+  /** Test {@link ShardHandler#setShardAttributesToParams} */
+  @Test
+  public void testSetShardAttributesToParams() {
+    // NOTE: the value of this test is really questionable; we should feel free to remove it
+    ModifiableSolrParams modifiable = new ModifiableSolrParams();
+    var dummyIndent = "Dummy-Indent";
+
+    modifiable.set(ShardParams.SHARDS, "dummyValue");
+    modifiable.set(CommonParams.HEADER_ECHO_PARAMS, "dummyValue");
+    modifiable.set(CommonParams.INDENT, dummyIndent);
+
+    ShardHandler.setShardAttributesToParams(modifiable, 2);
+
+    assertEquals(Boolean.FALSE.toString(), modifiable.get(CommonParams.DISTRIB));
+    assertEquals("2", modifiable.get(ShardParams.SHARDS_PURPOSE));
+    assertEquals(Boolean.FALSE.toString(), modifiable.get(CommonParams.OMIT_HEADER));
+    assertEquals(Boolean.TRUE.toString(), modifiable.get(ShardParams.IS_SHARD));
+
+    assertNull(modifiable.get(CommonParams.HEADER_ECHO_PARAMS));
+    assertNull(modifiable.get(ShardParams.SHARDS));
+    assertNull(modifiable.get(CommonParams.INDENT));
+  }
 }
diff --git a/solr/server/solr/solr.xml b/solr/server/solr/solr.xml
index afcd6eaa238..ac57111374a 100644
--- a/solr/server/solr/solr.xml
+++ b/solr/server/solr/solr.xml
@@ -56,8 +56,7 @@
 
   </solrcloud>
 
-  <shardHandlerFactory name="shardHandlerFactory"
-    class="HttpShardHandlerFactory">
+  <shardHandlerFactory name="shardHandlerFactory" class="${defaultShardHandlerFactory:HttpShardHandlerFactory}">
     <int name="socketTimeout">${socketTimeout:600000}</int>
     <int name="connTimeout">${connTimeout:60000}</int>
   </shardHandlerFactory>
diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
index 6dff9824a73..85e11562135 100644
--- a/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
+++ b/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
@@ -551,15 +551,22 @@ For example when using Log4j one might specify DEBUG, WARN, INFO, etc.
 
 === The <shardHandlerFactory> Element
 
-Custom shard handlers can be defined in `solr.xml` if you wish to create a custom shard handler.
+Solr uses "Shard Handlers" to send and track the inter-node requests made internally to process a distributed search or other request.
+A factory, configured via the `<shardHandlerFactory>` element, is used to create new Shard Handlers as needed.
+The factory defined here will be used throughout Solr, unless overridden by individual request handlers in `solrconfig.xml`.
+
+Two factory implementations are available, each creating a corresponding Shard Handler.
+The default, `HttpShardHandlerFactory`, serves as the best option for most deployments.
+However, some deployments, especially those using authentication or with massively sharded collections, may benefit from the additional parallelization offered by `ParallelHttpShardHandlerFactory`.
+
+Custom shard handlers are also supported and should be referenced in `solr.xml` by their fully-qualified class name:
 
 [source,xml]
 ----
-<shardHandlerFactory name="ShardHandlerFactory" class="qualified.class.name">
+<shardHandlerFactory name="ShardHandlerFactory" class="qualified.class.name"/>
 ----
 
-Since this is a custom shard handler, sub-elements are specific to the implementation.
-The default and only shard handler provided by Solr is the `HttpShardHandlerFactory` in which case, the following sub-elements can be specified:
+Sub-elements of `<shardHandlerFactory>` may vary in the case of custom shard handlers, but both `HttpShardHandlerFactory` and `ParallelHttpShardHandlerFactory` support the following configuration options:
 
 `socketTimeout`::
 +
diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/requesthandlers-searchcomponents.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/requesthandlers-searchcomponents.adoc
index 08bf0d27f31..7ba738044ee 100644
--- a/solr/solr-ref-guide/modules/configuration-guide/pages/requesthandlers-searchcomponents.adoc
+++ b/solr/solr-ref-guide/modules/configuration-guide/pages/requesthandlers-searchcomponents.adoc
@@ -212,7 +212,8 @@ All the blocks are optional, especially since parameters can also be provided wi
 The defaults/appends/invariants blocks were described earlier in <<defaults-appends-and-invariants>>.
 All query parameters can be defined as parameters for any of the Search Handlers.
 
-The Search Components blocks are described next, and xref:deployment-guide:solrcloud-distributed-requests.adoc#configuring-the-shardhandlerfactory[shardHandlerFactory] is for fine-tuning of the SolrCloud distributed requests.
+The `shardHandlerFactory` section can be used to provide fine-grained control of how this SearchHandler makes requests to other shards and replicas in a SolrCloud collection.
+See the xref:configuration-guide:configuring-solr-xml.adoc#the-shardhandlerfactory-element[ShardHandler documentation here] for more details.
 
 === Defining Search Components
 The search components themselves are defined outside of the Request Handlers and then are referenced from various Search Handlers that want to use them.
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc
index 0f2cdb06087..72053e2f219 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc
@@ -313,110 +313,12 @@ For extremely short autoCommit intervals, consider disabling caching and autowar
 
 == Configuring the ShardHandlerFactory
 
-For finer-grained control, you can directly configure and tune aspects of the concurrency and thread-pooling used within distributed search in Solr.
-The default configuration favors throughput over latency.
-
-This is done by defining a `shardHandlerFactory` in the configuration for your search handler.
-
-To add a `shardHandlerFactory` to the standard search handler, provide a configuration in `solrconfig.xml`, as in this example:
-
-[source,xml]
-----
-<requestHandler name="/select" class="solr.SearchHandler">
-  <!-- other params go here -->
-  <shardHandlerFactory class="HttpShardHandlerFactory">
-    <int name="socketTimeout">1000</int>
-    <int name="connTimeout">5000</int>
-  </shardHandlerFactory>
-</requestHandler>
-----
-
-`HttpShardHandlerFactory` is the only `ShardHandlerFactory` implementation included out of the box with Solr.
-
-NOTE:: The `shardHandlerFactory` is reliant on the `allowUrls` parameter configured in `solr.xml`, which controls which nodes are allowed to talk to each other.
-This means that the configuration of hosts is global instead of per-core or per-collection.
-See the section xref:configuration-guide:configuring-solr-xml.adoc#allow-urls [allowUrls] for details.
-
-The `HttpShardHandlerFactory` accepts the following parameters:
-
-`socketTimeout`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `0`
-|===
-+
-The amount of time in milliseconds that a socket is allowed to wait.
-The default is `0`, where the operating system's default will be used.
-
-`connTimeout`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `0`
-|===
-+
-The amount of time in milliseconds that is accepted for binding / connecting a socket.
-The default is `0`, where the operating system's default will be used.
-
-`maxConnectionsPerHost`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `100000`
-|===
-+
-The maximum number of concurrent connections that is made to each individual shard in a distributed search.
-
-`corePoolSize`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `0`
-|===
-+
-The retained lowest limit on the number of threads used in coordinating distributed search.
-
-`maximumPoolSize`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `Integer.MAX_VALUE`
-|===
-+
-The maximum number of threads used for coordinating distributed search.
-
-`maxThreadIdleTime`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `5`
-|===
-+
-The amount of time in seconds to wait for before threads are scaled back in response to a reduction in load.
-
-`sizeOfQueue`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `-1`
-|===
-+
-If specified, the thread pool will use a backing queue instead of a direct handoff buffer.
-High throughput systems will want to configure this to be a direct hand off (with `-1`).
-Systems that desire better latency will want to configure a reasonable size of queue to handle variations in requests.
-
-`fairnessPolicy`::
-+
-[%autowidth,frame=none]
-|===
-|Optional |Default: `false`
-|===
-+
-Chooses the JVM specifics dealing with fair policy queuing.
-If enabled distributed searches will be handled in a first-in-first-out fashion at a cost to throughput.
-If disabled throughput will be favored over latency.
+Administrators who want fine-grained control over the concurrency and thread-pooling used in performing distributed search may define a `shardHandlerFactory` in their SearchHandler configuration.
+The default configuration, `HttpShardHandlerFactory`, favors throughput over latency.
+An alternate implementation, `ParallelHttpShardHandlerFactory`, is also available and may be preferable for collections with many shards.
 
+With either implementation, a number of other `shardHandlerFactory` settings (thread-pool sizes, network timeouts, etc.) are available to administrators who wish to further tune distributed-search behavior.
+See the xref:configuration-guide:configuring-solr-xml.adoc#the-shardhandlerfactory-element[ShardHandler documentation here] for more details.
 
 [[distributedidf]]
 == Distributed Inverse Document Frequency (IDF)