From a5cbef9d1bc5b3ec24398c7dd793179f32951b64 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 19 Feb 2019 11:32:21 -0700 Subject: [PATCH] Rebuild remote connections on profile changes (#37678) Currently remote compression and ping schedule settings are dynamic. However, we do not listen for changes. This commit adds listeners for changes to those two settings. Additionally, when those settings change we now close existing connections and open new ones with the settings applied. Fixes #37201. --- .../modules/remote-clusters.asciidoc | 74 ++++++++++---- .../transport/ConnectionManager.java | 51 ++++------ .../transport/RemoteClusterAware.java | 30 ++++-- .../transport/RemoteClusterConnection.java | 34 +++---- .../transport/RemoteClusterService.java | 97 ++++++++++++++++--- .../transport/TransportService.java | 2 +- .../discovery/PeerFinderTests.java | 2 +- .../transport/ConnectionManagerTests.java | 2 +- .../RemoteClusterConnectionTests.java | 43 ++++---- .../transport/RemoteClusterServiceTests.java | 60 +++++++++--- .../test/transport/MockTransport.java | 2 +- .../test/transport/MockTransportService.java | 2 +- .../transport/StubbableConnectionManager.java | 2 +- .../xpack/ccr/CcrRepositoryManager.java | 4 +- .../xpack/ccr/IndexFollowingIT.java | 66 +++++++++++++ .../authz/IndicesAndAliasesResolver.java | 4 +- 16 files changed, 342 insertions(+), 133 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 768eb7d6117bf..aefa750a92eb0 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -27,21 +27,16 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster. [float] [[configuring-remote-clusters]] -=== Configuring Remote Clusters +=== Configuring remote clusters -Remote clusters can be specified globally using -<> (which can be updated dynamically), -or local to individual nodes using the `elasticsearch.yml` file. +You can configure remote clusters globally by using +<>, which you can update dynamically. +Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file. -If a remote cluster is configured via `elasticsearch.yml` only the nodes with -that configuration will be able to connect to the remote cluster. In other -words, functionality that relies on remote cluster requests will have to be -driven specifically from those nodes. Remote clusters set via the -<> will be available on every node -in the cluster. - -The `elasticsearch.yml` config file for a node that connects to remote clusters -needs to list the remote clusters that should be connected to, for instance: +If you specify the settings in `elasticsearch.yml` files, only the nodes with +those settings can connect to the remote cluster. In other words, functionality +that relies on remote cluster requests must be driven specifically from those +nodes. For example: [source,yaml] -------------------------------- @@ -49,17 +44,22 @@ cluster: remote: cluster_one: <1> seeds: 127.0.0.1:9300 - cluster_two: <1> + transport.ping_schedule: 30s <2> + cluster_two: seeds: 127.0.0.1:9301 + transport.compress: true <3> -------------------------------- <1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing the connection to each cluster. These names are subsequently used to distinguish between local and remote indices. +<2> A keep-alive ping is configured for `cluster_one`. +<3> Compression is explicitly enabled for requests to `cluster_two`. + +For more information about the optional transport settings, see +<>. -The equivalent example using the <> to add remote clusters to all nodes in the cluster would look like the -following: +If you use <>, the remote clusters are available on every node in the cluster. For example: [source,js] -------------------------------- @@ -71,12 +71,14 @@ PUT _cluster/settings "cluster_one": { "seeds": [ "127.0.0.1:9300" - ] + ], + "transport.ping_schedule": "30s" }, "cluster_two": { "seeds": [ "127.0.0.1:9301" - ] + ], + "transport.compress": true }, "cluster_three": { "seeds": [ @@ -92,6 +94,40 @@ PUT _cluster/settings // TEST[setup:host] // TEST[s/127.0.0.1:9300/\${transport_host}/] +You can dynamically update the compression and ping schedule settings. However, +you must re-include seeds in the settings update request. For example: + +[source,js] +-------------------------------- +PUT _cluster/settings +{ + "persistent": { + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ], + "transport.ping_schedule": "60s" + }, + "cluster_two": { + "seeds": [ + "127.0.0.1:9301" + ], + "transport.compress": false + } + } + } + } +} +-------------------------------- +// CONSOLE +// TEST[continued] + +NOTE: When the compression or ping schedule settings change, all the existing +node connections must close and re-open, which can cause in-flight requests to +fail. + A remote cluster can be deleted from the cluster settings by setting its seeds to `null`: diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index f1067a0c5575f..da86ed076e396 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -38,8 +37,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable { private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); private final KeyedLock connectionLock = new KeyedLock<>(); private final Transport transport; - private final ThreadPool threadPool; private final ConnectionProfile defaultProfile; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool); + public ConnectionManager(Settings settings, Transport transport) { + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); } - public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) { + public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) { this.transport = transport; - this.threadPool = threadPool; this.defaultProfile = connectionProfile; } @@ -185,35 +180,23 @@ public int size() { @Override public void close() { + Transports.assertNotTransportThread("Closing ConnectionManager"); if (isClosed.compareAndSet(false, true)) { - CountDownLatch latch = new CountDownLatch(1); - - // TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService - threadPool.generic().execute(() -> { - closeLock.writeLock().lock(); - try { - // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close - // all instances and then clear them maps - Iterator> iterator = connectedNodes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry next = iterator.next(); - try { - IOUtils.closeWhileHandlingException(next.getValue()); - } finally { - iterator.remove(); - } + closeLock.writeLock().lock(); + try { + // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close + // all instances and then clear them maps + Iterator> iterator = connectedNodes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + try { + IOUtils.closeWhileHandlingException(next.getValue()); + } finally { + iterator.remove(); } - } finally { - closeLock.writeLock().unlock(); - latch.countDown(); } - }); - - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore + } finally { + closeLock.writeLock().unlock(); } } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index ff06e59d4f729..1bf47d1a42f94 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.SettingUpgrader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -283,21 +284,38 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } + void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { + Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings); + TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings); + updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); + } + + void updateRemoteCluster(String clusterAlias, Settings settings) { + String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings); + List addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); + Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); + TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE + .getConcreteSettingForNamespace(clusterAlias) + .get(settings); + + updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); + } + /** * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, + TimeValue pingSchedule); /** * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - clusterSettings.addAffixUpdateConsumer( - RemoteClusterAware.REMOTE_CLUSTERS_PROXY, - RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, - (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()), - (namespace, value) -> {}); + List> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, + RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster); clusterSettings.addAffixUpdateConsumer( RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY, RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 57820a8ca48a9..f4a1b250e7f5e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -64,9 +64,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE; - /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not @@ -107,12 +104,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to * @param proxyAddress the proxy address + * @param connectionProfile the connection profile to use */ RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, - String proxyAddress) { + String proxyAddress, ConnectionProfile connectionProfile) { this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, - createConnectionManager(settings, clusterAlias, transportService)); + createConnectionManager(connectionProfile, transportService)); } // Public for tests to pass a StubbableConnectionManager @@ -309,13 +307,23 @@ Transport.Connection getConnection() { @Override public void close() throws IOException { - IOUtils.close(connectHandler, connectionManager); + IOUtils.close(connectHandler); + // In the ConnectionManager we wait on connections being closed. + threadPool.generic().execute(connectionManager::close); } public boolean isClosed() { return connectHandler.isClosed(); } + public String getProxyAddress() { + return proxyAddress; + } + + public List>> getSeedNodes() { + return seedNodes; + } + /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. If such a connect job is triggered @@ -697,18 +705,8 @@ private synchronized void ensureIteratorAvailable() { } } - private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder() - .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? - // we don't want this to be used for anything else but search - .addConnections(0, TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.STATE, - TransportRequestOptions.Type.RECOVERY) - .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings)) - .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings)); - return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool); + private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { + return new ConnectionManager(connectionProfile, transportService.transport); } ConnectionManager getConnectionManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a126337aa0e54..fab7db20a3322 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,6 +68,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); + private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); + static { // remove search.remote.* settings in 8.0.0 // TODO @@ -186,6 +189,7 @@ public String getKey(final String key) { private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); + private volatile Map remoteClusterConnectionProfiles = Collections.emptyMap(); RemoteClusterService(Settings settings, TransportService transportService) { super(settings); @@ -213,21 +217,33 @@ private synchronized void updateRemoteClusters(Map>> seedList = entry.getValue().v2(); String proxyAddress = entry.getValue().v1(); - RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); + String clusterAlias = entry.getKey(); + RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + ConnectionProfile connectionProfile = this.remoteClusterConnectionProfiles.get(clusterAlias); if (seedList.isEmpty()) { // with no seed nodes we just remove the connection try { IOUtils.close(remote); } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e); + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); } - remoteClusters.remove(entry.getKey()); + remoteClusters.remove(clusterAlias); continue; } if (remote == null) { // this is a new cluster we have to add a new representation - String clusterAlias = entry.getKey(); remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, - getNodePredicate(settings), proxyAddress); + getNodePredicate(settings), proxyAddress, connectionProfile); + remoteClusters.put(clusterAlias, remote); + } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) { + // New ConnectionProfile. Must tear down existing connection + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + } + remoteClusters.remove(clusterAlias); + remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, + getNodePredicate(settings), proxyAddress, connectionProfile); remoteClusters.put(clusterAlias, remote); } @@ -244,7 +260,7 @@ private synchronized void updateRemoteClusters(Map addresses, String proxyAddress) { - updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, + TimeValue pingSchedule) { + if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } + ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); + ConnectionProfile newProfile; + if (oldProfile != null) { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + newProfile = builder.build(); + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + newProfile = builder.build(); + } + updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener); } - void updateRemoteCluster( - final String clusterAlias, - final List addresses, - final String proxyAddress, - final ActionListener connectionListener) { + void updateRemoteCluster(final String clusterAlias, final List addresses, final String proxyAddress, + final ConnectionProfile connectionProfile, final ActionListener connectionListener) { + HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); + connectionProfiles.put(clusterAlias, connectionProfile); + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); final List>> nodes = - addresses.stream().>>map(address -> Tuple.tuple(address, () -> - buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) - ).collect(Collectors.toList()); + addresses.stream().>>map(address -> Tuple.tuple(address, () -> + buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) + ).collect(Collectors.toList()); updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); } @@ -387,6 +420,7 @@ void initializeRemoteClusters() { final PlainActionFuture future = new PlainActionFuture<>(); Map>>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); + initializeConnectionProfiles(seeds.keySet()); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); @@ -399,6 +433,32 @@ void initializeRemoteClusters() { } } + private synchronized void initializeConnectionProfiles(Set remoteClusters) { + Map connectionProfiles = new HashMap<>(remoteClusters.size()); + for (String clusterName : remoteClusters) { + connectionProfiles.put(clusterName, buildConnectionProfileFromSettings(clusterName)); + } + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); + } + + private ConnectionProfile buildConnectionProfileFromSettings(String clusterName) { + return buildConnectionProfileFromSettings(settings, clusterName); + } + + static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) { + return new ConnectionProfile.Builder() + .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? + // we don't want this to be used for anything else but search + .addConnections(0, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY) + .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) + .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)) + .build(); + } + @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); @@ -408,6 +468,11 @@ public Stream getRemoteConnectionInfos() { return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); } + private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) { + return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false + || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; + } + /** * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * function on success. diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 3ea15bba43a84..48ef529f9c3e3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -149,7 +149,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders, - new ConnectionManager(settings, transport, threadPool)); + new ConnectionManager(settings, transport)); } public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 78a2f2446c5dc..5ffe242dfb208 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -211,7 +211,7 @@ public void setup() { localNode = newDiscoveryNode("local-node"); ConnectionManager innerConnectionManager - = new ConnectionManager(settings, capturingTransport, deterministicTaskQueue.getThreadPool()); + = new ConnectionManager(settings, capturingTransport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool()); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 578521190e2ff..c1dd512e0232d 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -55,7 +55,7 @@ public void createConnectionManager() { .build(); threadPool = new ThreadPool(settings); transport = mock(Transport.class); - connectionManager = new ConnectionManager(settings, transport, threadPool); + connectionManager = new ConnectionManager(settings, transport); TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 9eddac80a17c0..5bc683d6fd708 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -104,6 +104,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); @Override public void tearDown() throws Exception { @@ -191,7 +192,8 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -233,7 +235,8 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -286,7 +289,8 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -318,7 +322,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -346,7 +350,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -396,7 +400,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); if (rejectedNode.equals(seedNode)) { @@ -461,7 +465,8 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); expectThrows( Exception.class, @@ -502,7 +507,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, } }; - ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, threadPool); @@ -559,7 +564,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -614,7 +619,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; @@ -694,7 +699,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -782,7 +787,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true, null)) { + seedNodes, service, maxNumConnections, n -> true, null, profile)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -914,7 +919,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); assertFalse(connectionManager.nodeConnected(seedNode)); assertFalse(connectionManager.nodeConnected(discoverableNode)); @@ -964,7 +969,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { if (randomBoolean()) { updateSeedNodes(connection, seedNodes(seedNode)); } @@ -1012,7 +1017,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1100,7 +1105,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -1156,7 +1161,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, } }; - ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, threadPool); @@ -1214,7 +1219,7 @@ public void testLazyResolveTransportAddress() throws Exception { return seedNode; }); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null, profile)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice @@ -1246,7 +1251,7 @@ public void testProxyMode() throws Exception { RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true)); assertEquals("node_0", seedSupplier.v2().get().getAttributes().get("server_name")); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress, profile)) { updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); assertEquals(2, connection.getNumNodesConnected()); assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode())); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 3ef1e4df02bfb..25c8a5fac7299 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -400,11 +400,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); - assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -415,6 +411,40 @@ public void testCustomPingSchedule() throws IOException { } } + public void testChangeSettings() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, + threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + service.initializeRemoteClusters(); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + Settings.Builder settingsChange = Settings.builder(); + TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); + settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); + boolean compressionEnabled = true; + settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled); + settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + service.updateRemoteCluster("cluster_1", settingsChange.build()); + assertBusy(remoteClusterConnection::isClosed); + + remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + assertEquals(pingSchedule, connectionProfile.getPingInterval()); + assertEquals(compressionEnabled, connectionProfile.getCompressionEnabled()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); @@ -460,14 +490,14 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { service.updateRemoteCluster( "cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -525,14 +555,14 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { service.updateRemoteCluster( "cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -595,17 +625,17 @@ public void testCollectNodes() throws InterruptedException, IOException { assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); - service.updateRemoteCluster( - "cluster_1", + + service.updateRemoteCluster("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), @@ -911,7 +941,7 @@ private static void updateRemoteCluster(RemoteClusterService service, String clu exceptionAtomicReference.set(x); latch.countDown(); }); - service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener); + service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, genericProfile(clusterAlias), listener); latch.await(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -953,4 +983,8 @@ public void testSkipUnavailable() { } } } + + private static ConnectionProfile genericProfile(String clusterName) { + return RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, clusterName); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index a6dbd1561936e..e39f5d03cba07 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -80,7 +80,7 @@ public class MockTransport implements Transport, LifecycleComponent { public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool), + StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this), settings, this, threadPool); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode)); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index e3d7e72a0bb97..4b998f04a568a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -157,7 +157,7 @@ private MockTransportService(Settings settings, StubbableTransport transport, Th Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ConnectionManager(settings, transport, threadPool), settings, transport, threadPool)); + new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport, threadPool)); this.original = transport.getDelegate(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 41ac87f0af576..108e1bf5e24b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -42,7 +42,7 @@ public class StubbableConnectionManager extends ConnectionManager { private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + super(settings, transport); this.delegate = delegate; this.getConnectionBehaviors = new ConcurrentHashMap<>(); this.nodeConnectedBehaviors = new ConcurrentHashMap<>(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index c1a28b72cf8fe..c241c7a9aa070 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; @@ -75,7 +76,8 @@ void init() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, + TimeValue pingSchedule) { String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; if (addresses.isEmpty()) { deleteRepository(repositoryName); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 5a8a7feb34716..e0f71fe45155a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -50,6 +50,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -68,6 +69,8 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -1122,6 +1125,69 @@ private void runFallBehindTest( } } + public void testUpdateRemoteConfigsDuringFollowing() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + int numberOfReplicas = between(0, 1); + + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(200, 800); + + logger.info("Executing put follow"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + logger.info("Indexing [{}] docs while updateing remote config", firstBatchNumDocs); + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + randomIntBetween(1, 5))) { + + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); + Setting> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster"); + settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + + waitForDocs(firstBatchNumDocs, indexer); + indexer.assertNoFailures(); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + + assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); + } finally { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); + Setting> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster"); + settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY)) + .put(seeds.getKey(), address)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 03c78ed903e81..e5d4609c13fb1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest; import org.elasticsearch.transport.RemoteClusterAware; @@ -438,7 +439,8 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, + TimeValue pingSchedule) { if (addresses.isEmpty()) { clusters.remove(clusterAlias); } else {