From 622fb7883b4e7c6de6dce2636108e334cca28cd3 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 31 Jan 2019 15:12:14 +0100 Subject: [PATCH] Introduce ability to minimize round-trips in CCS (#37828) With #37566 we have introduced the ability to merge multiple search responses into one. That makes it possible to expose a new way of executing cross-cluster search requests that makes CCS much faster whenever there is network latency between the CCS coordinating node and the remote clusters. The coordinating node can now send a single search request to each remote cluster, and each cluster performs its own reduction. from + size results are requested from each cluster, and the reduce phase in each cluster is non-final (meaning that buckets are not pruned and pipeline aggs are not executed). The CCS coordinating node performs an additional, final reduction, which produces one search response out of the multiple responses received from the different clusters. This new execution path will be activated by default for any CCS request unless a scroll is provided or inner hits are requested as part of field collapsing. The search API now accepts a new parameter called ccs_minimize_roundtrips that allows opting out of the default behaviour. Relates to #32125 --- .../client/RequestConverters.java | 1 + .../client/RequestConvertersTests.java | 6 +- .../modules/cross-cluster-search.asciidoc | 43 ++ docs/reference/search/request-body.asciidoc | 5 + .../mustache/MultiSearchTemplateRequest.java | 1 - .../MultiSearchTemplateResponseTests.java | 18 +- ...rossClusterSearchUnavailableClusterIT.java | 13 + .../test/multi_cluster/10_basic.yml | 26 + .../test/multi_cluster/70_skip_shards.yml | 4 + .../resources/rest-api-spec/api/msearch.json | 5 + .../rest-api-spec/api/msearch_template.json | 5 + .../resources/rest-api-spec/api/search.json | 5 + .../rest-api-spec/api/search_template.json | 5 + .../action/search/MultiSearchRequest.java | 7 + .../action/search/SearchRequest.java | 138 ++-- .../action/search/SearchResponse.java | 3 +- .../action/search/SearchResponseMerger.java | 107 +++- .../action/search/TransportSearchAction.java | 253 ++++++-- .../common/io/stream/StreamInput.java | 2 +- .../common/io/stream/StreamOutput.java | 2 +- .../action/search/RestMultiSearchAction.java | 3 +- .../rest/action/search/RestSearchAction.java | 1 + .../elasticsearch/search/SearchService.java | 6 +- .../search/MultiSearchRequestTests.java | 12 +- .../search/MultiSearchResponseTests.java | 7 +- .../action/search/SearchRequestTests.java | 31 +- .../search/SearchResponseMergerTests.java | 61 +- .../action/search/SearchResponseTests.java | 37 +- .../TransportSearchActionSingleNodeTests.java | 11 +- .../search/TransportSearchActionTests.java | 591 ++++++++++++++---- .../common/io/stream/BytesStreamsTests.java | 2 +- .../transport/RemoteClusterClientTests.java | 6 +- .../RemoteClusterConnectionTests.java | 27 + .../search/RandomSearchRequestGenerator.java | 15 +- .../test/multi_cluster/10_basic.yml | 22 + .../test/multi_cluster/40_scroll.yml | 7 + .../test/multi_cluster/60_skip_shards.yml | 2 + 37 files changed, 1159 insertions(+), 331 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 1286e083d8203..a30fec41b0bf3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ 
b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -399,6 +399,7 @@ private static void addSearchRequestParams(Params params, SearchRequest searchRe params.withPreference(searchRequest.preference()); params.withIndicesOptions(searchRequest.indicesOptions()); params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT)); + params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); if (searchRequest.requestCache() != null) { params.putParam("request_cache", Boolean.toString(searchRequest.requestCache())); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index f31c562332687..95971ad40ced0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1239,7 +1239,7 @@ public void testMultiSearch() throws IOException { requests.add(searchRequest); }; MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())), - REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, + REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, null, xContentRegistry(), true); assertEquals(requests, multiSearchRequest.requests()); } @@ -1862,6 +1862,10 @@ private static void setRandomSearchParams(SearchRequest searchRequest, searchRequest.scroll(randomTimeValue()); expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep()); } + if (randomBoolean()) { + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + } + expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); } static void setRandomIndicesOptions(Consumer setter, Supplier getter, diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index 61b0bb50aedc7..b59f74198c3e8 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -65,6 +65,7 @@ GET /cluster_one:twitter/_search { "took": 150, "timed_out": false, + "num_reduce_phases": 2, "_shards": { "total": 1, "successful": 1, @@ -130,6 +131,7 @@ will be prefixed with their remote cluster name: { "took": 150, "timed_out": false, + "num_reduce_phases": 3, "_shards": { "total": 2, "successful": 2, @@ -222,6 +224,7 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1> { "took": 150, "timed_out": false, + "num_reduce_phases": 3, "_shards": { "total": 2, "successful": 2, @@ -273,3 +276,43 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1> // TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] // TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/] <1> The `clusters` section indicates that one cluster was unavailable and got skipped + +[float] +[[ccs-reduction]] +=== CCS reduction phase + +Cross-cluster search requests can be executed in two ways: + +- the CCS coordinating node minimizes network round-trips by sending one search +request to each cluster. Each cluster performs the search independently, +reducing and fetching results. 
Once the CCS node has received all the +responses, it performs another reduction and returns the relevant results back +to the user. This strategy is beneficial when there is network latency between +the CCS coordinating node and the remote clusters involved, which is typically +the case. A single request is sent to each remote cluster, at the cost of +retrieving `from` + `size` already fetched results. This is the default +strategy, used whenever possible. When a scroll is provided, or inner hits +are requested as part of field collapsing, this strategy is not supported, hence +network round-trips cannot be minimized and the following strategy is used +instead. + +- the CCS coordinating node sends a <> request to +each remote cluster, in order to collect information about the corresponding +remote indices involved in the search request and the shards where their data +is located. Once each cluster has responded to such a request, the search +executes as if all shards were part of the same cluster. The coordinating node +sends one request to each shard involved, each shard executes the query and +returns its own results, which are then reduced (and fetched, depending on the +<>) by the CCS coordinating node. +This strategy may be beneficial whenever there is very low network latency +between the CCS coordinating node and the remote clusters involved, as it +treats all shards the same, at the cost of sending many requests to each remote +cluster, which is problematic in the presence of network latency. + +The <> supports the `ccs_minimize_roundtrips` +parameter, which defaults to `true` and can be set to `false` when +minimizing network round-trips is not desirable. + +Note that all the communication between the nodes, regardless of which cluster +they belong to and of the selected reduce mode, happens through the +<>. diff --git a/docs/reference/search/request-body.asciidoc b/docs/reference/search/request-body.asciidoc index dac7622aab8ed..120c4c6757599 100644 --- a/docs/reference/search/request-body.asciidoc +++ b/docs/reference/search/request-body.asciidoc @@ -113,6 +113,11 @@ And here is a sample response: reduce the memory overhead per search request if the potential number of shards in the request can be large. +`ccs_minimize_roundtrips`:: + + Defaults to `true`. Set to `false` to disable minimizing network round-trips + between the coordinating node and the remote clusters when executing + cross-cluster search requests. See <> for more. Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results` diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java index a685c3ba5ba7c..c80f99484a947 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java @@ -65,7 +65,6 @@ public MultiSearchTemplateRequest add(SearchTemplateRequest request) { return this; } - /** * Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently. 
*/ diff --git a/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java b/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java index 2c67dd4709bc9..dadaf7cb05a09 100644 --- a/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java +++ b/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java @@ -50,7 +50,7 @@ protected MultiSearchTemplateResponse createTestInstance() { int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); + SearchResponse.Clusters clusters = randomClusters(); SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse(); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); @@ -59,7 +59,13 @@ protected MultiSearchTemplateResponse createTestInstance() { } return new MultiSearchTemplateResponse(items, overallTookInMillis); } - + + private static SearchResponse.Clusters randomClusters() { + int totalClusters = randomIntBetween(0, 10); + int successfulClusters = randomIntBetween(0, totalClusters); + int skippedClusters = totalClusters - successfulClusters; + return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); + } private static MultiSearchTemplateResponse createTestInstanceWithFailures() { int numItems = randomIntBetween(0, 128); @@ -67,14 +73,13 @@ private static MultiSearchTemplateResponse createTestInstanceWithFailures() { MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[numItems]; for (int i = 0; i < numItems; i++) { if (randomBoolean()) { - // Creating a minimal response is OK, because SearchResponse self - // is tested elsewhere. + // Creating a minimal response is OK, because SearchResponse is tested elsewhere. 
long tookInMillis = randomNonNegativeLong(); int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); + SearchResponse.Clusters clusters = randomClusters(); SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse(); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); @@ -133,6 +138,5 @@ public void testFromXContentWithFailures() throws IOException { AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, getRandomFieldsExcludeFilterWhenResultHasErrors(), this::createParser, this::doParseInstance, this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); - } - + } } diff --git a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java index efac9ac220573..e280b1d2d1a05 100644 --- a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java +++ b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java @@ -22,6 +22,7 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; @@ -32,9 +33,11 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -49,6 +52,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; @@ -107,6 +112,14 @@ private static MockTransportService startTransport( channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); }); + newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new, + (request, channel, task) -> { + InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new 
SearchHit[0], + new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); + channel.sendResponse(searchResponse); + }); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel, task) -> { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index 5acf84139bbf4..4499a60bfe24a 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -36,6 +36,10 @@ terms: field: f1.keyword + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} + - match: {num} - match: { _shards.total: 5 } - match: { hits.total: 11 } - gte: { hits.hits.0._seq_no: 0 } @@ -59,6 +63,9 @@ terms: field: f1.keyword + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 5 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -76,6 +83,9 @@ terms: field: f1.keyword + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -93,6 +103,7 @@ terms: field: f1.keyword + - is_false: _clusters - match: { _shards.total: 2 } - match: { hits.total: 5} - match: { hits.hits.0._index: "test_index"} @@ -122,6 +133,9 @@ rest_total_hits_as_int: true index: test_remote_cluster:test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6 } - match: { hits.hits.0._index: "test_remote_cluster:test_index" } @@ -148,6 +162,9 @@ rest_total_hits_as_int: true index: "*:test_index" + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 6 } - match: { hits.total: 12 } @@ -159,6 +176,9 @@ rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 2 } - match: { hits.hits.0._source.filter_field: 1 } @@ -172,6 +192,9 @@ rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1 + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 4 } - match: { hits.total: 2 } - match: { hits.hits.0._source.filter_field: 1 } @@ -185,6 +208,9 @@ rest_total_hits_as_int: true index: "my_remote_cluster:single_doc_index" + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 1 } - match: { hits.total: 1 } - match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"} diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml 
b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml index 81c09da54c085..d1a5a273e1d0f 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml @@ -29,10 +29,12 @@ rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1 + ccs_minimize_roundtrips: false body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } } - match: { hits.total: 1 } - match: { hits.hits.0._index: "skip_shards_index"} + - is_false: num_reduce_phases - match: { _shards.total: 2 } - match: { _shards.successful: 2 } - match: { _shards.skipped : 1} @@ -45,10 +47,12 @@ rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1 + ccs_minimize_roundtrips: false body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2015-02-01", "lt": "2016-02-01"} } } } - match: { hits.total: 1 } - match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"} + - is_false: num_reduce_phases - match: { _shards.total: 2 } - match: { _shards.successful: 2 } - match: { _shards.skipped : 1} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json index 13a6005c9a189..398dcbd29515d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json @@ -43,6 +43,11 @@ "type" : "boolean", "description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default" : false + }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json index e49cc2083f929..e89f96e06960f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json @@ -33,6 +33,11 @@ "type" : "boolean", "description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default" : false + }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index 9ac02b1214a2f..f44c0f74b2c3d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -24,6 +24,11 @@ "type" : "boolean", "description" : "Specify whether wildcard and prefix queries should be analyzed (default: false)" }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" + }, "default_operator": { "type" : "enum", "options" : ["AND","OR"], diff --git 
a/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json index 528ee905e7ee8..24b7fa135b331 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json @@ -67,6 +67,11 @@ "type" : "boolean", "description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default" : false + }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" } } }, diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index a574c0559804b..1dc2dc624b277 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -173,6 +173,7 @@ public static void readMultiLineFormat(BytesReference data, String[] types, String routing, String searchType, + Boolean ccsMinimizeRoundtrips, NamedXContentRegistry registry, boolean allowExplicitIndex) throws IOException { int from = 0; @@ -205,6 +206,9 @@ public static void readMultiLineFormat(BytesReference data, if (searchType != null) { searchRequest.searchType(searchType); } + if (ccsMinimizeRoundtrips != null) { + searchRequest.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips); + } IndicesOptions defaultOptions = searchRequest.indicesOptions(); // now parse the action if (nextMarker - from > 0) { @@ -226,6 +230,8 @@ public static void readMultiLineFormat(BytesReference data, searchRequest.types(nodeStringArrayValue(value)); } else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) { searchRequest.searchType(nodeStringValue(value, null)); + } else if ("ccs_minimize_roundtrips".equals(entry.getKey()) || "ccsMinimizeRoundtrips".equals(entry.getKey())) { + searchRequest.setCcsMinimizeRoundtrips(nodeBooleanValue(value)); } else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) { searchRequest.requestCache(nodeBooleanValue(value, entry.getKey())); } else if ("preference".equals(entry.getKey())) { @@ -327,6 +333,7 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.searchType() != null) { xContentBuilder.field("search_type", request.searchType().name().toLowerCase(Locale.ROOT)); } + xContentBuilder.field("ccs_minimize_roundtrips", request.isCcsMinimizeRoundtrips()); if (request.requestCache() != null) { xContentBuilder.field("request_cache", request.requestCache()); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 020887068f015..55122b6806fd2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -93,6 +93,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private String[] types = Strings.EMPTY_ARRAY; + private boolean ccsMinimizeRoundtrips = true; + public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); private IndicesOptions 
indicesOptions = DEFAULT_INDICES_OPTIONS; @@ -106,21 +108,7 @@ public SearchRequest() { * Constructs a new search request from the provided search request */ public SearchRequest(SearchRequest searchRequest) { - this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; - this.batchedReduceSize = searchRequest.batchedReduceSize; - this.indices = searchRequest.indices; - this.indicesOptions = searchRequest.indicesOptions; - this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests; - this.preference = searchRequest.preference; - this.preFilterShardSize = searchRequest.preFilterShardSize; - this.requestCache = searchRequest.requestCache; - this.routing = searchRequest.routing; - this.scroll = searchRequest.scroll; - this.searchType = searchRequest.searchType; - this.source = searchRequest.source; - this.types = searchRequest.types; - this.localClusterAlias = searchRequest.localClusterAlias; - this.absoluteStartMillis = searchRequest.absoluteStartMillis; + this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis); } /** @@ -144,16 +132,40 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { } /** - * Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in - * milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search - * request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in - * the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used. + * Creates a new search request by providing the search request to copy all fields from, the indices to search against, + * the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time. + * Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction + * on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the + * alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters + * to ensure that the same value is used. 
*/ - SearchRequest(String localClusterAlias, long absoluteStartMillis) { - this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); + static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices, + String localClusterAlias, long absoluteStartMillis) { + Objects.requireNonNull(originalSearchRequest, "search request must not be null"); + validateIndices(indices); + Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); if (absoluteStartMillis < 0) { throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]"); } + return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis); + } + + private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) { + this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; + this.batchedReduceSize = searchRequest.batchedReduceSize; + this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips; + this.indices = indices; + this.indicesOptions = searchRequest.indicesOptions; + this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests; + this.preference = searchRequest.preference; + this.preFilterShardSize = searchRequest.preFilterShardSize; + this.requestCache = searchRequest.requestCache; + this.routing = searchRequest.routing; + this.scroll = searchRequest.scroll; + this.searchType = searchRequest.searchType; + this.source = searchRequest.source; + this.types = searchRequest.types; + this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; } @@ -191,6 +203,9 @@ public SearchRequest(StreamInput in) throws IOException { localClusterAlias = null; absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ccsMinimizeRoundtrips = in.readBoolean(); + } } @Override @@ -217,33 +232,37 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(absoluteStartMillis); } } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeBoolean(ccsMinimizeRoundtrips); + } } @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - final Scroll scroll = scroll(); - if (source != null - && source.trackTotalHitsUpTo() != null - && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE - && scroll != null) { - validationException = - addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException); - } - if (source != null && source.from() > 0 && scroll != null) { - validationException = - addValidationError("using [from] is not allowed in a scroll context", validationException); - } - if (requestCache != null && requestCache && scroll != null) { - validationException = - addValidationError("[request_cache] cannot be used in a scroll context", validationException); - } - if (source != null && source.size() == 0 && scroll != null) { - validationException = addValidationError("[size] cannot be [0] in a scroll context", validationException); - } - if (source != null && source.rescores() != null && source.rescores().isEmpty() == false && scroll != null) { - validationException = - addValidationError("using [rescore] is not allowed in a scroll context", validationException); + boolean scroll = scroll() != null; + if (scroll) { + if (source != null) { + if 
(source.trackTotalHitsUpTo() != null && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE) { + validationException = + addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException); + } + if (source.from() > 0) { + validationException = + addValidationError("using [from] is not allowed in a scroll context", validationException); + } + if (source.size() == 0) { + validationException = addValidationError("[size] cannot be [0] in a scroll context", validationException); + } + if (source.rescores() != null && source.rescores().isEmpty() == false) { + validationException = + addValidationError("using [rescore] is not allowed in a scroll context", validationException); + } + } + if (requestCache != null && requestCache) { + validationException = + addValidationError("[request_cache] cannot be used in a scroll context", validationException); + } } return validationException; } @@ -261,8 +280,8 @@ String getLocalClusterAlias() { /** * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search - * request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise - * it will return {@link System#currentTimeMillis()}. + * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided + * current time, otherwise it will return {@link System#currentTimeMillis()}. * */ long getOrCreateAbsoluteStartMillis() { @@ -274,12 +293,16 @@ long getOrCreateAbsoluteStartMillis() { */ @Override public SearchRequest indices(String... indices) { + validateIndices(indices); + this.indices = indices; + return this; + } + + private static void validateIndices(String... indices) { Objects.requireNonNull(indices, "indices must not be null"); for (String index : indices) { Objects.requireNonNull(index, "index must not be null"); } - this.indices = indices; - return this; } @Override @@ -292,6 +315,21 @@ public SearchRequest indicesOptions(IndicesOptions indicesOptions) { return this; } + /** + * Returns whether network round-trips should be minimized when executing cross-cluster search requests. + * Defaults to true. + */ + public boolean isCcsMinimizeRoundtrips() { + return ccsMinimizeRoundtrips; + } + + /** + * Sets whether network round-trips should be minimized when executing cross-cluster search requests. Defaults to true. + */ + public void setCcsMinimizeRoundtrips(boolean ccsMinimizeRoundtrips) { + this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips; + } + /** * The document types to execute the search against. Defaults to be executed against * all types. 
@@ -583,14 +621,15 @@ public boolean equals(Object o) { Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && Objects.equals(localClusterAlias, that.localClusterAlias) && - absoluteStartMillis == that.absoluteStartMillis; + absoluteStartMillis == that.absoluteStartMillis && + ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips; } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips); } @Override @@ -610,6 +649,7 @@ public String toString() { ", allowPartialSearchResults=" + allowPartialSearchResults + ", localClusterAlias=" + localClusterAlias + ", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis + + ", ccsMinimizeRoundtrips=" + ccsMinimizeRoundtrips + ", source=" + source + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 0273d5e58219a..dd0d4de07d6f4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -111,7 +111,6 @@ public Aggregations getAggregations() { return internalResponse.aggregations(); } - public Suggest getSuggest() { return internalResponse.suggest(); } @@ -349,7 +348,7 @@ static SearchResponse innerFromXContent(XContentParser parser) throws IOExceptio SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly, profile, numReducePhases); return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, - failures.toArray(new ShardSearchFailure[failures.size()]), clusters); + failures.toArray(ShardSearchFailure.EMPTY_ARRAY), clusters); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index b146d42c0d2e6..567040246c50f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.transport.RemoteClusterAware; import java.util.ArrayList; import java.util.Arrays; @@ -76,9 +77,9 @@ //from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote //cluster response would have the fetch results. 
final class SearchResponseMerger { - private final int from; - private final int size; - private final int trackTotalHitsUpTo; + final int from; + final int size; + final int trackTotalHitsUpTo; private final SearchTimeProvider searchTimeProvider; private final Function reduceContextFunction; private final List searchResponses = new CopyOnWriteArrayList<>(); @@ -98,15 +99,28 @@ final class SearchResponseMerger { * That may change in the future as it's possible to introduce incremental merges as responses come in if necessary. */ void add(SearchResponse searchResponse) { + assert searchResponse.getScrollId() == null : "merging scroll results is not supported"; searchResponses.add(searchResponse); } + int numResponses() { + return searchResponses.size(); + } + /** * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} * so that all responses are merged into a single one. */ SearchResponse getMergedResponse(Clusters clusters) { - assert searchResponses.size() > 1; + //if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true, + //we end up calling merge without anything to merge, we just return an empty search response + if (searchResponses.size() == 0) { + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, + InternalAggregations.EMPTY, null, null, false, null, 0); + return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(), + ShardSearchFailure.EMPTY_ARRAY, clusters); + } int totalShards = 0; int skippedShards = 0; int successfulShards = 0; @@ -115,7 +129,7 @@ SearchResponse getMergedResponse(Clusters clusters) { List failures = new ArrayList<>(); Map profileResults = new HashMap<>(); List aggs = new ArrayList<>(); - Map shards = new TreeMap<>(); + Map shards = new TreeMap<>(); List topDocsList = new ArrayList<>(searchResponses.size()); Map> groupedSuggestions = new HashMap<>(); Boolean trackTotalHits = null; @@ -171,10 +185,11 @@ SearchResponse getMergedResponse(Clusters clusters) { Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); + SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? 
null : new SearchProfileShardResults(profileResults); //make failures ordering consistent with ordinary search and CCS Arrays.sort(shardFailures, FAILURES_COMPARATOR); - InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, - new SearchProfileShardResults(profileResults), topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases); + InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, profileShardResults, + topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases); long tookInMillis = searchTimeProvider.buildTookInMillis(); return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters); } @@ -210,7 +225,7 @@ private ShardId extractShardId(ShardSearchFailure failure) { } }; - private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map shards) { + private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map shards) { SearchHit[] hits = searchHits.getHits(); ScoreDoc[] scoreDocs = new ScoreDoc[hits.length]; final TopDocs topDocs; @@ -228,7 +243,8 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota for (int i = 0; i < hits.length; i++) { SearchHit hit = hits[i]; - ShardId shardId = hit.getShard().getShardId(); + SearchShardTarget shard = hit.getShard(); + ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias()); shards.putIfAbsent(shardId, null); final SortField[] sortFields = searchHits.getSortFields(); final Object[] sortValues; @@ -246,18 +262,21 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota return topDocs; } - private static void setShardIndex(Map shards, List topDocsList) { - int shardIndex = 0; - for (Map.Entry shard : shards.entrySet()) { - shard.setValue(shardIndex++); + private static void setShardIndex(Map shards, List topDocsList) { + { + //assign a different shardIndex to each shard, based on their shardId natural ordering and their cluster alias + int shardIndex = 0; + for (Map.Entry shard : shards.entrySet()) { + shard.setValue(shardIndex++); + } } - //and go through all the scoreDocs from each cluster and set their corresponding shardIndex + //go through all the scoreDocs from each cluster and set their corresponding shardIndex for (TopDocs topDocs : topDocsList) { for (ScoreDoc scoreDoc : topDocs.scoreDocs) { FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; - //When hits come from the indices with same names on multiple clusters and same shard identifier, we rely on such indices - //to have a different uuid across multiple clusters. That's how they will get a different shardIndex. - ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); + SearchShardTarget shard = fieldDocAndSearchHit.searchHit.getShard(); + ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias()); + assert shards.containsKey(shardId); fieldDocAndSearchHit.shardIndex = shards.get(shardId); } } @@ -294,4 +313,58 @@ private static final class FieldDocAndSearchHit extends FieldDoc { this.searchHit = searchHit; } } + + /** + * This class is used instead of plain {@link ShardId} to support the scenario where the same remote cluster is registered twice using + * different aliases. 
In that case searching across the same cluster twice would make an assertion in lucene fail + * (see TopDocs#tieBreakLessThan line 86). Generally, indices with same names on different clusters have different index uuids which + * make their ShardIds different, which is not the case if the index is really the same one from the same cluster, in which case we + * need to look at the cluster alias and make sure to assign a different shardIndex based on that. + */ + private static final class ShardIdAndClusterAlias implements Comparable { + private final ShardId shardId; + private final String clusterAlias; + + ShardIdAndClusterAlias(ShardId shardId, String clusterAlias) { + this.shardId = shardId; + this.clusterAlias = clusterAlias; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ShardIdAndClusterAlias that = (ShardIdAndClusterAlias) o; + return shardId.equals(that.shardId) && + clusterAlias.equals(that.clusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(shardId, clusterAlias); + } + + @Override + public int compareTo(ShardIdAndClusterAlias o) { + int shardIdCompareTo = shardId.compareTo(o.shardId); + if (shardIdCompareTo != 0) { + return shardIdCompareTo; + } + int clusterAliasCompareTo = clusterAlias.compareTo(o.clusterAlias); + if (clusterAliasCompareTo != 0) { + //TODO we may want to fix this, CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators) + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return 1; + } + if (o.clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return -1; + } + } + return clusterAliasCompareTo; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 30e030eca7376..48ae3f1249522 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -47,8 +47,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -69,6 +71,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -190,8 +193,8 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime); ActionListener rewriteListener = ActionListener.wrap(source -> { if (source != searchRequest.source()) { - // only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch - // situations when it possible due to a bug changes to null + // only set it if it changed - we 
don't allow null values to be set but it might be already null. this way we catch + // situations when source is rewritten to null due to a bug searchRequest.source(source); } final ClusterState clusterState = clusterService.state(); @@ -199,26 +202,31 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState)); OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (remoteClusterIndices.isEmpty()) { - executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), - (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); + executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener); } else { - AtomicInteger skippedClusters = new AtomicInteger(0); - collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters, - remoteClusterIndices, remoteClusterService, threadPool, - ActionListener.wrap( - searchShardsResponses -> { - List remoteShardIterators = new ArrayList<>(); - Map remoteAliasFilters = new HashMap<>(); - BiFunction clusterNodeLookup = processRemoteShards( - searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - int localClusters = localIndices == null ? 0 : 1; - int totalClusters = remoteClusterIndices.size() + localClusters; - int successfulClusters = searchShardsResponses.size() + localClusters; - executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, - new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); - }, - listener::onFailure)); + if (shouldMinimizeRoundtrips(searchRequest)) { + ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext, + remoteClusterService, threadPool, listener, + (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l)); + } else { + AtomicInteger skippedClusters = new AtomicInteger(0); + collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), + skippedClusters, remoteClusterIndices, remoteClusterService, threadPool, + ActionListener.wrap( + searchShardsResponses -> { + List remoteShardIterators = new ArrayList<>(); + Map remoteAliasFilters = new HashMap<>(); + BiFunction clusterNodeLookup = processRemoteShards( + searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + int localClusters = localIndices == null ? 
0 : 1; + int totalClusters = remoteClusterIndices.size() + localClusters; + int successfulClusters = searchShardsResponses.size() + localClusters; + executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, + new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); + }, + listener::onFailure)); + } } }, listener::onFailure); if (searchRequest.source() == null) { @@ -229,12 +237,79 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< } } + static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { + if (searchRequest.isCcsMinimizeRoundtrips() == false) { + return false; + } + if (searchRequest.scroll() != null) { + return false; + } + SearchSourceBuilder source = searchRequest.source(); + return source == null || source.collapse() == null || source.collapse().getInnerHits() == null || + source.collapse().getInnerHits().isEmpty(); + } + + static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIndices, Map remoteIndices, + SearchTimeProvider timeProvider, Function reduceContext, + RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, + BiConsumer> localSearchConsumer) { + SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext); + AtomicInteger skippedClusters = new AtomicInteger(0); + final AtomicReference exceptions = new AtomicReference<>(); + int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1); + final CountDown countDown = new CountDown(totalClusters); + for (Map.Entry entry : remoteIndices.entrySet()) { + String clusterAlias = entry.getKey(); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + OriginalIndices indices = entry.getValue(); + SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(), + clusterAlias, timeProvider.getAbsoluteStartMillis()); + ActionListener ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown, + skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + remoteClusterClient.search(ccsSearchRequest, ccsListener); + } + if (localIndices != null) { + ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, + false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + //here we provide the empty string a cluster alias, which means no prefix in index name, + //but the coord node will perform non final reduce as it's not null. + SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(), + RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis()); + localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); + } + } + + static SearchResponseMerger createSearchResponseMerger(SearchSourceBuilder source, SearchTimeProvider timeProvider, + Function reduceContextFunction) { + final int from; + final int size; + final int trackTotalHitsUpTo; + if (source == null) { + from = SearchService.DEFAULT_FROM; + size = SearchService.DEFAULT_SIZE; + trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO; + } else { + from = source.from() == -1 ? 
SearchService.DEFAULT_FROM : source.from(); + size = source.size() == -1 ? SearchService.DEFAULT_SIZE : source.size(); + trackTotalHitsUpTo = source.trackTotalHitsUpTo() == null + ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo(); + //here we modify the original source so we can re-use it by setting it to each outgoing search request + source.from(0); + source.size(from + size); + //TODO when searching only against a remote cluster, we could ask directly for the final number of results and let + //the remote cluster do a final reduction, yet that is not possible as we are providing a localClusterAlias which + //will automatically make the reduction non final + } + return new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, reduceContextFunction); + } + static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters, Map remoteIndicesByCluster, RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener> listener) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); final Map searchShardsResponses = new ConcurrentHashMap<>(); - final AtomicReference transportException = new AtomicReference<>(); + final AtomicReference exceptions = new AtomicReference<>(); for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { final String clusterAlias = entry.getKey(); boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); @@ -242,49 +317,53 @@ static void collectSearchShards(IndicesOptions indicesOptions, String preference final String[] indices = entry.getValue().indices(); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); - clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener() { + clusterClient.admin().cluster().searchShards(searchShardsRequest, + new CCSActionListener>( + clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) { @Override - public void onResponse(ClusterSearchShardsResponse response) { - searchShardsResponses.put(clusterAlias, response); - maybeFinish(); + void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { + searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse); } @Override - public void onFailure(Exception e) { - if (skipUnavailable) { - skippedClusters.incrementAndGet(); - } else { - RemoteTransportException exception = - new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); - if (transportException.compareAndSet(null, exception) == false) { - transportException.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - } - maybeFinish(); - } - - private void maybeFinish() { - if (responsesCountDown.countDown()) { - RemoteTransportException exception = transportException.get(); - if (exception == null) { - listener.onResponse(searchShardsResponses); - } else { - listener.onFailure(transportException.get()); - } - } + Map createFinalResponse() { + return searchShardsResponses; } } ); } } + private static ActionListener createCCSListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, + AtomicInteger skippedClusters, AtomicReference exceptions, + SearchResponseMerger searchResponseMerger, int totalClusters, + ActionListener originalListener) 
{ + return new CCSActionListener(clusterAlias, skipUnavailable, countDown, skippedClusters, + exceptions, originalListener) { + @Override + void innerOnResponse(SearchResponse searchResponse) { + searchResponseMerger.add(searchResponse); + } + + @Override + SearchResponse createFinalResponse() { + SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalClusters, searchResponseMerger.numResponses(), + skippedClusters.get()); + return searchResponseMerger.getMergedResponse(clusters); + } + }; + } + + private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, + ClusterState clusterState, ActionListener listener) { + executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), + (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); + } + static BiFunction processRemoteShards(Map searchShardsResponses, - Map remoteIndicesByCluster, - List remoteShardIterators, - Map aliasFilterMap) { + Map remoteIndicesByCluster, + List remoteShardIterators, + Map aliasFilterMap) { Map> clusterToNode = new HashMap<>(); for (Map.Entry entry : searchShardsResponses.entrySet()) { String clusterAlias = entry.getKey(); @@ -491,4 +570,70 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int + "] to a greater value if you really want to query that many shards at the same time."); } } + + abstract static class CCSActionListener implements ActionListener { + private final String clusterAlias; + private final boolean skipUnavailable; + private final CountDown countDown; + private final AtomicInteger skippedClusters; + private final AtomicReference exceptions; + private final ActionListener originalListener; + + CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, + AtomicReference exceptions, ActionListener originalListener) { + this.clusterAlias = clusterAlias; + this.skipUnavailable = skipUnavailable; + this.countDown = countDown; + this.skippedClusters = skippedClusters; + this.exceptions = exceptions; + this.originalListener = originalListener; + } + + @Override + public final void onResponse(Response response) { + innerOnResponse(response); + maybeFinish(); + } + + abstract void innerOnResponse(Response response); + + @Override + public final void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + Exception exception = e; + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { + exception = new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + } + if (exceptions.compareAndSet(null, exception) == false) { + exceptions.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } + maybeFinish(); + } + + private void maybeFinish() { + if (countDown.countDown()) { + Exception exception = exceptions.get(); + if (exception == null) { + FinalResponse response; + try { + response = createFinalResponse(); + } catch(Exception e) { + originalListener.onFailure(e); + return; + } + originalListener.onResponse(response); + } else { + originalListener.onFailure(exceptions.get()); + } + } + } + + abstract FinalResponse createFinalResponse(); + } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java 
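The CCSActionListener introduced above is the piece that lets one search request fan out to several clusters and still complete the caller's listener exactly once. Below is a minimal, self-contained sketch of that pattern, not the class in the patch: the name PerClusterCollector is invented for illustration, a plain AtomicInteger stands in for Elasticsearch's CountDown helper, and the per-cluster response type is simplified to a generic parameter.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.BiConsumer;

    // Illustrative only: collects one result per cluster, counts skipped clusters,
    // accumulates failures, and fires the completion callback exactly once.
    final class PerClusterCollector<R> {
        private final AtomicInteger remaining;
        private final AtomicInteger skipped = new AtomicInteger();
        private final AtomicReference<Exception> failure = new AtomicReference<>();
        private final Map<String, R> responses = new ConcurrentHashMap<>();
        private final BiConsumer<Map<String, R>, Exception> onDone;

        PerClusterCollector(int clusters, BiConsumer<Map<String, R>, Exception> onDone) {
            this.remaining = new AtomicInteger(clusters);
            this.onDone = onDone;
        }

        void onClusterResponse(String clusterAlias, R response) {
            responses.put(clusterAlias, response);
            maybeFinish();
        }

        void onClusterFailure(String clusterAlias, boolean skipUnavailable, Exception e) {
            if (skipUnavailable) {
                skipped.incrementAndGet();      // skipped clusters do not fail the overall request
            } else if (failure.compareAndSet(null, e) == false) {
                // keep the latest failure and attach earlier ones as suppressed, as in the patch
                failure.accumulateAndGet(e, (previous, current) -> {
                    current.addSuppressed(previous);
                    return current;
                });
            }
            maybeFinish();
        }

        private void maybeFinish() {
            if (remaining.decrementAndGet() == 0) {   // only the last callback reaches zero
                onDone.accept(responses, failure.get());
            }
        }

        int skippedClusters() {
            return skipped.get();
        }
    }

The real listener above additionally wraps remote failures in RemoteTransportException (skipping the local cluster alias) and defers building the final response to createFinalResponse(), so that a merge failure is routed to the original listener's onFailure instead of escaping the callback.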
b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index dde71ad68e17f..2de583b460f84 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -1009,7 +1009,7 @@ public List readNamedWriteableList(Class catego } /** - * Reads an enum with type E that was serialized based on the value of it's ordinal + * Reads an enum with type E that was serialized based on the value of its ordinal */ public > E readEnum(Class enumClass) throws IOException { int ordinal = readVInt(); diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 3031e2f2e7164..175f800a7d8cf 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -1094,7 +1094,7 @@ public void writeNamedWriteableList(List list) throws } /** - * Writes an enum with type E that by serialized it based on it's ordinal value + * Writes an enum with type E based on its ordinal value */ public > void writeEnum(E enumValue) throws IOException { writeVInt(enumValue.ordinal()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index 49bebe053a3f9..05a20a0cc06b9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -147,13 +147,14 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] types = Strings.splitStringByCommaToArray(request.param("type")); String searchType = request.param("search_type"); + boolean ccsMinimizeRoundtrips = request.paramAsBoolean("ccs_minimize_roundtrips", true); String routing = request.param("routing"); final Tuple sourceTuple = request.contentOrSourceParam(); final XContent xContent = sourceTuple.v1().xContent(); final BytesReference data = sourceTuple.v2(); MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, types, routing, - searchType, request.getXContentRegistry(), allowExplicitIndex); + searchType, ccsMinimizeRoundtrips, request.getXContentRegistry(), allowExplicitIndex); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 78082dd364173..00c08a124f1e4 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -173,6 +173,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); + searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true)); checkRestTotalHits(request, searchRequest); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java 
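Both REST endpoints above parse the new ccs_minimize_roundtrips parameter with a default of true, and the same flag is exposed on SearchRequest. A short, hypothetical usage sketch of opting out on the Java request object (the REST equivalent appears in the comment; the class name is illustrative):

    import org.elasticsearch.action.search.SearchRequest;

    public class CcsMinimizeRoundtripsOptOut {
        public static void main(String[] args) {
            // The flag defaults to true for any cross-cluster search request.
            SearchRequest request = new SearchRequest("remote0:index", "index");
            request.setCcsMinimizeRoundtrips(false); // fall back to the per-shard fan-out execution
            System.out.println(request.isCcsMinimizeRoundtrips()); // prints false
            // REST equivalent of the opt-out:
            //   GET /remote0:index,index/_search?ccs_minimize_roundtrips=false
        }
    }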
index ef255c8af7ad1..a14b4a328775c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -148,6 +148,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final Setting MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope); + public static final int DEFAULT_SIZE = 10; + public static final int DEFAULT_FROM = 0; private final ThreadPool threadPool; @@ -606,10 +608,10 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException // if the from and size are still not set, default them if (context.from() == -1) { - context.from(0); + context.from(DEFAULT_FROM); } if (context.size() == -1) { - context.size(10); + context.size(DEFAULT_SIZE); } // pre process diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index bd12f46564bac..da22ce4c96c1a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -180,7 +180,7 @@ public void testSimpleAdd4() throws Exception { assertThat(request.requests().get(2).routing(), equalTo("123")); } - public void testResponseErrorToXContent() throws IOException { + public void testResponseErrorToXContent() { long tookInMillis = randomIntBetween(1, 1000); MultiSearchResponse response = new MultiSearchResponse( new MultiSearchResponse.Item[] { @@ -262,12 +262,12 @@ public void testMultiLineSerialization() throws IOException { parsedRequest.add(r); }; MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(), - consumer, null, null, null, null, null, xContentRegistry(), true); + consumer, null, null, null, null, null, null, xContentRegistry(), true); assertEquals(originalRequest, parsedRequest); } } - public void testEqualsAndHashcode() throws IOException { + public void testEqualsAndHashcode() { checkEqualsAndHashCode(createMultiSearchRequest(), MultiSearchRequestTests::copyRequest, MultiSearchRequestTests::mutate); } @@ -282,7 +282,7 @@ private static MultiSearchRequest mutate(MultiSearchRequest searchRequest) throw return mutation; } - private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws IOException { + private static MultiSearchRequest copyRequest(MultiSearchRequest request) { MultiSearchRequest copy = new MultiSearchRequest(); if (request.maxConcurrentSearchRequests() > 0) { copy.maxConcurrentSearchRequests(request.maxConcurrentSearchRequests()); @@ -294,7 +294,7 @@ private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws return copy; } - private static MultiSearchRequest createMultiSearchRequest() throws IOException { + private static MultiSearchRequest createMultiSearchRequest() { int numSearchRequest = randomIntBetween(1, 128); MultiSearchRequest request = new MultiSearchRequest(); for (int j = 0; j < numSearchRequest; j++) { @@ -321,7 +321,7 @@ private static MultiSearchRequest createMultiSearchRequest() throws IOException return request; } - private static SearchRequest createSimpleSearchRequest() throws IOException { + private static SearchRequest createSimpleSearchRequest() { return randomSearchRequest(() -> { // No need to return a very complex SearchSourceBuilder here, that is tested 
elsewhere SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java index 4bd4406d81cca..d91a4eaf02288 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java @@ -46,8 +46,8 @@ protected MultiSearchResponse createTestInstance() { int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); items[i] = new MultiSearchResponse.Item(searchResponse, null); @@ -60,14 +60,13 @@ private static MultiSearchResponse createTestInstanceWithFailures() { MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numItems]; for (int i = 0; i < numItems; i++) { if (randomBoolean()) { - // Creating a minimal response is OK, because SearchResponse self - // is tested elsewhere. + // Creating a minimal response is OK, because SearchResponse is tested elsewhere. long tookInMillis = randomNonNegativeLong(); int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); items[i] = new MultiSearchResponse.Item(searchResponse, null); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 91f6c0c09cd20..1d2d59c60e2ae 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -22,12 +22,12 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.AbstractSearchTestCase; -import org.elasticsearch.search.RandomSearchRequestGenerator; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.rescore.QueryRescorerBuilder; @@ -48,19 +48,23 @@ public class SearchRequestTests extends AbstractSearchTestCase { @Override protected 
SearchRequest createSearchRequest() throws IOException { + SearchRequest request = super.createSearchRequest(); if (randomBoolean()) { - return super.createSearchRequest(); + return request; } //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically. - SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); - RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder); - return searchRequest; + return SearchRequest.withLocalReduction(request, request.indices(), + randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); } - public void testClusterAliasValidation() { - expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0)); - expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1)); - SearchRequest searchRequest = new SearchRequest("", 0); + public void testWithLocalReduction() { + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0)); + SearchRequest request = new SearchRequest(); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0)); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, new String[]{null}, "", 0)); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, null, 0)); + expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", -1)); + SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0); assertNull(searchRequest.validate()); } @@ -72,10 +76,15 @@ public void testSerialization() throws Exception { assertNotSame(deserializedRequest, searchRequest); } - public void testClusterAliasSerialization() throws IOException { + public void testRandomVersionSerialization() throws IOException { SearchRequest searchRequest = createSearchRequest(); Version version = VersionUtils.randomVersion(random()); SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version); + if (version.before(Version.V_7_0_0)) { + assertTrue(deserializedRequest.isCcsMinimizeRoundtrips()); + } else { + assertEquals(searchRequest.isCcsMinimizeRoundtrips(), deserializedRequest.isCcsMinimizeRoundtrips()); + } if (version.before(Version.V_6_7_0)) { assertNull(deserializedRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(deserializedRequest); @@ -93,6 +102,7 @@ public void testReadFromPre6_7_0() throws IOException { assertArrayEquals(new String[]{"index"}, searchRequest.indices()); assertNull(searchRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(searchRequest); + assertTrue(searchRequest.isCcsMinimizeRoundtrips()); } } @@ -215,6 +225,7 @@ private SearchRequest mutate(SearchRequest searchRequest) { mutators.add(() -> mutation.searchType(randomValueOtherThan(searchRequest.searchType(), () -> randomFrom(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH)))); mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder))); + mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false)); randomFrom(mutators).run(); return mutation; } diff --git 
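testWithLocalReduction above pins down the contract of the new SearchRequest.withLocalReduction factory: the original request, the indices array and its entries, and the cluster alias must be non-null, and the absolute start time must be non-negative. The following is a hedged sketch of how a CCS coordinating node might build such a per-cluster request; the alias "remote0" and the wrapper class are illustrative only.

    import org.elasticsearch.action.search.SearchRequest;

    public class WithLocalReductionSketch {
        public static void main(String[] args) {
            SearchRequest original = new SearchRequest("index");
            long absoluteStartMillis = System.currentTimeMillis();
            // Copies the request for one remote cluster: the alias is attached to the hits that
            // cluster returns and, per the TODO in TransportSearchAction, providing it makes the
            // cluster-level reduction non-final.
            SearchRequest perCluster = SearchRequest.withLocalReduction(
                    original, original.indices(), "remote0", absoluteStartMillis);
            System.out.println(perCluster.validate() == null); // prints true for valid arguments
        }
    }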
a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index d02b712eaaef3..712d6a60440fe 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -73,7 +73,7 @@ public class SearchResponseMergerTests extends ESTestCase { @Before public void init() { - numResponses = randomIntBetween(2, 10); + numResponses = randomIntBetween(1, 10); executorService = Executors.newFixedThreadPool(numResponses); } @@ -87,7 +87,7 @@ private void addResponse(SearchResponseMerger searchResponseMerger, SearchRespon private void awaitResponsesAdded() throws InterruptedException { executorService.shutdown(); - executorService.awaitTermination(5, TimeUnit.SECONDS); + assertTrue(executorService.awaitTermination(5, TimeUnit.SECONDS)); } public void testMergeTookInMillis() throws InterruptedException { @@ -137,6 +137,7 @@ public void testMergeShardFailures() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = merger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -170,6 +171,7 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } @@ -189,6 +191,7 @@ public void testMergeProfileResults() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = merger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -221,6 +224,7 @@ public void testMergeSuggestions() throws InterruptedException { addResponse(searchResponseMerger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -267,6 +271,7 @@ public void testMergeAggs() throws InterruptedException { addResponse(searchResponseMerger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -334,7 +339,7 @@ public void testMergeSearchHits() throws InterruptedException { Iterator> indicesIterator = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); for (int i = 0; i < numResponses; i++) { Map.Entry entry = indicesIterator.next(); - String clusterAlias = entry.getKey().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? 
null : entry.getKey(); + String clusterAlias = entry.getKey(); Index[] indices = entry.getValue(); int total = randomIntBetween(1, 1000); expectedTotal += total; @@ -386,7 +391,7 @@ public void testMergeSearchHits() throws InterruptedException { } awaitResponsesAdded(); - + assertEquals(numResponses, searchResponseMerger.numResponses()); final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters); @@ -434,6 +439,33 @@ public void testMergeSearchHits() throws InterruptedException { } } + public void testMergeNoResponsesAdded() { + long currentRelativeTime = randomLong(); + final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, flag -> null); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + assertEquals(0, merger.numResponses()); + SearchResponse response = merger.getMergedResponse(clusters); + assertSame(clusters, response.getClusters()); + assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), response.getTook().millis()); + assertEquals(0, response.getTotalShards()); + assertEquals(0, response.getSuccessfulShards()); + assertEquals(0, response.getSkippedShards()); + assertEquals(0, response.getFailedShards()); + assertEquals(0, response.getNumReducePhases()); + assertFalse(response.isTimedOut()); + assertNotNull(response.getHits().getTotalHits()); + assertEquals(0, response.getHits().getTotalHits().value); + assertEquals(0, response.getHits().getHits().length); + assertEquals(TotalHits.Relation.EQUAL_TO, response.getHits().getTotalHits().relation); + assertNull(response.getScrollId()); + assertSame(InternalAggregations.EMPTY, response.getAggregations()); + assertNull(response.getSuggest()); + assertEquals(0, response.getProfileResults().size()); + assertNull(response.isTerminatedEarly()); + assertEquals(0, response.getShardFailures().length); + } + private static Tuple randomTrackTotalHits() { switch(randomIntBetween(0, 2)) { case 0: @@ -499,8 +531,11 @@ private static Map randomRealisticIndices(int numIndices, int n for (int i = 0; i < numClusters; i++) { Index[] indices = new Index[indicesNames.length]; for (int j = 0; j < indices.length; j++) { - //Realistically clusters have the same indices with same names, but different uuid - indices[j] = new Index(indicesNames[j], randomAlphaOfLength(10)); + String indexName = indicesNames[j]; + //Realistically clusters have the same indices with same names, but different uuid. Yet it can happen that the same cluster + //is registered twice with different aliases and searched multiple times as part of the same search request. + String indexUuid = frequently() ? 
randomAlphaOfLength(10) : indexName; + indices[j] = new Index(indexName, indexUuid); } String clusterAlias; if (frequently() || indicesPerCluster.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { @@ -551,10 +586,22 @@ public int compare(SearchHit a, SearchHit b) { } } } - int shardIdCompareTo = a.getShard().getShardId().compareTo(b.getShard().getShardId()); + SearchShardTarget aShard = a.getShard(); + SearchShardTarget bShard = b.getShard(); + int shardIdCompareTo = aShard.getShardId().compareTo(bShard.getShardId()); if (shardIdCompareTo != 0) { return shardIdCompareTo; } + int clusterAliasCompareTo = aShard.getClusterAlias().compareTo(bShard.getClusterAlias()); + if (clusterAliasCompareTo != 0) { + if (aShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return 1; + } + if (bShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return -1; + } + return clusterAliasCompareTo; + } return Integer.compare(a.docId(), b.docId()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java index f07be38765f66..18890e1339557 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java @@ -20,13 +20,11 @@ package org.elasticsearch.action.search; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -246,7 +244,8 @@ public void testToXContent() { new InternalSearchResponse( new SearchHits(hits, new TotalHits(100, TotalHits.Relation.EQUAL_TO), 1.5f), null, null, null, false, null, 1 ), - null, 0, 0, 0, 0, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(5, 3, 2)); + null, 0, 0, 0, 0, ShardSearchFailure.EMPTY_ARRAY, + new SearchResponse.Clusters(5, 3, 2)); StringBuilder expectedString = new StringBuilder(); expectedString.append("{"); { @@ -279,24 +278,18 @@ public void testToXContent() { public void testSerialization() throws IOException { SearchResponse searchResponse = createTestItem(false); - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - searchResponse.writeTo(bytesStreamOutput); - try (StreamInput in = new NamedWriteableAwareStreamInput( - StreamInput.wrap(bytesStreamOutput.bytes().toBytesRef().bytes), namedWriteableRegistry)) { - SearchResponse serialized = new SearchResponse(); - serialized.readFrom(in); - if (searchResponse.getHits().getTotalHits() == null) { - assertNull(serialized.getHits().getTotalHits()); - } else { - assertEquals(searchResponse.getHits().getTotalHits().value, serialized.getHits().getTotalHits().value); - assertEquals(searchResponse.getHits().getTotalHits().relation, serialized.getHits().getTotalHits().relation); - } - assertEquals(searchResponse.getHits().getHits().length, serialized.getHits().getHits().length); - assertEquals(searchResponse.getNumReducePhases(), 
serialized.getNumReducePhases()); - assertEquals(searchResponse.getFailedShards(), serialized.getFailedShards()); - assertEquals(searchResponse.getTotalShards(), serialized.getTotalShards()); - assertEquals(searchResponse.getSkippedShards(), serialized.getSkippedShards()); - assertEquals(searchResponse.getClusters(), serialized.getClusters()); + SearchResponse deserialized = copyStreamable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT); + if (searchResponse.getHits().getTotalHits() == null) { + assertNull(deserialized.getHits().getTotalHits()); + } else { + assertEquals(searchResponse.getHits().getTotalHits().value, deserialized.getHits().getTotalHits().value); + assertEquals(searchResponse.getHits().getTotalHits().relation, deserialized.getHits().getTotalHits().relation); } + assertEquals(searchResponse.getHits().getHits().length, deserialized.getHits().getHits().length); + assertEquals(searchResponse.getNumReducePhases(), deserialized.getNumReducePhases()); + assertEquals(searchResponse.getFailedShards(), deserialized.getFailedShards()); + assertEquals(searchResponse.getTotalShards(), deserialized.getTotalShards()); + assertEquals(searchResponse.getSkippedShards(), deserialized.getSkippedShards()); + assertEquals(searchResponse.getClusters(), deserialized.getClusters()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index 19bd76ec09da2..8fd75c5fd673d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -41,7 +42,7 @@ public void testLocalClusterAlias() { assertEquals(RestStatus.CREATED, indexResponse.status()); { - SearchRequest searchRequest = new SearchRequest("local", nowInMillis); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "local", nowInMillis); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -52,7 +53,7 @@ public void testLocalClusterAlias() { assertEquals("1", hit.getId()); } { - SearchRequest searchRequest = new SearchRequest("", nowInMillis); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", nowInMillis); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -93,19 +94,19 @@ public void testAbsoluteStartMillis() { assertEquals(0, searchResponse.getTotalShards()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(2, 
searchResponse.getHits().getTotalHits().value); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); searchRequest.indices(""); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date"); rangeQuery.gte("1970-01-01"); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 1b99beee65e81..8a5859e200eac 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.search; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -34,13 +36,24 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; @@ -320,169 +333,495 @@ public void close() { } } - private MockTransportService startTransport(String id, List knownNodes) { - return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); - } - - public void testCollectSearchShards() throws Exception { - int numClusters = randomIntBetween(2, 10); + private MockTransportService[] startTransport(int numClusters, DiscoveryNode[] nodes, Map remoteIndices, + Settings.Builder settingsBuilder) { MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; - DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; - Map remoteIndicesByCluster = new HashMap<>(); - Settings.Builder builder = Settings.builder(); for (int i = 0; i < 
numClusters; i++) { List knownNodes = new CopyOnWriteArrayList<>(); - MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes); + MockTransportService remoteSeedTransport = RemoteClusterConnectionTests.startTransport("node_remote" + i, knownNodes, + Version.CURRENT, threadPool); mockTransportServices[i] = remoteSeedTransport; DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); knownNodes.add(remoteSeedNode); nodes[i] = remoteSeedNode; - builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); - remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); + settingsBuilder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); + remoteIndices.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); } + return mockTransportServices; + } + + private static SearchResponse emptySearchResponse() { + InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0], + new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1); + return new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + } + + public void testCCSRemoteReduceMergeFails() throws Exception { + int numClusters = randomIntBetween(2, 10); + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder); Settings settings = builder.build(); + boolean local = randomBoolean(); + OriginalIndices localIndices = local ? 
new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null; + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + Function reduceContext = finalReduce -> null; + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.preference("null_target"); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference failure = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); + } + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(failure.get()); + //the intention here is not to test that we throw NPE, rather to trigger a situation that makes + //SearchResponseMerger#getMergedResponse fail unexpectedly and verify that the listener is properly notified with the NPE + assertThat(failure.get(), instanceOf(NullPointerException.class)); + assertEquals(0, service.getConnectionManager().size()); + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } + } + } - try { - try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicInteger skippedClusters = new AtomicInteger(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertEquals(1, shardsResponse.getNodes().length); - } + public void testCCSRemoteReduce() throws Exception { + int numClusters = randomIntBetween(2, 10); + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder); + Settings settings = builder.build(); + boolean local = 
randomBoolean(); + OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null; + int totalClusters = numClusters + (local ? 1 : 0); + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + Function reduceContext = finalReduce -> null; + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference response = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference failure = new AtomicReference<>(); - AtomicInteger skippedClusters = new AtomicInteger(0); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); - assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + awaitLatch(latch, 5, TimeUnit.SECONDS); + + SearchResponse searchResponse = response.get(); + assertEquals(0, searchResponse.getClusters().getSkipped()); + assertEquals(totalClusters, searchResponse.getClusters().getTotal()); + assertEquals(totalClusters, searchResponse.getClusters().getSuccessful()); + assertEquals(totalClusters + 1, searchResponse.getNumReducePhases()); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.preference("index_not_found"); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference failure = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + 
tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } - int numDisconnectedClusters = randomIntBetween(1, numClusters); - Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); - Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); - while (disconnectedNodes.size() < numDisconnectedClusters) { - int i = randomIntBetween(0, numClusters - 1); - if (disconnectedNodes.add(nodes[i])) { - assertTrue(disconnectedNodesIndices.add(i)); - } + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); } + } - CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); - } + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); } - }); - for (DiscoveryNode disconnectedNode : disconnectedNodes) { - service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); } + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicInteger skippedClusters = new AtomicInteger(0); - AtomicReference failure = new AtomicReference<>(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference failure = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + 
assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } - //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again - for (int i : disconnectedNodesIndices) { - RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference response = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + + SearchResponse searchResponse = response.get(); + assertEquals(disconnectedNodesIndices.size(), searchResponse.getClusters().getSkipped()); + assertEquals(totalClusters, searchResponse.getClusters().getTotal()); + int successful = totalClusters - disconnectedNodesIndices.size(); + assertEquals(successful, searchResponse.getClusters().getSuccessful()); + assertEquals(successful == 0 ? 
0 : successful + 1, searchResponse.getNumReducePhases()); + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicInteger skippedClusters = new AtomicInteger(0); - AtomicReference> response = new AtomicReference<>(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); - assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - if (disconnectedNodesIndices.contains(i)) { - assertFalse(map.containsKey(clusterAlias)); - } else { - assertNotNull(map.get(clusterAlias)); - } + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); } + + } + } + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference response = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + + SearchResponse searchResponse = response.get(); + assertEquals(0, searchResponse.getClusters().getSkipped()); + assertEquals(totalClusters, searchResponse.getClusters().getTotal()); + assertEquals(totalClusters, searchResponse.getClusters().getSuccessful()); + assertEquals(totalClusters + 1, searchResponse.getNumReducePhases()); + } + assertEquals(0, service.getConnectionManager().size()); + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } + } + } - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + public void testCollectSearchShards() throws Exception { + int numClusters = randomIntBetween(2, 10); + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, 
remoteIndicesByCluster, builder); + Settings settings = builder.build(); + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertEquals(1, shardsResponse.getNodes().length); + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(0); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } - service.clearAllRules(); - if (randomBoolean()) { - for (int i : disconnectedNodesIndices) { - if (randomBoolean()) { - RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); - } + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); + } + } + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); } } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicInteger skippedClusters = new AtomicInteger(0); - AtomicReference> response = new AtomicReference<>(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - 
assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference failure = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } + + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); + assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + if (disconnectedNodesIndices.contains(i)) { + assertFalse(map.containsKey(clusterAlias)); + } else { assertNotNull(map.get(clusterAlias)); } } - assertEquals(0, service.getConnectionManager().size()); } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + 
assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + assertNotNull(map.get(clusterAlias)); + } + } + assertEquals(0, service.getConnectionManager().size()); } finally { for (MockTransportService mockTransportService : mockTransportServices) { mockTransportService.close(); } } } + + public void testCreateSearchResponseMerger() { + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + Function reduceContext = flag -> null; + { + SearchSourceBuilder source = new SearchSourceBuilder(); + assertEquals(-1, source.size()); + assertEquals(-1, source.from()); + assertNull(source.trackTotalHitsUpTo()); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, reduceContext); + assertEquals(0, merger.from); + assertEquals(10, merger.size); + assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo); + assertEquals(0, source.from()); + assertEquals(10, source.size()); + assertNull(source.trackTotalHitsUpTo()); + } + { + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, reduceContext); + assertEquals(0, merger.from); + assertEquals(10, merger.size); + assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo); + } + { + SearchSourceBuilder source = new SearchSourceBuilder(); + int originalFrom = randomIntBetween(0, 1000); + source.from(originalFrom); + int originalSize = randomIntBetween(0, 1000); + source.size(originalSize); + int trackTotalHitsUpTo = randomIntBetween(0, Integer.MAX_VALUE); + source.trackTotalHitsUpTo(trackTotalHitsUpTo); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, reduceContext); + assertEquals(0, source.from()); + assertEquals(originalFrom + originalSize, source.size()); + assertEquals(trackTotalHitsUpTo, (int)source.trackTotalHitsUpTo()); + assertEquals(originalFrom, merger.from); + assertEquals(originalSize, merger.size); + assertEquals(trackTotalHitsUpTo, merger.trackTotalHitsUpTo); + } + } + + public void testShouldMinimizeRoundtrips() throws Exception { + { + SearchRequest searchRequest = new SearchRequest(); + assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder()); + assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.scroll("5s"); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + SearchSourceBuilder source = new SearchSourceBuilder(); + searchRequest.source(source); + CollapseBuilder collapseBuilder = new CollapseBuilder("field"); + source.collapse(collapseBuilder); + collapseBuilder.setInnerHits(new InnerHitBuilder("inner")); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequestTests searchRequestTests = new SearchRequestTests(); + searchRequestTests.setUp(); + SearchRequest searchRequest = searchRequestTests.createSearchRequest(); + searchRequest.scroll((Scroll)null); + SearchSourceBuilder source = searchRequest.source(); + if (source != null) { + CollapseBuilder collapse = source.collapse(); + if (collapse != null) { + 
collapse.setInnerHits(Collections.emptyList()); + } + } + searchRequest.setCcsMinimizeRoundtrips(true); + assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + searchRequest.setCcsMinimizeRoundtrips(false); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 430cd900660c3..948e29d5d67de 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -813,7 +813,7 @@ public void testInvalidEnum() throws IOException { assertEquals(0, input.available()); } - private void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException { + private static void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); out.writeTimeValue(value); assertEquals(expectedSize, out.size()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 3f85d927e9295..6e9c2e4eaf320 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -62,10 +62,10 @@ public void testConnectAndExecuteRequest() throws Exception { ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); assertNotNull(clusterStateResponse); assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); - // also test a failure, there is no handler for search registered + // also test a failure, there is no handler for scroll registered ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class, - () -> client.prepareSearch().get()); - assertEquals("No handler for action [indices:data/read/search]", ex.getMessage()); + () -> client.prepareSearchScroll("").get()); + assertEquals("No handler for action [indices:data/read/scroll]", ex.getMessage()); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 3ec2506da244e..9eddac80a17c0 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -29,6 +30,10 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import 
org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,6 +52,10 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.mocksocket.MockServerSocket; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -130,6 +139,24 @@ public static MockTransportService startTransport( knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); } }); + newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new, + (request, channel, task) -> { + if ("index_not_found".equals(request.preference())) { + channel.sendResponse(new IndexNotFoundException("index")); + return; + } + SearchHits searchHits; + if ("null_target".equals(request.preference())) { + searchHits = new SearchHits(new SearchHit[] {new SearchHit(0)}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1F); + } else { + searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN); + } + InternalSearchResponse response = new InternalSearchResponse(searchHits, + InternalAggregations.EMPTY, null, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); + channel.sendResponse(searchResponse); + }); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel, task) -> { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java index 58dbe869b5c71..df554ea42de28 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java @@ -84,18 +84,11 @@ private RandomSearchRequestGenerator() {} * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. */ public static SearchRequest randomSearchRequest(Supplier randomSearchSourceBuilder) { - return randomSearchRequest(new SearchRequest(), randomSearchSourceBuilder); - } - - /** - * Set random fields to the provided search request. - * - * @param searchRequest the search request - * @param randomSearchSourceBuilder builds a random {@link SearchSourceBuilder}. You can use - * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. 
- */ - public static SearchRequest randomSearchRequest(SearchRequest searchRequest, Supplier randomSearchSourceBuilder) { + SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); + if (randomBoolean()) { + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + } if (randomBoolean()) { searchRequest.indices(generateRandomStringArray(10, 10, false, false)); } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index 1ebd18ccaa3ac..fa8172697287e 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -97,6 +97,9 @@ teardown: terms: field: f1.keyword + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 5 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -115,6 +118,9 @@ teardown: terms: field: f1.keyword + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -134,6 +140,9 @@ teardown: terms: field: f1.keyword + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -152,6 +161,7 @@ teardown: terms: field: f1.keyword + - is_false: _clusters - match: { _shards.total: 2 } - match: { hits.total: 5} - match: { hits.hits.0._index: "local_index"} @@ -182,6 +192,9 @@ teardown: rest_total_hits_as_int: true index: test_remote_cluster:test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6 } - match: { hits.hits.0._index: "test_remote_cluster:test_index" } @@ -193,6 +206,9 @@ teardown: rest_total_hits_as_int: true index: "*_remote_cluster:test_ind*" + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 6 } - match: { hits.total: 12 } @@ -205,6 +221,9 @@ teardown: rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 2 } - match: { hits.hits.0._source.filter_field: 1 } @@ -219,6 +238,9 @@ teardown: rest_total_hits_as_int: true index: my_remote_cluster:secure_alias # TODO make this a wildcard once + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 2 } - match: { hits.total: 1 } - is_true: hits.hits.0._source.secure diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml index 6875df0847d1a..0026df4978075 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml +++ 
b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml @@ -52,6 +52,9 @@ teardown: query: match_all: {} + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - set: {_scroll_id: scroll_id} - match: {hits.total: 6 } - length: {hits.hits: 4 } @@ -66,6 +69,7 @@ teardown: rest_total_hits_as_int: true body: { "scroll_id": "$scroll_id", "scroll": "1m"} + - is_false: _clusters - match: {hits.total: 6 } - length: {hits.hits: 2 } - match: {hits.hits.0._source.filter_field: 1 } @@ -100,6 +104,9 @@ teardown: query: match_all: {} + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - set: {_scroll_id: scroll_id} - match: {hits.total: 6 } - length: {hits.hits: 4 } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml index e7842db70d263..d74e82edca7f0 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml @@ -66,6 +66,7 @@ teardown: - do: headers: { Authorization: "Basic am9lOnMza3JpdA==" } search: + ccs_minimize_roundtrips: false rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1 @@ -83,6 +84,7 @@ teardown: - do: headers: { Authorization: "Basic am9lOnMza3JpdA==" } search: + ccs_minimize_roundtrips: false rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1
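
For reference, a minimal sketch of toggling the new flag from the high-level REST client. The endpoint, index names and the `cluster_one` remote alias are illustrative assumptions; only `setCcsMinimizeRoundtrips` and the `ccs_minimize_roundtrips` REST parameter come from this change.

[source,java]
----
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CcsRoundtripsExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // Search a local index and a remote index; "cluster_one" must be a configured remote cluster.
            SearchRequest request = new SearchRequest("twitter", "cluster_one:twitter");
            request.source(new SearchSourceBuilder()
                .query(QueryBuilders.matchQuery("user", "kimchy"))
                .from(0)
                .size(10));
            // Defaults to true; setting it to false opts out of the single-request-per-cluster
            // execution, equivalent to passing ccs_minimize_roundtrips=false on the REST layer.
            request.setCcsMinimizeRoundtrips(false);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            System.out.println(response.getHits().getTotalHits());
        }
    }
}
----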
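
The conditions exercised by testShouldMinimizeRoundtrips can be summarized as follows. This is a sketch that mirrors what those assertions check, not the body of TransportSearchAction.shouldMinimizeRoundtrips itself, and the class name is hypothetical.

[source,java]
----
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;

final class CcsExecutionMode {
    private CcsExecutionMode() {}

    /**
     * Round-trips are minimized only when the request opted in (the default),
     * no scroll is set, and field collapsing does not request inner hits.
     */
    static boolean minimizeRoundtrips(SearchRequest request) {
        if (request.isCcsMinimizeRoundtrips() == false) {
            return false;
        }
        if (request.scroll() != null) {
            return false;
        }
        SearchSourceBuilder source = request.source();
        if (source == null) {
            return true;
        }
        CollapseBuilder collapse = source.collapse();
        return collapse == null || collapse.getInnerHits() == null || collapse.getInnerHits().isEmpty();
    }
}
----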
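
testCreateSearchResponseMerger verifies how pagination is split between the per-cluster requests and the final merge: each cluster is asked for the first from + size hits, and the CCS coordinating node re-applies the original from and size when merging. A sketch of that adjustment, assuming -1 stands for "not set" with defaults 0 and 10 as asserted above; the helper name is hypothetical.

[source,java]
----
import org.elasticsearch.search.builder.SearchSourceBuilder;

final class CcsPagination {
    private CcsPagination() {}

    /** Rewrites the source sent to each remote cluster and returns the from/size the merger keeps. */
    static int[] adjustForRemoteClusters(SearchSourceBuilder source) {
        int from = source.from() == -1 ? 0 : source.from();   // -1 means "not set", default 0
        int size = source.size() == -1 ? 10 : source.size();  // -1 means "not set", default 10
        source.from(0);
        source.size(from + size);                              // each cluster returns its top from + size hits
        return new int[] {from, size};                         // values the final merger applies
    }
}
----

For example, with from=5 and size=20 each cluster returns its top 25 hits, and the merger then returns hits 5 through 24 of the merged list.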
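
The collectSearchShards tests above assert that a cluster marked skip_unavailable=true is counted in skippedClusters and dropped from the result map, while any other failure fails the whole request. A self-contained sketch of that per-cluster bookkeeping; the class and its names are illustrative, not part of this change.

[source,java]
----
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Collects one response or failure per remote cluster, mirroring the behaviour asserted above. */
final class ClusterResponseCollector<T> {
    private final Map<String, T> responses = new ConcurrentHashMap<>();
    private final AtomicInteger skippedClusters = new AtomicInteger();
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final CountDownLatch done;
    private final Set<String> skipUnavailable;

    ClusterResponseCollector(int clusterCount, Set<String> skipUnavailable) {
        this.done = new CountDownLatch(clusterCount);
        this.skipUnavailable = skipUnavailable;
    }

    void onResponse(String clusterAlias, T response) {
        responses.put(clusterAlias, response);
        done.countDown();
    }

    void onFailure(String clusterAlias, Exception e) {
        if (skipUnavailable.contains(clusterAlias)) {
            skippedClusters.incrementAndGet();   // cluster is simply left out of the results
        } else {
            failure.compareAndSet(null, e);      // first hard failure fails the whole request
        }
        done.countDown();
    }
    // responses, skippedClusters and failure are inspected once done reaches zero,
    // matching the assertions in the tests above.
}
----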