Introduce ability to minimize round-trips in CCS (#37828)
With #37566 we introduced the ability to merge multiple search responses into one. That makes it possible to expose a new way of executing cross-cluster search requests, which makes CCS much faster whenever there is network latency between the CCS coordinating node and the remote clusters. The coordinating node can now send a single search request to each remote cluster, and each cluster reduces its own results. from + size results are requested from each cluster, and the reduce phase in each cluster is non-final (meaning that buckets are not pruned and pipeline aggregations are not executed). The CCS coordinating node performs an additional, final reduction, which produces one search response out of the multiple responses received from the different clusters.

This new execution path is activated by default for any CCS request, unless a scroll is provided or inner hits are requested as part of field collapsing. The search API now accepts a new parameter called ccs_minimize_roundtrips that allows users to opt out of the default behaviour.
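The two-phase reduction described above can be sketched in miniature. The Python below is a hypothetical model of the idea only, not the actual Java implementation inside Elasticsearch: each remote cluster performs a non-final reduce returning its top from + size hits, and the coordinating node merges those into one globally paginated result.

```python
# Hypothetical sketch of the two-phase CCS reduction described above.
# Each remote cluster returns its top (from_ + size) hits, already reduced
# locally (non-final reduce); the coordinating node merges them into one
# response (final reduce). This models the idea only; hit shapes and
# function names are invented for illustration.
import heapq

def cluster_reduce(hits, from_, size):
    """Non-final reduce on a remote cluster: keep the top from_ + size hits."""
    return sorted(hits, key=lambda h: h["score"], reverse=True)[:from_ + size]

def final_reduce(per_cluster_hits, from_, size):
    """Final reduce on the CCS coordinating node: merge the per-cluster
    top hits (each already sorted descending) and paginate once, globally."""
    merged = heapq.merge(*per_cluster_hits, key=lambda h: h["score"], reverse=True)
    return list(merged)[from_:from_ + size]

cluster_one = [{"id": "a", "score": 3.0}, {"id": "b", "score": 1.0}]
cluster_two = [{"id": "c", "score": 2.0}, {"id": "d", "score": 0.5}]
reduced = [cluster_reduce(c, 0, 2) for c in (cluster_one, cluster_two)]
top = final_reduce(reduced, 0, 2)
print([h["id"] for h in top])  # -> ['a', 'c']
```

Because each cluster has already fetched its own top hits, the coordinating node pays no extra fetch round-trip, at the cost of each cluster shipping from + size results.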

Relates to #32125
javanna authored Jan 31, 2019
1 parent ae9f4df commit 622fb78
Showing 37 changed files with 1,159 additions and 331 deletions.
Original file line number Diff line number Diff line change
@@ -399,6 +399,7 @@ private static void addSearchRequestParams(Params params, SearchRequest searchRe
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
}
@@ -1239,7 +1239,7 @@ public void testMultiSearch() throws IOException {
requests.add(searchRequest);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())),
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null,
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, null,
xContentRegistry(), true);
assertEquals(requests, multiSearchRequest.requests());
}
@@ -1862,6 +1862,10 @@ private static void setRandomSearchParams(SearchRequest searchRequest,
searchRequest.scroll(randomTimeValue());
expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
}
if (randomBoolean()) {
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
}
expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}

static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,
43 changes: 43 additions & 0 deletions docs/reference/modules/cross-cluster-search.asciidoc
@@ -65,6 +65,7 @@ GET /cluster_one:twitter/_search
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 2,
"_shards": {
"total": 1,
"successful": 1,
@@ -130,6 +131,7 @@ will be prefixed with their remote cluster name:
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
@@ -222,6 +224,7 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
@@ -273,3 +276,43 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
<1> The `clusters` section indicates that one cluster was unavailable and got skipped

[float]
[[ccs-reduction]]
=== CCS reduction phase

Cross-cluster search requests can be executed in two ways:

- the CCS coordinating node minimizes network round-trips by sending one search
request to each cluster. Each cluster performs the search independently,
reducing and fetching results. Once the CCS coordinating node has received all
the responses, it performs another reduction and returns the relevant results
back to the user. This strategy is beneficial when there is network latency
between the CCS coordinating node and the remote clusters involved, which is
typically the case. A single request is sent to each remote cluster, at the
cost of retrieving `from` + `size` already fetched results from each of them.
This is the default strategy, used whenever possible. If a scroll is provided,
or inner hits are requested as part of field collapsing, this strategy is not
supported, hence network round-trips cannot be minimized and the following
strategy is used instead.

- the CCS coordinating node sends a <<search-shards,search shards>> request to
each remote cluster, in order to collect information about their corresponding
remote indices involved in the search request and the shards where their data
is located. Once each cluster has responded to such a request, the search
executes as if all shards were part of the same cluster. The coordinating node
sends one request to each shard involved, each shard executes the query and
returns its own results which are then reduced (and fetched, depending on the
<<search-request-search-type, search type>>) by the CCS coordinating node.
This strategy may be beneficial whenever there is very low network latency
between the CCS coordinating node and the remote clusters involved, as it
treats all shards the same, at the cost of sending many requests to each remote
cluster, which is problematic in the presence of network latency.

The <<search-request-body, search API>> supports the `ccs_minimize_roundtrips`
parameter, which defaults to `true` and can be set to `false` when minimizing
network round-trips is not desirable.
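To make the trade-off between the two strategies concrete, here is a hypothetical Python sketch that counts the requests the coordinating node sends under each mode; the cluster names and shard counts are invented for illustration.

```python
# Hypothetical sketch contrasting the two CCS execution strategies described
# above, counting coordinating-node requests per remote cluster.
def requests_sent(clusters, ccs_minimize_roundtrips=True):
    """Return the number of requests the CCS coordinating node sends.

    clusters maps each remote cluster name to its number of shards.
    """
    if ccs_minimize_roundtrips:
        # One search request per remote cluster; each cluster reduces locally.
        return len(clusters)
    # One search_shards request per cluster, then one request per shard.
    return len(clusters) + sum(clusters.values())

remote = {"cluster_one": 5, "cluster_two": 3}
print(requests_sent(remote))                                 # 2
print(requests_sent(remote, ccs_minimize_roundtrips=False))  # 10
```

With any per-request network latency, the minimized mode's cost grows with the number of clusters rather than the number of shards, which is why it is the default.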

Note that all the communication between the nodes, regardless of which cluster
they belong to and the selected reduce mode, happens through the
<<modules-transport,transport layer>>.
5 changes: 5 additions & 0 deletions docs/reference/search/request-body.asciidoc
@@ -113,6 +113,11 @@ And here is a sample response:
reduce the memory overhead per search request if the potential number of
shards in the request can be large.

`ccs_minimize_roundtrips`::

Defaults to `true`. Set to `false` to disable minimizing network round-trips
between the coordinating node and the remote clusters when executing
cross-cluster search requests. See <<ccs-reduction>> for more.
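As a rough illustration of how this parameter is passed, the hypothetical Python helper below builds a search URL with `ccs_minimize_roundtrips` as a query-string parameter alongside a JSON body; it only constructs the request strings and does not reflect any real client library's API.

```python
# Hypothetical helper showing where ccs_minimize_roundtrips fits in a search
# call: it is a query-string parameter, while the query itself travels in the
# JSON request body. Index names are made up; nothing is sent over the network.
import json
from urllib.parse import urlencode

def build_search_request(index, body, ccs_minimize_roundtrips=True):
    params = {"ccs_minimize_roundtrips": str(ccs_minimize_roundtrips).lower()}
    url = f"/{index}/_search?{urlencode(params)}"
    return url, json.dumps(body)

url, payload = build_search_request(
    "cluster_one:twitter", {"query": {"match_all": {}}},
    ccs_minimize_roundtrips=False)
print(url)  # /cluster_one:twitter/_search?ccs_minimize_roundtrips=false
```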


Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results`
@@ -65,7 +65,6 @@ public MultiSearchTemplateRequest add(SearchTemplateRequest request) {
return this;
}


/**
* Returns the number of search requests in this multi search request that are allowed to run concurrently.
*/
@@ -50,7 +50,7 @@ protected MultiSearchTemplateResponse createTestInstance() {
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
@@ -59,22 +59,27 @@ protected MultiSearchTemplateResponse createTestInstance() {
}
return new MultiSearchTemplateResponse(items, overallTookInMillis);
}


private static SearchResponse.Clusters randomClusters() {
int totalClusters = randomIntBetween(0, 10);
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
}

private static MultiSearchTemplateResponse createTestInstanceWithFailures() {
int numItems = randomIntBetween(0, 128);
long overallTookInMillis = randomNonNegativeLong();
MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[numItems];
for (int i = 0; i < numItems; i++) {
if (randomBoolean()) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
// Creating a minimal response is OK, because SearchResponse is tested elsewhere.
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
@@ -133,6 +138,5 @@ public void testFromXContentWithFailures() throws IOException {
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilterWhenResultHasErrors(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
}

}
}
@@ -22,6 +22,7 @@
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
@@ -32,9 +33,11 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
@@ -49,6 +52,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -107,6 +112,14 @@ private static MockTransportService startTransport(
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new,
(request, channel, task) -> {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
channel.sendResponse(searchResponse);
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
@@ -36,6 +36,10 @@
terms:
field: f1.keyword

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: {num_reduce_phases: 3}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- gte: { hits.hits.0._seq_no: 0 }
@@ -59,6 +63,9 @@
terms:
field: f1.keyword

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@@ -76,6 +83,9 @@
terms:
field: f1.keyword

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@@ -93,6 +103,7 @@
terms:
field: f1.keyword

- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
- match: { hits.hits.0._index: "test_index"}
@@ -122,6 +133,9 @@
rest_total_hits_as_int: true
index: test_remote_cluster:test_index

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
@@ -148,6 +162,9 @@
rest_total_hits_as_int: true
index: "*:test_index"

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 6 }
- match: { hits.total: 12 }

@@ -159,6 +176,9 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
@@ -172,6 +192,9 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 4 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
@@ -185,6 +208,9 @@
rest_total_hits_as_int: true
index: "my_remote_cluster:single_doc_index"

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 1 }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
@@ -29,10 +29,12 @@
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
ccs_minimize_roundtrips: false
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }

- match: { hits.total: 1 }
- match: { hits.hits.0._index: "skip_shards_index"}
- is_false: num_reduce_phases
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
@@ -45,10 +47,12 @@
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
ccs_minimize_roundtrips: false
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2015-02-01", "lt": "2016-02-01"} } } }

- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
- is_false: num_reduce_phases
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
@@ -43,6 +43,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},
@@ -33,6 +33,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},
@@ -24,6 +24,11 @@
"type" : "boolean",
"description" : "Specify whether wildcard and prefix queries should be analyzed (default: false)"
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
},
"default_operator": {
"type" : "enum",
"options" : ["AND","OR"],
@@ -67,6 +67,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},
