Skip to content

Commit

Permalink
SOLR-17419: Introduce ParallelHttpShardHandler (#2681)
Browse files Browse the repository at this point in the history
The default ShardHandler implementation, HttpShardHandler, sends all
shard-requests serially, only parallelizing the waiting and parsing of
responses.  This works great for collections with few shards, but as the
number of shards increases the serialized sending of shard-requests adds
a larger and larger overhead to the overall request (especially when
auth and PKI are done at request-sending time).

This commit fixes this by introducing an alternate ShardHandler
implementation, geared towards collections with many shards.  This
ShardHandler uses an executor to parallelize both request sending and
response waiting/parsing.  This consumes more CPU, but reduces greatly
reduces the latency/QTime observed by users querying many-shard
collections.
  • Loading branch information
gerlowskija authored Sep 6, 2024
1 parent 2d2b061 commit eeeb0e3
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 227 deletions.
7 changes: 6 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ New Features
Improvements
---------------------
* SOLR-17397: SkipExistingDocumentsProcessor now functions correctly with child documents. (Tim Owens via Eric Pugh)
* SOLR-17180: Deprecate snapshotscli.sh in favour of bin/solr snapshot sub commands. Now able to manage Snapshots from the CLI. HDFS module specific snapshot script now ships as part of that module in the modules/hdfs/bin directory. (Eric Pugh)

* SOLR-17180: Deprecate snapshotscli.sh in favour of bin/solr snapshot sub commands. Now able to manage Snapshots from the CLI. HDFS module specific snapshot script now ships as part of that module in the modules/hdfs/bin directory. (Eric Pugh)

* SOLR-17419: An alternate ShardHandlerFactory is now available, ParallelHttpShardHandlerFactory,
which may help reduce distributed-search latency in collections with many shards, especially
when PKI is used between nodes. (Jason Gerlowski)

Optimizations
---------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.security.AllowListUrlChecker;

/**
* Solr's default {@link ShardHandler} implementation; uses Jetty's async HTTP Client APIs for
* sending requests.
*
* <p>Shard-requests triggered by {@link #submit(ShardRequest, String, ModifiableSolrParams)} will
* be sent synchronously (i.e. before 'submit' returns to the caller). Response waiting and parsing
* happens asynchronously via {@link HttpShardHandlerFactory#commExecutor}. See {@link
* HttpShardHandlerFactory} for details on configuring this executor.
*
* <p>The ideal choice for collections with modest or moderate sharding.
*/
@NotThreadSafe
public class HttpShardHandler extends ShardHandler {
/**
Expand All @@ -59,12 +70,12 @@ public class HttpShardHandler extends ShardHandler {
*/
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";

private HttpShardHandlerFactory httpShardHandlerFactory;
private Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
private BlockingQueue<ShardResponse> responses;
private AtomicInteger pending;
private Map<String, List<String>> shardToURLs;
private LBHttp2SolrClient lbClient;
protected HttpShardHandlerFactory httpShardHandlerFactory;
protected Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
protected BlockingQueue<ShardResponse> responses;
protected AtomicInteger pending;
protected Map<String, List<String>> shardToURLs;
protected LBHttp2SolrClient lbClient;

public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
this.httpShardHandlerFactory = httpShardHandlerFactory;
Expand All @@ -80,7 +91,7 @@ public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
shardToURLs = new HashMap<>();
}

private static class SimpleSolrResponse extends SolrResponse {
public static class SimpleSolrResponse extends SolrResponse {

volatile long elapsedTime;

Expand Down Expand Up @@ -109,7 +120,7 @@ public void setElapsedTime(long elapsedTime) {

// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
protected List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls == null) {
urls = httpShardHandlerFactory.buildURLList(shard);
Expand All @@ -118,48 +129,58 @@ private List<String> getURLs(String shard) {
return urls;
}

@Override
public void submit(
final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);

protected LBSolrClient.Req prepareLBRequest(
ShardRequest sreq, String shard, ModifiableSolrParams params, List<String> urls) {
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
QueryRequest req = makeQueryRequest(sreq, params, shard);
req.setMethod(SolrRequest.METHOD.POST);
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) {
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
}

LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
return httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
}

protected ShardResponse prepareShardResponse(ShardRequest sreq, String shard) {
ShardResponse srsp = new ShardResponse();
if (sreq.nodeName != null) {
srsp.setNodeName(sreq.nodeName);
}
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);

return srsp;
}

protected void recordNoUrlShardResponse(ShardResponse srsp, String shard) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
SolrException exception =
new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
srsp.setException(exception);
srsp.setResponseCode(exception.code());
responses.add(srsp);
}

@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
final var lbReq = prepareLBRequest(sreq, shard, params, urls);
final var srsp = prepareShardResponse(sreq, shard);
final var ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
pending.incrementAndGet();
// if there are no shards available for a slice, urls.size()==0

if (urls.isEmpty()) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
SolrException exception =
new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
srsp.setException(exception);
srsp.setResponseCode(exception.code());
responses.add(srsp);
recordNoUrlShardResponse(srsp, shard);
return;
}

long startTime = System.nanoTime();
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) {
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
}

CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
future.whenComplete(
(rsp, throwable) -> {
Expand Down Expand Up @@ -196,19 +217,11 @@ protected ShardResponse transfomResponse(
return rsp;
}

/**
* returns a ShardResponse of the last response correlated with a ShardRequest. This won't return
* early if it runs into an error.
*/
@Override
public ShardResponse takeCompletedIncludingErrors() {
return take(false);
}

/**
* returns a ShardResponse of the last response correlated with a ShardRequest, or immediately
* returns a ShardResponse if there was an error detected
*/
@Override
public ShardResponse takeCompletedOrError() {
return take(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Creates {@link HttpShardHandler} instances */
public class HttpShardHandlerFactory extends ShardHandlerFactory
implements org.apache.solr.util.plugin.PluginInfoInitialized, SolrMetricProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand All @@ -79,7 +80,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory
// requests at some point (or should we simply return failure?)
//
// This executor is initialized in the init method
private ExecutorService commExecutor;
protected ExecutorService commExecutor;

protected volatile Http2SolrClient defaultClient;
protected InstrumentedHttpListenerFactory httpListenerFactory;
Expand Down Expand Up @@ -195,6 +196,31 @@ private void initReplicaListTransformers(NamedList<?> routingConfig) {
new RequestReplicaListTransformerGenerator(defaultRltFactory, stableRltFactory);
}

/**
* Customizes {@link HttpShardHandler} instances that will be produced by this factory.
*
* <p>Supports the following parameters in {@code info}:
*
* <ul>
* <li>socketTimeout - read timeout for requests, in milliseconds.
* <li>connTimeout - connection timeout for requests, in milliseconds.
* <li>urlScheme - "http" or "https"
* <li>maxConnectionsPerHost - caps the number of concurrent connections per host
* <li>corePoolSize - the initial size of the thread pool used to service requests
* <li>maximumPoolSize - the maximum size of the thread pool used to service requests.
* <li>maxThreadIdleTime - the amount of time (in seconds) that thread pool entries may sit idle
* before being killed
* <li>sizeOfQueue - the size of the queue (if any) used by the thread pool that services
* shard-handler requests
* <li>fairnessPolicy - true if the thread pool should prioritize fairness over throughput,
* false otherwise
* <li>replicaRouting - a NamedList of preferences used to select the order in which replicas
* for a shard will be used by created ShardHandlers
* </ul>
*
* @param info configuration for the created factory, typically reflecting the contents of a
* &lt;shardHandlerFactory&gt; XML tag from solr.xml or solrconfig.xml
*/
@Override
public void init(PluginInfo info) {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.NotThreadSafe;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A version of {@link HttpShardHandler} optimized for massively-sharded collections.
*
* <p>Uses a {@link HttpShardHandlerFactory#commExecutor} thread for all work related to outgoing
* requests, allowing {@link #submit(ShardRequest, String, ModifiableSolrParams)} to return more
* quickly. (See {@link HttpShardHandler} for comparison.)
*
* <p>The additional focus on parallelization makes this an ideal implementation for collections
* with many shards.
*/
@NotThreadSafe
public class ParallelHttpShardHandler extends HttpShardHandler {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final ExecutorService commExecutor;

public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandlerFactory) {
super(httpShardHandlerFactory);
this.commExecutor = httpShardHandlerFactory.commExecutor;
}

@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
final var lbReq = prepareLBRequest(sreq, shard, params, urls);
final var srsp = prepareShardResponse(sreq, shard);
final var ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
pending.incrementAndGet();

if (urls.isEmpty()) {
recordNoUrlShardResponse(srsp, shard);
return;
}

long startTime = System.nanoTime();
final Runnable executeRequestRunnable =
() -> {
CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
future.whenComplete(
(rsp, throwable) -> {
if (rsp != null) {
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
responses.add(srsp);
} else if (throwable != null) {
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
srsp.setException(throwable);
if (throwable instanceof SolrException) {
srsp.setResponseCode(((SolrException) throwable).code());
}
responses.add(srsp);
}
});
responseFutureMap.put(srsp, future);
};

CompletableFuture.runAsync(executeRequestRunnable, commExecutor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;

/** Creates {@link ParallelHttpShardHandler} instances */
public class ParallelHttpShardHandlerFactory extends HttpShardHandlerFactory {

@Override
public ShardHandler getShardHandler() {
return new ParallelHttpShardHandler(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,53 @@
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;

/**
* Executes, tracks, and awaits all shard-requests made in the course of a distributed request.
*
* <p>New ShardHandler instances are created for each individual distributed request, and should not
* be assumed to be thread-safe.
*/
public abstract class ShardHandler {

/**
* Bootstraps any data structures needed by the ShardHandler to execute or track outgoing
* requests.
*
* @param rb provides access to request and response state.
*/
public abstract void prepDistributed(ResponseBuilder rb);

/**
* Sends a request (represented by <code>sreq</code>) to the specified shard.
*
* <p>The outgoing request may be sent asynchronously. Callers must invoke {@link
* #takeCompletedIncludingErrors()} or {@link #takeCompletedOrError()} to inspect the success or
* failure of requests.
*
* @param sreq metadata about the series of sub-requests that the outgoing request belongs to and
* should be tracked with.
* @param shard URLs for replicas of the receiving shard, delimited by '|' (e.g.
* "http://solr1:8983/solr/foo1|http://solr2:7574/solr/foo2")
* @param params query-parameters set on the outgoing request
*/
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params);

/**
* returns a ShardResponse of the last response correlated with a ShardRequest. This won't return
* early if it runs into an error.
*/
public abstract ShardResponse takeCompletedIncludingErrors();

// TODO - Shouldn't this method be taking in a ShardRequest? Does ShardHandler not really
// distinguish between different ShardRequest objects as it seems to advertise? What's going on
// here?
/**
* returns a ShardResponse of the last response correlated with a ShardRequest, or immediately
* returns a ShardResponse if there was an error detected
*/
public abstract ShardResponse takeCompletedOrError();

/** Cancels all uncompleted requests managed by this instance */
public abstract void cancelAll();

public abstract ShardHandlerFactory getShardHandlerFactory();
Expand Down
Loading

0 comments on commit eeeb0e3

Please sign in to comment.