From 3e1be72b5806a76132184394558168c85fe3aab8 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 24 Sep 2018 11:45:33 +0200 Subject: [PATCH] Clarify RemoteClusterService#groupIndices behaviour (#33899) When executing a cross-cluster search, we need to search against all local indices (and no remote indices) in case no indices are specified. Also, if only remote indices are specified, no local indices will be queried. We previously added empty local indices whenever they were not present in the map of the grouped indices, then we would act differently later based on the extracted remote indices. Instead, we now add the empty array for local indices only in case we need to search all local indices; the entry for local indices is not added when local indices should not be searched. This way the grouped indices reflect reality and provide a better indication of what indices will be searched. --- .../TransportFieldCapabilitiesAction.java | 5 +- .../action/search/TransportSearchAction.java | 30 ++++---- .../transport/RemoteClusterService.java | 16 +++-- .../search/TransportSearchActionTests.java | 9 ++- .../transport/RemoteClusterServiceTests.java | 71 +++++++++++++++++-- 5 files changed, 99 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index b8d1f477ac108..64e411d0fe2f9 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -70,9 +70,8 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState)); final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); final String[] concreteIndices; - if (remoteClusterIndices.isEmpty() == false && localIndices.indices().length == 0) { - // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote - // indices + if (localIndices == null) { + // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices concreteIndices = Strings.EMPTY_ARRAY; } else { concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 5c0b2eb39ed51..b7d6c00235074 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -193,7 +194,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState)); OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (remoteClusterIndices.isEmpty()) { - executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(), + executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); } else { remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), @@ -203,7 +204,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); - executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, + executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, clusters); }, listener::onFailure)); @@ -219,7 +220,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map remoteIndices, Map searchShardsResponses) { - int localClusters = Math.min(localIndices.indices().length, 1); + int localClusters = localIndices == null ? 0 : 1; int totalClusters = remoteIndices.size() + localClusters; int successfulClusters = localClusters; for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { @@ -277,8 +278,19 @@ static BiFunction processRemoteShards(Map remoteClusterIndices, List remoteShardIterators, + private Index[] resolveLocalIndices(OriginalIndices localIndices, + IndicesOptions indicesOptions, + ClusterState clusterState, + SearchTimeProvider timeProvider) { + if (localIndices == null) { + return Index.EMPTY_ARRAY; //don't search on any local index (happens when only remote indices were specified) + } + return indexNameExpressionResolver.concreteIndices(clusterState, indicesOptions, + timeProvider.getAbsoluteStartMillis(), localIndices.indices()); + } + + private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, + OriginalIndices localIndices, List remoteShardIterators, BiFunction remoteConnections, ClusterState clusterState, Map remoteAliasMap, ActionListener listener, SearchResponse.Clusters clusters) { @@ -287,13 +299,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead // of just for the _search api - final Index[] indices; - if (localIndices.indices().length == 0 && remoteClusterIndices.isEmpty() == false) { - indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified - } else { - indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), - timeProvider.getAbsoluteStartMillis(), localIndices.indices()); - } + final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider); Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 75891ef820c02..dc3bd3a353604 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -262,14 +262,16 @@ public Map groupIndices(IndicesOptions indicesOptions, Map originalIndicesMap = new HashMap<>(); if (isCrossClusterSearchEnabled()) { final Map> groupedIndices = groupClusterIndices(indices, indexExists); - for (Map.Entry> entry : groupedIndices.entrySet()) { - String clusterAlias = entry.getKey(); - List originalIndices = entry.getValue(); - originalIndicesMap.put(clusterAlias, - new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions)); - } - if (originalIndicesMap.containsKey(LOCAL_CLUSTER_GROUP_KEY) == false) { + if (groupedIndices.isEmpty()) { + //search on _all in the local cluster if neither local indices nor remote indices were specified originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions)); + } else { + for (Map.Entry> entry : groupedIndices.entrySet()) { + String clusterAlias = entry.getKey(); + List originalIndices = entry.getValue(); + originalIndicesMap.put(clusterAlias, + new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions)); + } } } else { originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(indices, indicesOptions)); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 902a8ad97a0c4..c763709a04e40 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -44,7 +44,6 @@ import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,7 +63,7 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - public void testMergeShardsIterators() throws IOException { + public void testMergeShardsIterators() { List localShardIterators = new ArrayList<>(); { ShardId shardId = new ShardId("local_index", "local_index_uuid", 0); @@ -146,7 +145,7 @@ public void testMergeShardsIterators() throws IOException { } } - public void testProcessRemoteShards() throws IOException { + public void testProcessRemoteShards() { try (TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { RemoteClusterService service = transportService.getRemoteClusterService(); @@ -241,12 +240,12 @@ public void testProcessRemoteShards() throws IOException { } public void testBuildClusters() { - OriginalIndices localIndices = randomOriginalIndices(); + OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices(); Map remoteIndices = new HashMap<>(); Map searchShardsResponses = new HashMap<>(); int numRemoteClusters = randomIntBetween(0, 10); boolean onlySuccessful = randomBoolean(); - int localClusters = localIndices.indices().length == 0 ? 0 : 1; + int localClusters = localIndices == null ? 0 : 1; int total = numRemoteClusters + localClusters; int successful = localClusters; int skipped = 0; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index ac8f578a67ac5..6e92e70e4aed1 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; @@ -151,7 +152,6 @@ public void testBuildRemoteClustersDynamicConfig() throws Exception { assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); } - public void testGroupClusterIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); @@ -179,10 +179,9 @@ public void testGroupClusterIndices() throws IOException { Map> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, i -> false); - String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, - k -> Collections.emptyList()).toArray(new String[0]); - assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)); - assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, localIndices); + List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndices); + assertEquals(Arrays.asList("foo:bar", "foo", "no*match:boo"), localIndices); assertEquals(2, perClusterIndices.size()); assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1")); assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); @@ -198,6 +197,68 @@ public void testGroupClusterIndices() throws IOException { } } + public void testGroupIndices() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + assertFalse(service.isRemoteClusterRegistered("foo")); + { + Map perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN, + new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", + "cluster*:baz", "*:boo", "no*match:boo"}, + i -> false); + assertEquals(3, perClusterIndices.size()); + assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, + perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); + assertArrayEquals(new String[]{"bar", "test", "baz", "boo"}, perClusterIndices.get("cluster_1").indices()); + assertArrayEquals(new String[]{"foo:bar", "foo*", "baz", "boo"}, perClusterIndices.get("cluster_2").indices()); + } + { + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> + service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals)); + assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + + " cluster_1", iae.getMessage()); + } + { + Map perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN, + new String[]{"cluster_1:bar", "cluster_2:foo*"}, + i -> false); + assertEquals(2, perClusterIndices.size()); + assertArrayEquals(new String[]{"bar"}, perClusterIndices.get("cluster_1").indices()); + assertArrayEquals(new String[]{"foo*"}, perClusterIndices.get("cluster_2").indices()); + } + { + Map perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN, + Strings.EMPTY_ARRAY, + i -> false); + assertEquals(1, perClusterIndices.size()); + assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); + } + } + } + } + } + public void testIncrementallyAddClusters() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);