diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 768eb7d6117bf..aefa750a92eb0 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -27,21 +27,16 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster. [float] [[configuring-remote-clusters]] -=== Configuring Remote Clusters +=== Configuring remote clusters -Remote clusters can be specified globally using -<> (which can be updated dynamically), -or local to individual nodes using the `elasticsearch.yml` file. +You can configure remote clusters globally by using +<>, which you can update dynamically. +Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file. -If a remote cluster is configured via `elasticsearch.yml` only the nodes with -that configuration will be able to connect to the remote cluster. In other -words, functionality that relies on remote cluster requests will have to be -driven specifically from those nodes. Remote clusters set via the -<> will be available on every node -in the cluster. - -The `elasticsearch.yml` config file for a node that connects to remote clusters -needs to list the remote clusters that should be connected to, for instance: +If you specify the settings in `elasticsearch.yml` files, only the nodes with +those settings can connect to the remote cluster. In other words, functionality +that relies on remote cluster requests must be driven specifically from those +nodes. For example: [source,yaml] -------------------------------- @@ -49,17 +44,22 @@ cluster: remote: cluster_one: <1> seeds: 127.0.0.1:9300 - cluster_two: <1> + transport.ping_schedule: 30s <2> + cluster_two: seeds: 127.0.0.1:9301 + transport.compress: true <3> -------------------------------- <1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing the connection to each cluster. These names are subsequently used to distinguish between local and remote indices. +<2> A keep-alive ping is configured for `cluster_one`. +<3> Compression is explicitly enabled for requests to `cluster_two`. + +For more information about the optional transport settings, see +<>. -The equivalent example using the <> to add remote clusters to all nodes in the cluster would look like the -following: +If you use <>, the remote clusters are available on every node in the cluster. For example: [source,js] -------------------------------- @@ -71,12 +71,14 @@ PUT _cluster/settings "cluster_one": { "seeds": [ "127.0.0.1:9300" - ] + ], + "transport.ping_schedule": "30s" }, "cluster_two": { "seeds": [ "127.0.0.1:9301" - ] + ], + "transport.compress": true }, "cluster_three": { "seeds": [ @@ -92,6 +94,40 @@ PUT _cluster/settings // TEST[setup:host] // TEST[s/127.0.0.1:9300/\${transport_host}/] +You can dynamically update the compression and ping schedule settings. However, +you must re-include seeds in the settings update request. 
For example: + +[source,js] +-------------------------------- +PUT _cluster/settings +{ + "persistent": { + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ], + "transport.ping_schedule": "60s" + }, + "cluster_two": { + "seeds": [ + "127.0.0.1:9301" + ], + "transport.compress": false + } + } + } + } +} +-------------------------------- +// CONSOLE +// TEST[continued] + +NOTE: When the compression or ping schedule settings change, all the existing +node connections must close and re-open, which can cause in-flight requests to +fail. + A remote cluster can be deleted from the cluster settings by setting its seeds to `null`: diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index f1067a0c5575f..da86ed076e396 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -38,8 +37,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable { private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); private final KeyedLock connectionLock = new KeyedLock<>(); private final Transport transport; - private final ThreadPool threadPool; private final ConnectionProfile defaultProfile; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool); + public ConnectionManager(Settings settings, Transport transport) { + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); } - public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) { + public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) { this.transport = transport; - this.threadPool = threadPool; this.defaultProfile = connectionProfile; } @@ -185,35 +180,23 @@ public int size() { @Override public void close() { + Transports.assertNotTransportThread("Closing ConnectionManager"); if (isClosed.compareAndSet(false, true)) { - CountDownLatch latch = new CountDownLatch(1); - - // TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService - threadPool.generic().execute(() -> { - closeLock.writeLock().lock(); - try { - // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close - // all instances and then clear them maps - Iterator> iterator = connectedNodes.entrySet().iterator(); - 
while (iterator.hasNext()) { - Map.Entry next = iterator.next(); - try { - IOUtils.closeWhileHandlingException(next.getValue()); - } finally { - iterator.remove(); - } + closeLock.writeLock().lock(); + try { + // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close + // all instances and then clear them maps + Iterator> iterator = connectedNodes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + try { + IOUtils.closeWhileHandlingException(next.getValue()); + } finally { + iterator.remove(); } - } finally { - closeLock.writeLock().unlock(); - latch.countDown(); } - }); - - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore + } finally { + closeLock.writeLock().unlock(); } } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index ff06e59d4f729..1bf47d1a42f94 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.SettingUpgrader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -283,21 +284,38 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } + void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { + Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings); + TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings); + updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); + } + + void updateRemoteCluster(String clusterAlias, Settings settings) { + String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings); + List addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); + Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); + TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE + .getConcreteSettingForNamespace(clusterAlias) + .get(settings); + + updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); + } + /** * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, + TimeValue pingSchedule); /** * Registers this instance to listen to updates on the cluster settings. 
*/ public void listenForUpdates(ClusterSettings clusterSettings) { - clusterSettings.addAffixUpdateConsumer( - RemoteClusterAware.REMOTE_CLUSTERS_PROXY, - RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, - (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()), - (namespace, value) -> {}); + List> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, + RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster); clusterSettings.addAffixUpdateConsumer( RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY, RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 57820a8ca48a9..f4a1b250e7f5e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -64,9 +64,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE; - /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not @@ -107,12 +104,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to * @param proxyAddress the proxy address + * @param connectionProfile the connection profile to use */ RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, - String proxyAddress) { + String proxyAddress, ConnectionProfile connectionProfile) { this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, - createConnectionManager(settings, clusterAlias, transportService)); + createConnectionManager(connectionProfile, transportService)); } // Public for tests to pass a StubbableConnectionManager @@ -309,13 +307,23 @@ Transport.Connection getConnection() { @Override public void close() throws IOException { - IOUtils.close(connectHandler, connectionManager); + IOUtils.close(connectHandler); + // In the ConnectionManager we wait on connections being closed. + threadPool.generic().execute(connectionManager::close); } public boolean isClosed() { return connectHandler.isClosed(); } + public String getProxyAddress() { + return proxyAddress; + } + + public List>> getSeedNodes() { + return seedNodes; + } + /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. 
If such a connect job is triggered @@ -697,18 +705,8 @@ private synchronized void ensureIteratorAvailable() { } } - private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder() - .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? - // we don't want this to be used for anything else but search - .addConnections(0, TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.STATE, - TransportRequestOptions.Type.RECOVERY) - .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings)) - .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings)); - return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool); + private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { + return new ConnectionManager(connectionProfile, transportService.transport); } ConnectionManager getConnectionManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a126337aa0e54..fab7db20a3322 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,6 +68,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); + private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); + static { // remove search.remote.* settings in 8.0.0 // TODO @@ -186,6 +189,7 @@ public String getKey(final String key) { private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); + private volatile Map remoteClusterConnectionProfiles = Collections.emptyMap(); RemoteClusterService(Settings settings, TransportService transportService) { super(settings); @@ -213,21 +217,33 @@ private synchronized void updateRemoteClusters(Map>> seedList = entry.getValue().v2(); String proxyAddress = entry.getValue().v1(); - RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); + String clusterAlias = entry.getKey(); + RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + ConnectionProfile connectionProfile = this.remoteClusterConnectionProfiles.get(clusterAlias); if (seedList.isEmpty()) { // with no seed nodes we just remove the connection try { IOUtils.close(remote); } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e); + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); } - remoteClusters.remove(entry.getKey()); + remoteClusters.remove(clusterAlias); continue; } if (remote 
== null) { // this is a new cluster we have to add a new representation - String clusterAlias = entry.getKey(); remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, - getNodePredicate(settings), proxyAddress); + getNodePredicate(settings), proxyAddress, connectionProfile); + remoteClusters.put(clusterAlias, remote); + } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) { + // New ConnectionProfile. Must tear down existing connection + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + } + remoteClusters.remove(clusterAlias); + remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, + getNodePredicate(settings), proxyAddress, connectionProfile); remoteClusters.put(clusterAlias, remote); } @@ -244,7 +260,7 @@ private synchronized void updateRemoteClusters(Map addresses, String proxyAddress) { - updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, + TimeValue pingSchedule) { + if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } + ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); + ConnectionProfile newProfile; + if (oldProfile != null) { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + newProfile = builder.build(); + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + newProfile = builder.build(); + } + updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener); } - void updateRemoteCluster( - final String clusterAlias, - final List addresses, - final String proxyAddress, - final ActionListener connectionListener) { + void updateRemoteCluster(final String clusterAlias, final List addresses, final String proxyAddress, + final ConnectionProfile connectionProfile, final ActionListener connectionListener) { + HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); + connectionProfiles.put(clusterAlias, connectionProfile); + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); final List>> nodes = - addresses.stream().>>map(address -> Tuple.tuple(address, () -> - buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) - ).collect(Collectors.toList()); + addresses.stream().>>map(address -> Tuple.tuple(address, () -> + buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) + ).collect(Collectors.toList()); updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); } @@ -387,6 +420,7 @@ void initializeRemoteClusters() { final PlainActionFuture future = new PlainActionFuture<>(); Map>>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); + initializeConnectionProfiles(seeds.keySet()); updateRemoteClusters(seeds, future); try { 
future.get(timeValue.millis(), TimeUnit.MILLISECONDS); @@ -399,6 +433,32 @@ void initializeRemoteClusters() { } } + private synchronized void initializeConnectionProfiles(Set remoteClusters) { + Map connectionProfiles = new HashMap<>(remoteClusters.size()); + for (String clusterName : remoteClusters) { + connectionProfiles.put(clusterName, buildConnectionProfileFromSettings(clusterName)); + } + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); + } + + private ConnectionProfile buildConnectionProfileFromSettings(String clusterName) { + return buildConnectionProfileFromSettings(settings, clusterName); + } + + static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) { + return new ConnectionProfile.Builder() + .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? + // we don't want this to be used for anything else but search + .addConnections(0, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY) + .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) + .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)) + .build(); + } + @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); @@ -408,6 +468,11 @@ public Stream getRemoteConnectionInfos() { return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); } + private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) { + return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false + || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; + } + /** * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * function on success. 
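
The RemoteClusterService changes above add `buildConnectionProfileFromSettings`, which folds the per-cluster `cluster.remote.<alias>.transport.compress` and `cluster.remote.<alias>.transport.ping_schedule` settings into the `ConnectionProfile` that `RemoteClusterConnection` hands to its `ConnectionManager`. As a rough, package-local sketch (the helper is package-private; the class name and alias below are invented for illustration and are not part of this change):

[source,java]
--------------------------------
package org.elasticsearch.transport; // same package: buildConnectionProfileFromSettings is package-private

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

public class RemoteClusterProfileSketch { // hypothetical illustration class
    public static void main(String[] args) {
        // Per-cluster settings as they would appear in elasticsearch.yml or a settings update.
        Settings settings = Settings.builder()
                .put("cluster.remote.cluster_one.transport.compress", true)
                .put("cluster.remote.cluster_one.transport.ping_schedule", "30s")
                .build();

        ConnectionProfile profile =
                RemoteClusterService.buildConnectionProfileFromSettings(settings, "cluster_one");

        // The compression flag and keep-alive interval end up on the profile used for
        // every connection to that remote cluster.
        assert Boolean.TRUE.equals(profile.getCompressionEnabled());
        assert profile.getPingInterval().millis() == TimeValue.timeValueSeconds(30).millis();
    }
}
--------------------------------

When either value changes through the cluster settings API, `connectionProfileChanged` reports a difference and `updateRemoteClusters` closes the existing `RemoteClusterConnection` and rebuilds it with the new profile, which is what the documentation note about in-flight requests refers to.
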
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 3ea15bba43a84..48ef529f9c3e3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -149,7 +149,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders, - new ConnectionManager(settings, transport, threadPool)); + new ConnectionManager(settings, transport)); } public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 78a2f2446c5dc..5ffe242dfb208 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -211,7 +211,7 @@ public void setup() { localNode = newDiscoveryNode("local-node"); ConnectionManager innerConnectionManager - = new ConnectionManager(settings, capturingTransport, deterministicTaskQueue.getThreadPool()); + = new ConnectionManager(settings, capturingTransport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool()); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 578521190e2ff..c1dd512e0232d 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -55,7 +55,7 @@ public void createConnectionManager() { .build(); threadPool = new ThreadPool(settings); transport = mock(Transport.class); - connectionManager = new ConnectionManager(settings, transport, threadPool); + connectionManager = new ConnectionManager(settings, transport); TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 9eddac80a17c0..5bc683d6fd708 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -104,6 +104,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); @Override public void tearDown() throws Exception { @@ -191,7 +192,8 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try 
(RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -233,7 +235,8 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -286,7 +289,8 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -318,7 +322,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -346,7 +350,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -396,7 +400,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); if (rejectedNode.equals(seedNode)) { @@ -461,7 +465,8 @@ 
public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); expectThrows( Exception.class, @@ -502,7 +507,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, } }; - ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, threadPool); @@ -559,7 +564,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -614,7 +619,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; @@ -694,7 +699,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -782,7 +787,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true, null)) { + seedNodes, service, maxNumConnections, n -> true, null, profile)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -914,7 +919,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { 
ConnectionManager connectionManager = connection.getConnectionManager(); assertFalse(connectionManager.nodeConnected(seedNode)); assertFalse(connectionManager.nodeConnected(discoverableNode)); @@ -964,7 +969,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { if (randomBoolean()) { updateSeedNodes(connection, seedNodes(seedNode)); } @@ -1012,7 +1017,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1100,7 +1105,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -1156,7 +1161,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, } }; - ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, threadPool); @@ -1214,7 +1219,7 @@ public void testLazyResolveTransportAddress() throws Exception { return seedNode; }); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null, profile)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice @@ -1246,7 +1251,7 @@ public void testProxyMode() throws Exception { RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true)); assertEquals("node_0", seedSupplier.v2().get().getAttributes().get("server_name")); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress, profile)) { updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); assertEquals(2, connection.getNumNodesConnected()); assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode())); diff --git 
a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 3ef1e4df02bfb..25c8a5fac7299 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -400,11 +400,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); - assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -415,6 +411,40 @@ public void testCustomPingSchedule() throws IOException { } } + public void testChangeSettings() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, + threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + service.initializeRemoteClusters(); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + Settings.Builder settingsChange = Settings.builder(); + TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); + settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); + boolean compressionEnabled = true; + settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled); + settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + service.updateRemoteCluster("cluster_1", settingsChange.build()); + assertBusy(remoteClusterConnection::isClosed); + + remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + assertEquals(pingSchedule, connectionProfile.getPingInterval()); + assertEquals(compressionEnabled, connectionProfile.getCompressionEnabled()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); @@ -460,14 +490,14 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { 
service.updateRemoteCluster( "cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -525,14 +555,14 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { service.updateRemoteCluster( "cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -595,17 +625,17 @@ public void testCollectNodes() throws InterruptedException, IOException { assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); - service.updateRemoteCluster( - "cluster_1", + + service.updateRemoteCluster("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), @@ -911,7 +941,7 @@ private static void updateRemoteCluster(RemoteClusterService service, String clu exceptionAtomicReference.set(x); latch.countDown(); }); - service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener); + service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, genericProfile(clusterAlias), listener); latch.await(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -953,4 +983,8 @@ public void testSkipUnavailable() { } } } + + private static ConnectionProfile genericProfile(String clusterName) { + return RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, clusterName); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index a6dbd1561936e..e39f5d03cba07 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -80,7 +80,7 @@ public class MockTransport implements Transport, LifecycleComponent { public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor, 
Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool), + StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this), settings, this, threadPool); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode)); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index e3d7e72a0bb97..4b998f04a568a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -157,7 +157,7 @@ private MockTransportService(Settings settings, StubbableTransport transport, Th Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ConnectionManager(settings, transport, threadPool), settings, transport, threadPool)); + new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport, threadPool)); this.original = transport.getDelegate(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 41ac87f0af576..108e1bf5e24b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -42,7 +42,7 @@ public class StubbableConnectionManager extends ConnectionManager { private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + super(settings, transport); this.delegate = delegate; this.getConnectionBehaviors = new ConcurrentHashMap<>(); this.nodeConnectedBehaviors = new ConcurrentHashMap<>(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index c1a28b72cf8fe..c241c7a9aa070 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; @@ -75,7 +76,8 @@ void init() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, 
String proxy) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, + TimeValue pingSchedule) { String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; if (addresses.isEmpty()) { deleteRepository(repositoryName); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 5a8a7feb34716..e0f71fe45155a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -50,6 +50,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -68,6 +69,8 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -1122,6 +1125,69 @@ private void runFallBehindTest( } } + public void testUpdateRemoteConfigsDuringFollowing() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + int numberOfReplicas = between(0, 1); + + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(200, 800); + + logger.info("Executing put follow"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + logger.info("Indexing [{}] docs while updateing remote config", firstBatchNumDocs); + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + randomIntBetween(1, 5))) { + + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); + Setting> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster"); + settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + + waitForDocs(firstBatchNumDocs, indexer); + indexer.assertNoFailures(); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final 
ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + + assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); + } finally { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); + Setting> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster"); + settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY)) + .put(seeds.getKey(), address)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 03c78ed903e81..e5d4609c13fb1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest; import org.elasticsearch.transport.RemoteClusterAware; @@ -438,7 +439,8 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, + TimeValue pingSchedule) { if (addresses.isEmpty()) { clusters.remove(clusterAlias); } else {
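
The `CcrRepositoryManager` and `RemoteClusterResolver` overrides above track the widened `RemoteClusterAware#updateRemoteCluster` callback, which now also reports the per-cluster compression flag and ping schedule; within this change only `RemoteClusterService` folds those two values into a `ConnectionProfile`, while the other listeners ignore them. For illustration only (the class name below is invented and the body is a minimal sketch, not part of this change), a listener that merely tracks seed addresses could implement the new signature like this:

[source,java]
--------------------------------
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.RemoteClusterAware;

// Hypothetical RemoteClusterAware subclass adapted to the five-argument callback.
class RemoteClusterAliasTracker extends RemoteClusterAware {

    private final Map<String, List<String>> seedsByAlias = new ConcurrentHashMap<>();

    RemoteClusterAliasTracker(Settings settings) {
        super(settings);
    }

    @Override
    protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy,
                                       boolean compressionEnabled, TimeValue pingSchedule) {
        if (addresses.isEmpty()) {
            // An empty address list means the alias was removed from the settings.
            seedsByAlias.remove(clusterAlias);
        } else {
            seedsByAlias.put(clusterAlias, addresses);
            // compressionEnabled and pingSchedule are available here for listeners that,
            // unlike this sketch, manage their own transport connections.
        }
    }
}
--------------------------------
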