From d989ba24c1124860179a010b52e0b3d06932cac0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 9 Jan 2019 17:47:49 -0700 Subject: [PATCH 01/10] WIP --- .../transport/RemoteClusterConnection.java | 13 +++++- .../transport/RemoteClusterService.java | 45 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 7ea55925262ff..b0f2778e98be8 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -119,6 +119,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos createConnectionManager(settings, clusterAlias, transportService)); } + RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, + TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, + String proxyAddress, ConnectionProfile connectionProfile) { + this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, + createConnectionManager(connectionProfile, transportService)); + } + // Public for tests to pass a StubbableConnectionManager RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, @@ -754,7 +761,11 @@ private static ConnectionManager createConnectionManager(Settings settings, Stri TransportRequestOptions.Type.RECOVERY) .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings)) .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings)); - return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool); + return createConnectionManager(builder.build(), transportService); + } + + private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { + return new ConnectionManager(connectionProfile, transportService.transport, transportService.threadPool); } ConnectionManager getConnectionManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index ff9dea8fe45a7..26c6ea45fe305 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -189,6 +189,7 @@ public String getKey(final String key) { private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); + private volatile Map remoteClusterConnectionProfiles = Collections.emptyMap(); RemoteClusterService(Settings settings, TransportService transportService) { super(settings); @@ -391,6 +392,31 @@ public void listenForUpdates(ClusterSettings clusterSettings) { clusterSettings.addAffixUpdateConsumer(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {}); } + private synchronized void updateRemoteClusterPingSchedule(String clusterAlias, TimeValue timeValue) { + RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); + if (remoteClusterConnection != null) { 
+ ConnectionProfile oldProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + if (oldProfile.getPingInterval().equals(timeValue) == false) { + updateRemoteClusterConnectionProfile(clusterAlias); + } + } + } + + private synchronized void updateRemoteClusterCompressionSetting(String clusterAlias, Boolean compressionEnabled) { + RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); + if (remoteClusterConnection != null) { + ConnectionProfile oldProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + if (oldProfile.getCompressionEnabled().equals(compressionEnabled) == false) { + updateRemoteClusterConnectionProfile(clusterAlias); + } + } + } + + private synchronized void updateRemoteClusterConnectionProfile(String clusterAlias) { + + + } + synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); if (remote != null) { @@ -424,6 +450,7 @@ void initializeRemoteClusters() { final PlainActionFuture future = new PlainActionFuture<>(); Map>>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); + initializeConnectionProfiles(seeds.keySet()); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); @@ -436,6 +463,24 @@ void initializeRemoteClusters() { } } + private synchronized void initializeConnectionProfiles(Set remoteClusters) { + Map connectionProfiles = new HashMap<>(remoteClusters.size()); + for (String clusterName : remoteClusters) { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder() + .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? 
+ // we don't want this to be used for anything else but search + .addConnections(0, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY) + .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) + .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)); + connectionProfiles.put(clusterName, builder.build()); + } + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); + } + @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); From 000ee4e2b12c7832ea3d962e1a4babd9d9c10053 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 Jan 2019 11:13:54 -0700 Subject: [PATCH 02/10] WIP --- .../transport/RemoteClusterService.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 26c6ea45fe305..d9a274224ee89 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -406,15 +406,23 @@ private synchronized void updateRemoteClusterCompressionSetting(String clusterAl RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); if (remoteClusterConnection != null) { ConnectionProfile oldProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + // May be null if (oldProfile.getCompressionEnabled().equals(compressionEnabled) == false) { - updateRemoteClusterConnectionProfile(clusterAlias); + ConnectionProfile.Builder newProfileBuilder = new ConnectionProfile.Builder(oldProfile); + newProfileBuilder.setCompressionEnabled(compressionEnabled); + updateRemoteClusterConnectionProfile(clusterAlias, newProfileBuilder.build()); } } } - private synchronized void updateRemoteClusterConnectionProfile(String clusterAlias) { - + private synchronized void updateRemoteClusterConnectionProfile(String clusterAlias, ConnectionProfile connectionProfile) { + HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); + ConnectionProfile oldConnectionProfile = connectionProfiles.remove(clusterAlias); + if (oldConnectionProfile != null) { + } else { + } + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); } synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { From 1eb61509f66c7a088635c46b79185395a6c535be Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 Jan 2019 17:04:47 -0700 Subject: [PATCH 03/10] WIP --- .../settings/AbstractScopedSettings.java | 51 +++++++++++++ .../transport/RemoteClusterAware.java | 24 ++++-- .../transport/RemoteClusterConnection.java | 8 ++ .../transport/RemoteClusterService.java | 74 +++++++++++-------- .../xpack/ccr/CcrRepositoryManager.java | 4 +- .../authz/IndicesAndAliasesResolver.java | 4 +- 6 files changed, 128 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index b49f0f8225016..0ed87e583194e 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ 
b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -296,6 +296,57 @@ public void apply(Map> values, Settings current, Settings pr }); } + /** + * Adds an affix settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change + * and if the provided validator succeeded. + * <p>
+ * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. + * </p>
+ * This method registers a compound updater that is useful if two settings are depending on each other. + * The consumer is always provided with both values even if only one of the two changes. + */ + public synchronized void addAffixUpdateConsumer(List> settings, BiConsumer consumer) { + List affixUpdaters = new ArrayList<>(settings.size()); + for (Setting.AffixSetting setting : settings) { + ensureSettingIsRegistered(setting); + affixUpdaters.add(setting.newAffixUpdater((a,b)-> {}, logger, (a,b)-> {})); + } + + addSettingsUpdater(new SettingUpdater>() { + + @Override + public boolean hasChanged(Settings current, Settings previous) { + return affixUpdaters.stream().anyMatch(au -> au.hasChanged(current, previous)); + } + + @Override + public Map getValue(Settings current, Settings previous) { + Set namespaces = new HashSet<>(); + Consumer aConsumer = namespaces::add; + for (Setting.AffixSetting setting : settings) { + SettingUpdater affixUpdaterA = setting.newAffixUpdater((k, v) -> aConsumer.accept(k), logger, (a, b) ->{}); + affixUpdaterA.apply(current, previous); + } + Map namespaceToSettings = new HashMap<>(namespaces.size()); + for (String namespace : namespaces) { + Set> concreteSettings = new HashSet<>(settings.size()); + for (Setting.AffixSetting setting : settings) { + concreteSettings.add(setting.getConcreteSettingForNamespace(namespace)); + } + namespaceToSettings.put(namespace, current.filter(concreteSettings::contains)); + } + return namespaceToSettings; + } + + @Override + public void apply(Map values, Settings current, Settings previous) { + for (Map.Entry entry : values.entrySet()) { + consumer.accept(entry.getKey(), entry.getValue()); + } + } + }); + } + private void ensureSettingIsRegistered(Setting.AffixSetting setting) { final Setting registeredSetting = this.complexMatchers.get(setting.getKey()); if (setting != registeredSetting) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 9b9243b612b74..0c370d93168c7 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.SettingUpgrader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -282,21 +283,32 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } + void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { + updateRemoteCluster(clusterAlias, addresses, proxy, null, null); + } + + void updateRemoteCluster(String clusterAlias, Settings settings) { + String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings); + List addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); + Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); + TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings); + + updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); + } + /** * Subclasses must implement this to receive information about updated cluster aliases. 
If the given address list is * empty the cluster alias is unregistered and should be removed. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy, Boolean compressionEnabled, + TimeValue timeValue); /** * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - clusterSettings.addAffixUpdateConsumer( - RemoteClusterAware.REMOTE_CLUSTERS_PROXY, - RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, - (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()), - (namespace, value) -> {}); + clusterSettings.addAffixUpdateConsumer(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, RemoteClusterAware.REMOTE_CLUSTERS_SEEDS), + this::updateRemoteCluster); clusterSettings.addAffixUpdateConsumer( RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY, RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index b0f2778e98be8..069252fb74cba 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -369,6 +369,14 @@ public boolean isClosed() { return connectHandler.isClosed(); } + public String getProxyAddress() { + return proxyAddress; + } + + public List>> getSeedNodes() { + return seedNodes; + } + /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. If such a connect job is triggered diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index d9a274224ee89..256f273fcd115 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -71,6 +71,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); + private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); + static { // remove search.remote.* settings in 8.0.0 assert Version.CURRENT.major < 8; @@ -393,36 +395,45 @@ public void listenForUpdates(ClusterSettings clusterSettings) { } private synchronized void updateRemoteClusterPingSchedule(String clusterAlias, TimeValue timeValue) { - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection != null) { - ConnectionProfile oldProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); + if (oldProfile != null) { if (oldProfile.getPingInterval().equals(timeValue) == false) { - updateRemoteClusterConnectionProfile(clusterAlias); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); + builder.setPingInterval(timeValue); + updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); } + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); + builder.setPingInterval(timeValue); + 
updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); } } private synchronized void updateRemoteClusterCompressionSetting(String clusterAlias, Boolean compressionEnabled) { - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection != null) { - ConnectionProfile oldProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); - // May be null + ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); + if (oldProfile != null) { if (oldProfile.getCompressionEnabled().equals(compressionEnabled) == false) { - ConnectionProfile.Builder newProfileBuilder = new ConnectionProfile.Builder(oldProfile); - newProfileBuilder.setCompressionEnabled(compressionEnabled); - updateRemoteClusterConnectionProfile(clusterAlias, newProfileBuilder.build()); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); + builder.setCompressionEnabled(compressionEnabled); + updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); } + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); + builder.setCompressionEnabled(compressionEnabled); + updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); } } private synchronized void updateRemoteClusterConnectionProfile(String clusterAlias, ConnectionProfile connectionProfile) { HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); - ConnectionProfile oldConnectionProfile = connectionProfiles.remove(clusterAlias); - if (oldConnectionProfile != null) { - } else { - - } + connectionProfiles.put(clusterAlias, connectionProfile); this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); + RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); + if (remoteClusterConnection != null) { + String proxyAddress = remoteClusterConnection.getProxyAddress(); + List>> seedNodes = remoteClusterConnection.getSeedNodes(); + updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, seedNodes)), noopListener); + } } synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { @@ -433,8 +444,9 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { - updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, Boolean compressionEnabled, + TimeValue timeValue) { + updateRemoteCluster(clusterAlias, addresses, proxyAddress, noopListener); } void updateRemoteCluster( @@ -474,21 +486,25 @@ void initializeRemoteClusters() { private synchronized void initializeConnectionProfiles(Set remoteClusters) { Map connectionProfiles = new HashMap<>(remoteClusters.size()); for (String clusterName : remoteClusters) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder() - .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? 
- // we don't want this to be used for anything else but search - .addConnections(0, TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.STATE, - TransportRequestOptions.Type.RECOVERY) - .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) - .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)); - connectionProfiles.put(clusterName, builder.build()); + connectionProfiles.put(clusterName, buildConnectionProfileFromSettings(clusterName)); } this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); } + private ConnectionProfile buildConnectionProfileFromSettings(String clusterName) { + return new ConnectionProfile.Builder() + .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? + // we don't want this to be used for anything else but search + .addConnections(0, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY) + .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) + .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)) + .build(); + } + @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index 54403df367809..8b0326f96cc2a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; @@ -76,7 +77,8 @@ void init() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy, Boolean compressionEnabled, + TimeValue timeValue) { String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; if (addresses.isEmpty()) { deleteRepository(repositoryName); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index aa1461b189a39..7b59abb28bfd2 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import 
org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest; import org.elasticsearch.transport.RemoteClusterAware; @@ -439,7 +440,8 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, Boolean compressionEnabled, + TimeValue timeValue) { if (addresses.isEmpty()) { clusters.remove(clusterAlias); } else { From c34ab4e6e51ab37f42a43416d20ff34ddc782a42 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 Jan 2019 17:58:33 -0700 Subject: [PATCH 04/10] WIP --- .../settings/AbstractScopedSettings.java | 12 ++- .../transport/RemoteClusterAware.java | 14 +-- .../transport/RemoteClusterService.java | 44 ++++++++-- .../common/settings/ScopedSettingsTests.java | 85 +++++++++++++++++++ .../xpack/ccr/CcrRepositoryManager.java | 4 +- .../authz/IndicesAndAliasesResolver.java | 4 +- 6 files changed, 140 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 0ed87e583194e..0060d82f7509c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -297,15 +297,13 @@ public void apply(Map> values, Settings current, Settings pr } /** - * Adds a affix settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change - * and if the provided validator succeeded. + * Adds a affix settings consumer that accepts the settings for a group of settings. The consumer is only + * notified if at least one of the settings change. *
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically. *
- * This method registers a compound updater that is useful if two settings are depending on each other. - * The consumer is always provided with both values even if only one of the two changes. */ - public synchronized void addAffixUpdateConsumer(List> settings, BiConsumer consumer) { + public synchronized void addAffixGroupUpdateConsumer(List> settings, BiConsumer consumer) { List affixUpdaters = new ArrayList<>(settings.size()); for (Setting.AffixSetting setting : settings) { ensureSettingIsRegistered(setting); @@ -329,9 +327,9 @@ public Map getValue(Settings current, Settings previous) { } Map namespaceToSettings = new HashMap<>(namespaces.size()); for (String namespace : namespaces) { - Set> concreteSettings = new HashSet<>(settings.size()); + Set concreteSettings = new HashSet<>(settings.size()); for (Setting.AffixSetting setting : settings) { - concreteSettings.add(setting.getConcreteSettingForNamespace(namespace)); + concreteSettings.add(setting.getConcreteSettingForNamespace(namespace).getKey()); } namespaceToSettings.put(namespace, current.filter(concreteSettings::contains)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 0c370d93168c7..b20c650465bb1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -284,7 +284,9 @@ protected Map> groupClusterIndices(Set remoteCluste } void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { - updateRemoteCluster(clusterAlias, addresses, proxy, null, null); + Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings); + TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings); + updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); } void updateRemoteCluster(String clusterAlias, Settings settings) { @@ -300,15 +302,17 @@ void updateRemoteCluster(String clusterAlias, Settings settings) { * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy, Boolean compressionEnabled, - TimeValue timeValue); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, + TimeValue pingSchedule); /** * Registers this instance to listen to updates on the cluster settings. 
*/ public void listenForUpdates(ClusterSettings clusterSettings) { - clusterSettings.addAffixUpdateConsumer(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, RemoteClusterAware.REMOTE_CLUSTERS_SEEDS), - this::updateRemoteCluster); + List> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, + RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster); clusterSettings.addAffixUpdateConsumer( RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY, RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 256f273fcd115..bb40818278740 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -219,19 +219,33 @@ private synchronized void updateRemoteClusters(Map>> seedList = entry.getValue().v2(); String proxyAddress = entry.getValue().v1(); - RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); + String clusterAlias = entry.getKey(); + RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + ConnectionProfile connectionProfile = this.remoteClusterConnectionProfiles.get(clusterAlias); if (seedList.isEmpty()) { // with no seed nodes we just remove the connection try { + // TODO: Close will block!!!!!!!!!!!!!!!! IOUtils.close(remote); } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e); + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); } - remoteClusters.remove(entry.getKey()); + remoteClusters.remove(clusterAlias); continue; } if (remote == null) { // this is a new cluster we have to add a new representation - String clusterAlias = entry.getKey(); + remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, + getNodePredicate(settings), proxyAddress); + remoteClusters.put(clusterAlias, remote); + } else if (remote.getConnectionManager().getConnectionProfile().equals(connectionProfile) == false) { + // TODO: need to implement connection profile equals + // New ConnectionProfile. 
Must tear down existing connection + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + } + remoteClusters.remove(clusterAlias); remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, getNodePredicate(settings), proxyAddress); remoteClusters.put(clusterAlias, remote); @@ -250,7 +264,7 @@ private synchronized void updateRemoteClusters(Map addresses, String proxyAddress, Boolean compressionEnabled, - TimeValue timeValue) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, + TimeValue pingSchedule) { + ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); + ConnectionProfile newProfile; + if (oldProfile != null) { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + newProfile = builder.build(); + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + newProfile = builder.build(); + } + HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); + connectionProfiles.put(clusterAlias, newProfile); + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); updateRemoteCluster(clusterAlias, addresses, proxyAddress, noopListener); } diff --git a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java index fc732fbd88e2e..9965c36632aac 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java @@ -316,6 +316,91 @@ public void testTupleAffixUpdateConsumer() { assertEquals(0, results.size()); } + public void testAffixGroupUpdateConsumer() { + String prefix = randomAlphaOfLength(3) + "foo."; + String intSuffix = randomAlphaOfLength(3); + String listSuffix = randomAlphaOfLength(4); + Setting.AffixSetting intSetting = Setting.affixKeySetting(prefix, intSuffix, + (k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope)); + Setting.AffixSetting> listSetting = Setting.affixKeySetting(prefix, listSuffix, + (k) -> Setting.listSetting(k, Arrays.asList("1"), Integer::parseInt, Property.Dynamic, Property.NodeScope)); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,new HashSet<>(Arrays.asList(intSetting, listSetting))); + Map results = new HashMap<>(); + Function listBuilder = g -> (prefix + g + "." + listSuffix); + Function intBuilder = g -> (prefix + g + "." 
+ intSuffix); + String group1 = randomAlphaOfLength(3); + String group2 = randomAlphaOfLength(4); + String group3 = randomAlphaOfLength(5); + BiConsumer listConsumer = results::put; + + service.addAffixGroupUpdateConsumer(Arrays.asList(intSetting, listSetting), listConsumer); + assertEquals(0, results.size()); + service.applySettings(Settings.builder() + .put(intBuilder.apply(group1), 2) + .put(intBuilder.apply(group2), 7) + .putList(listBuilder.apply(group1), "16", "17") + .putList(listBuilder.apply(group2), "18", "19", "20") + .build()); + Settings groupOneSettings = results.get(group1); + Settings groupTwoSettings = results.get(group2); + assertEquals(2, intSetting.getConcreteSettingForNamespace(group1).get(groupOneSettings).intValue()); + assertEquals(7, intSetting.getConcreteSettingForNamespace(group2).get(groupTwoSettings).intValue()); + assertEquals(Arrays.asList(16, 17), listSetting.getConcreteSettingForNamespace(group1).get(groupOneSettings)); + assertEquals(Arrays.asList(18, 19, 20), listSetting.getConcreteSettingForNamespace(group2).get(groupTwoSettings)); + assertEquals(2, groupOneSettings.size()); + assertEquals(2, groupTwoSettings.size()); + assertEquals(2, results.size()); + + results.clear(); + + service.applySettings(Settings.builder() + .put(intBuilder.apply(group1), 2) + .put(intBuilder.apply(group2), 7) + .putList(listBuilder.apply(group1), "16", "17") + .putNull(listBuilder.apply(group2)) // removed + .build()); + + assertNull(group1 + " wasn't changed", results.get(group1)); + groupTwoSettings = results.get(group2); + assertEquals(7, intSetting.getConcreteSettingForNamespace(group2).get(groupTwoSettings).intValue()); + assertEquals(Arrays.asList(1), listSetting.getConcreteSettingForNamespace(group2).get(groupTwoSettings)); + assertEquals(1, results.size()); + assertEquals(2, groupTwoSettings.size()); + results.clear(); + + service.applySettings(Settings.builder() + .put(intBuilder.apply(group1), 2) + .put(intBuilder.apply(group2), 7) + .putList(listBuilder.apply(group1), "16", "17") + .putList(listBuilder.apply(group3), "5", "6") // added + .build()); + assertNull(group1 + " wasn't changed", results.get(group1)); + assertNull(group2 + " wasn't changed", results.get(group2)); + + Settings groupThreeSettings = results.get(group3); + assertEquals(1, intSetting.getConcreteSettingForNamespace(group3).get(groupThreeSettings).intValue()); + assertEquals(Arrays.asList(5, 6), listSetting.getConcreteSettingForNamespace(group3).get(groupThreeSettings)); + assertEquals(1, results.size()); + assertEquals(2, groupThreeSettings.size()); + results.clear(); + + service.applySettings(Settings.builder() + .put(intBuilder.apply(group1), 4) // modified + .put(intBuilder.apply(group2), 7) + .putList(listBuilder.apply(group1), "16", "17") + .putList(listBuilder.apply(group3), "5", "6") + .build()); + assertNull(group2 + " wasn't changed", results.get(group2)); + assertNull(group3 + " wasn't changed", results.get(group3)); + + groupOneSettings = results.get(group1); + assertEquals(4, intSetting.getConcreteSettingForNamespace(group1).get(groupOneSettings).intValue()); + assertEquals(Arrays.asList(16, 17), listSetting.getConcreteSettingForNamespace(group1).get(groupOneSettings)); + assertEquals(1, results.size()); + assertEquals(2, groupOneSettings.size()); + results.clear(); + } + public void testAddConsumerAffix() { Setting.AffixSetting intSetting = Setting.affixKeySetting("foo.", "bar", (k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope)); diff --git 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index 8b0326f96cc2a..768523536eadd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -77,8 +77,8 @@ void init() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy, Boolean compressionEnabled, - TimeValue timeValue) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, + TimeValue pingSchedule) { String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; if (addresses.isEmpty()) { deleteRepository(repositoryName); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 7b59abb28bfd2..889150d805c2e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -440,8 +440,8 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, Boolean compressionEnabled, - TimeValue timeValue) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, + TimeValue pingSchedule) { if (addresses.isEmpty()) { clusters.remove(clusterAlias); } else { From c635cc138602846884c71b833c8af9589c267a79 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 21 Jan 2019 18:13:33 -0700 Subject: [PATCH 05/10] WIP --- .../transport/ConnectionManager.java | 51 ++++++------------ .../transport/RemoteClusterAware.java | 6 ++- .../transport/RemoteClusterConnection.java | 6 ++- .../transport/RemoteClusterService.java | 52 +++---------------- .../transport/TransportService.java | 2 +- .../discovery/PeerFinderTests.java | 2 +- .../transport/ConnectionManagerTests.java | 2 +- .../RemoteClusterConnectionTests.java | 4 +- .../test/transport/MockTransport.java | 2 +- .../test/transport/MockTransportService.java | 2 +- .../transport/StubbableConnectionManager.java | 2 +- 11 files changed, 40 insertions(+), 91 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index f1067a0c5575f..da86ed076e396 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -38,8 +37,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable { private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); private final KeyedLock connectionLock = new KeyedLock<>(); private final Transport transport; - private final ThreadPool threadPool; private final ConnectionProfile defaultProfile; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool); + public ConnectionManager(Settings settings, Transport transport) { + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); } - public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) { + public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) { this.transport = transport; - this.threadPool = threadPool; this.defaultProfile = connectionProfile; } @@ -185,35 +180,23 @@ public int size() { @Override public void close() { + Transports.assertNotTransportThread("Closing ConnectionManager"); if (isClosed.compareAndSet(false, true)) { - CountDownLatch latch = new CountDownLatch(1); - - // TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService - threadPool.generic().execute(() -> { - closeLock.writeLock().lock(); - try { - // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close - // all instances and then clear them maps - Iterator> iterator = connectedNodes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry next = iterator.next(); - try { - IOUtils.closeWhileHandlingException(next.getValue()); - } finally { - iterator.remove(); - } + closeLock.writeLock().lock(); + try { + // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close + // all instances and then clear them maps + Iterator> iterator = connectedNodes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + try { + IOUtils.closeWhileHandlingException(next.getValue()); + } finally { + iterator.remove(); } - } finally { - closeLock.writeLock().unlock(); - latch.countDown(); } - }); - - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore + } finally { + closeLock.writeLock().unlock(); } } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index b20c650465bb1..9e4413712ec92 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -289,11 +289,13 @@ void updateRemoteCluster(String clusterAlias, List addresses, String pro updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); } - void updateRemoteCluster(String clusterAlias, Settings settings) { + private void updateRemoteCluster(String clusterAlias, Settings settings) { String proxy = 
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings); List addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); - TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings); + TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE + .getConcreteSettingForNamespace(clusterAlias) + .get(settings); updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 069252fb74cba..10e01ffbc3036 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -362,7 +362,9 @@ Transport.Connection getConnection() { @Override public void close() throws IOException { - IOUtils.close(connectHandler, connectionManager); + IOUtils.close(connectHandler); + // In the ConnectionManager we wait on connections being closed. + threadPool.generic().execute(connectionManager::close); } public boolean isClosed() { @@ -773,7 +775,7 @@ private static ConnectionManager createConnectionManager(Settings settings, Stri } private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { - return new ConnectionManager(connectionProfile, transportService.transport, transportService.threadPool); + return new ConnectionManager(connectionProfile, transportService.transport); } ConnectionManager getConnectionManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 3b63395e7b5b2..778e87011bdde 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -224,7 +225,6 @@ private synchronized void updateRemoteClusters(Map {}); } - private synchronized void updateRemoteClusterPingSchedule(String clusterAlias, TimeValue timeValue) { - ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); - if (oldProfile != null) { - if (oldProfile.getPingInterval().equals(timeValue) == false) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); - builder.setPingInterval(timeValue); - updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); - } - } else { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); - builder.setPingInterval(timeValue); - updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); - } - } - - private synchronized void updateRemoteClusterCompressionSetting(String clusterAlias, Boolean compressionEnabled) { - ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); - if (oldProfile != null) { - if (oldProfile.getCompressionEnabled().equals(compressionEnabled) == 
false) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); - builder.setCompressionEnabled(compressionEnabled); - updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); - } - } else { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); - builder.setCompressionEnabled(compressionEnabled); - updateRemoteClusterConnectionProfile(clusterAlias, builder.build()); - } - } - - private synchronized void updateRemoteClusterConnectionProfile(String clusterAlias, ConnectionProfile connectionProfile) { - HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); - connectionProfiles.put(clusterAlias, connectionProfile); - this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection != null) { - String proxyAddress = remoteClusterConnection.getProxyAddress(); - List>> seedNodes = remoteClusterConnection.getSeedNodes(); - updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, seedNodes)), noopListener); - } - } - synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); if (remote != null) { @@ -552,6 +509,11 @@ public Stream getRemoteConnectionInfos() { return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); } + private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) { + return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false + || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; + } + /** * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * function on success. 
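A minimal sketch of the profile-rebuild step that the connectionProfileChanged(oldProfile, newProfile) helper above is meant to detect. The class and method names here are invented for illustration, it assumes the sketch lives in the org.elasticsearch.transport package so the package-private setting constants shown earlier in the series are visible, and it is not part of the patch itself:

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.TimeValue;

    // Sketch: rebuild a remote cluster's ConnectionProfile from freshly applied settings.
    // Copying the old profile keeps the per-type connection counts and timeouts intact;
    // only the compression flag and the ping interval are replaced, which is exactly the
    // pair of values that connectionProfileChanged(...) compares.
    final class RemoteProfileSketch {
        static ConnectionProfile rebuild(ConnectionProfile oldProfile, String clusterAlias, Settings newSettings) {
            boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS
                    .getConcreteSettingForNamespace(clusterAlias).get(newSettings);
            TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
                    .getConcreteSettingForNamespace(clusterAlias).get(newSettings);
            return new ConnectionProfile.Builder(oldProfile)
                    .setCompressionEnabled(compress)
                    .setPingInterval(pingSchedule)
                    .build();
        }
    }

Rebuilding the profile and comparing it against the one held by the existing RemoteClusterConnection, rather than mutating the live ConnectionManager, is what lets updateRemoteClusters(...) tear down and re-establish a connection only when one of these two values has actually changed.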
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 57aaba671518e..d8edceb00f72a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -149,7 +149,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders, - new ConnectionManager(settings, transport, threadPool)); + new ConnectionManager(settings, transport)); } public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 78a2f2446c5dc..5ffe242dfb208 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -211,7 +211,7 @@ public void setup() { localNode = newDiscoveryNode("local-node"); ConnectionManager innerConnectionManager - = new ConnectionManager(settings, capturingTransport, deterministicTaskQueue.getThreadPool()); + = new ConnectionManager(settings, capturingTransport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool()); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 578521190e2ff..c1dd512e0232d 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -55,7 +55,7 @@ public void createConnectionManager() { .build(); threadPool = new ThreadPool(settings); transport = mock(Transport.class); - connectionManager = new ConnectionManager(settings, transport, threadPool); + connectionManager = new ConnectionManager(settings, transport); TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 02e701ed4bc86..0271673fba0e4 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -477,7 +477,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, } }; - ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, threadPool); @@ -1332,7 +1332,7 @@ public void sendRequest(long requestId, String 
action, TransportRequest request, } }; - ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, threadPool); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index ddfcc29c750ce..e783fd992d20d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -77,7 +77,7 @@ public class MockTransport implements Transport, LifecycleComponent { public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool), + StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this), settings, this, threadPool); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode)); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 403ac96104a10..a8b16fc29d6c9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -157,7 +157,7 @@ private MockTransportService(Settings settings, StubbableTransport transport, Th Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ConnectionManager(settings, transport, threadPool), settings, transport, threadPool)); + new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport, threadPool)); this.original = transport.getDelegate(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 994037be07a92..643a176feec02 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -42,7 +42,7 @@ public class StubbableConnectionManager extends ConnectionManager { private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + super(settings, transport); this.delegate = delegate; this.getConnectionBehaviors = new ConcurrentHashMap<>(); this.nodeConnectedBehaviors = new ConcurrentHashMap<>(); From 
1eef1bc66010fe4748ea66b6a28fe1b8d84ea043 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 24 Jan 2019 09:54:56 -0700 Subject: [PATCH 06/10] Fix test --- .../java/org/elasticsearch/transport/RemoteClusterService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index ebc8a95ac363a..8a75b022d006c 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -379,6 +379,9 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, TimeValue pingSchedule) { + if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); ConnectionProfile newProfile; if (oldProfile != null) { From 131169b0f581ed693c7c79ab4ec45b187f7b76e4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 28 Jan 2019 17:14:45 -0700 Subject: [PATCH 07/10] Testing --- .../transport/RemoteClusterAware.java | 2 +- .../transport/RemoteClusterConnection.java | 25 +------- .../transport/RemoteClusterService.java | 29 ++++----- .../RemoteClusterConnectionTests.java | 39 ++++++------ .../transport/RemoteClusterServiceTests.java | 60 +++++++++++++++---- 5 files changed, 86 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 9e4413712ec92..2278f6deb6da7 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -289,7 +289,7 @@ void updateRemoteCluster(String clusterAlias, List addresses, String pro updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); } - private void updateRemoteCluster(String clusterAlias, Settings settings) { + void updateRemoteCluster(String clusterAlias, Settings settings) { String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings); List addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 269004d9338fb..f4a1b250e7f5e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -64,9 +64,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE; - /** * Represents a connection to a single remote cluster. 
In contrast to a local cluster a remote cluster is not joined such that the * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not @@ -107,14 +104,8 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to * @param proxyAddress the proxy address + * @param connectionProfile the connection profile to use */ - RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, - TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, - String proxyAddress) { - this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, - createConnectionManager(settings, clusterAlias, transportService)); - } - RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, String proxyAddress, ConnectionProfile connectionProfile) { @@ -714,20 +705,6 @@ private synchronized void ensureIteratorAvailable() { } } - private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder() - .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? - // we don't want this to be used for anything else but search - .addConnections(0, TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.STATE, - TransportRequestOptions.Type.RECOVERY) - .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings)) - .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings)); - return createConnectionManager(builder.build(), transportService); - } - private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { return new ConnectionManager(connectionProfile, transportService.transport); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 7fdce7578f7b2..dbd9cca5271dd 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -231,7 +231,7 @@ private synchronized void updateRemoteClusters(Map addresses, builder.setPingInterval(pingSchedule); newProfile = builder.build(); } - HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); - connectionProfiles.put(clusterAlias, newProfile); - this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); - updateRemoteCluster(clusterAlias, addresses, proxyAddress, noopListener); + updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener); } - void updateRemoteCluster( - final String clusterAlias, - final List addresses, - final String proxyAddress, - final ActionListener connectionListener) { + void updateRemoteCluster(final 
String clusterAlias, final List addresses, final String proxyAddress, + final ConnectionProfile connectionProfile, final ActionListener connectionListener) { + HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); + connectionProfiles.put(clusterAlias, connectionProfile); + this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); final List>> nodes = - addresses.stream().>>map(address -> Tuple.tuple(address, () -> - buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) - ).collect(Collectors.toList()); + addresses.stream().>>map(address -> Tuple.tuple(address, () -> + buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) + ).collect(Collectors.toList()); updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); } @@ -444,6 +441,10 @@ private synchronized void initializeConnectionProfiles(Set remoteCluster } private ConnectionProfile buildConnectionProfileFromSettings(String clusterName) { + return buildConnectionProfileFromSettings(settings, clusterName); + } + + static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) { return new ConnectionProfile.Builder() .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 19cbb0ae1804b..9c59b44af13b7 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -95,6 +95,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); @Override public void tearDown() throws Exception { @@ -164,7 +165,8 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -206,7 +208,8 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -259,7 +262,8 @@ public void testDiscoverSingleNode() throws Exception { 
service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -291,7 +295,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -319,7 +323,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -369,7 +373,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes(seedNode)); if (rejectedNode.equals(seedNode)) { @@ -434,7 +438,8 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null, + profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); expectThrows( Exception.class, @@ -532,7 +537,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -587,7 +592,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try 
(RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; @@ -667,7 +672,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -755,7 +760,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true, null)) { + seedNodes, service, maxNumConnections, n -> true, null, profile)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -887,7 +892,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); assertFalse(connectionManager.nodeConnected(seedNode)); assertFalse(connectionManager.nodeConnected(discoverableNode)); @@ -937,7 +942,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { if (randomBoolean()) { updateSeedNodes(connection, seedNodes(seedNode)); } @@ -985,7 +990,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1073,7 +1078,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, 
seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -1187,7 +1192,7 @@ public void testLazyResolveTransportAddress() throws Exception { return seedNode; }); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null, profile)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice @@ -1219,7 +1224,7 @@ public void testProxyMode() throws Exception { RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true)); assertEquals("node_0", seedSupplier.v2().get().getAttributes().get("server_name")); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress, profile)) { updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); assertEquals(2, connection.getNumNodesConnected()); assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode())); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 2407106273f3f..47b60a27aa28d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -400,11 +400,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); - assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -415,6 +411,40 @@ public void testCustomPingSchedule() throws IOException { } } + public void testChangeSettings() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, + threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + try 
(RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + service.initializeRemoteClusters(); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + Settings.Builder settingsChange = Settings.builder(); + TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); + settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); + boolean compressionEnabled = true; + settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled); + settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + service.updateRemoteCluster("cluster_1", settingsChange.build()); + assertBusy(remoteClusterConnection::isClosed); + + remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + assertEquals(pingSchedule, connectionProfile.getPingInterval()); + assertEquals(compressionEnabled, connectionProfile.getCompressionEnabled()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); @@ -460,14 +490,14 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { service.updateRemoteCluster( "cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -525,14 +555,14 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { service.updateRemoteCluster( "cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -595,17 +625,17 @@ public void testCollectNodes() throws InterruptedException, IOException { assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); - service.updateRemoteCluster( - "cluster_1", + + service.updateRemoteCluster("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - connectionListener(firstLatch)); + genericProfile("cluster_1"), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - connectionListener(secondLatch)); + genericProfile("cluster_2"), connectionListener(secondLatch)); 
secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), @@ -916,7 +946,7 @@ private static void updateRemoteCluster(RemoteClusterService service, String clu exceptionAtomicReference.set(x); latch.countDown(); }); - service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener); + service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, genericProfile(clusterAlias), listener); latch.await(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -958,4 +988,8 @@ public void testSkipUnavailable() { } } } + + private static ConnectionProfile genericProfile(String clusterName) { + return RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, clusterName); + } } From 30688d4e1de92837165f9d3577d788b43e14635f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 13 Feb 2019 15:38:26 -0700 Subject: [PATCH 08/10] Changes --- .../modules/remote-clusters.asciidoc | 42 +++++++++++- .../xpack/ccr/IndexFollowingIT.java | 66 +++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 768eb7d6117bf..f061964f53281 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -49,13 +49,17 @@ cluster: remote: cluster_one: <1> seeds: 127.0.0.1:9300 + transport.ping_schedule: 30s <2> cluster_two: <1> seeds: 127.0.0.1:9301 + transport.compression: true <3> -------------------------------- <1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing the connection to each cluster. These names are subsequently used to distinguish between local and remote indices. +<2> A keep-alive ping is configured for cluster_one in this example. +<3> Compression is explicitly enabled for requests to cluster_two in this example. The equivalent example using the <> to add remote clusters to all nodes in the cluster would look like the @@ -71,12 +75,14 @@ PUT _cluster/settings "cluster_one": { "seeds": [ "127.0.0.1:9300" - ] + ], + "transport.ping_schedule": "30s" }, "cluster_two": { "seeds": [ "127.0.0.1:9301" - ] + ], + "transport.compression": true }, "cluster_three": { "seeds": [ @@ -92,6 +98,38 @@ PUT _cluster/settings // TEST[setup:host] // TEST[s/127.0.0.1:9300/\${transport_host}/] +Compression and ping schedule can be dynamically updated. However, seeds must be +re-included in the settings update request. Updating the compression or ping schedule +settings requires that all the existing node connections be closed and re-opened. This +can cause in-flight requests to fail. 
This example would update these settings: + +[source,js] +-------------------------------- +PUT _cluster/settings +{ + "persistent": { + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ], + "transport.ping_schedule": "60s" + }, + "cluster_two": { + "seeds": [ + "127.0.0.1:9301" + ], + "transport.compression": false + } + } + } + } +} +-------------------------------- +// CONSOLE +// TEST[continued] + A remote cluster can be deleted from the cluster settings by setting its seeds to `null`: diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index ade96b4614171..2d59fe13a230f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -62,6 +63,8 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -1050,6 +1053,69 @@ public void testIndexFallBehind() throws Exception { } } + public void testUpdateRemoteConfigsDuringFollowing() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + int numberOfReplicas = between(0, 1); + + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(200, 800); + + logger.info("Executing put follow"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + logger.info("Indexing [{}] docs while updateing remote config", firstBatchNumDocs); + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + randomIntBetween(1, 5))) { + + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); + Setting> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster"); + settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), 
true).put(seeds.getKey(), address)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + + waitForDocs(firstBatchNumDocs, indexer); + indexer.assertNoFailures(); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + + assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); + } finally { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); + Setting> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster"); + settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY)) + .put(seeds.getKey(), address)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { From b31d2af19c36966863f237e91d4ff949a7f000c0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 13 Feb 2019 15:47:04 -0700 Subject: [PATCH 09/10] Fix settings --- docs/reference/modules/remote-clusters.asciidoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index f061964f53281..98b1928f0430e 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -52,7 +52,7 @@ cluster: transport.ping_schedule: 30s <2> cluster_two: <1> seeds: 127.0.0.1:9301 - transport.compression: true <3> + transport.compress: true <3> -------------------------------- <1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing @@ -82,7 +82,7 @@ PUT _cluster/settings "seeds": [ "127.0.0.1:9301" ], - "transport.compression": true + "transport.compress": true }, "cluster_three": { "seeds": [ @@ -120,7 +120,7 @@ PUT _cluster/settings "seeds": [ "127.0.0.1:9301" ], - "transport.compression": false + "transport.compress": false } } } From 8085001c886314f4a521bac215d456ed9a05dccd Mon Sep 17 00:00:00 2001 From: lcawl Date: Fri, 15 Feb 2019 18:27:18 -0800 Subject: [PATCH 10/10] [DOCS] Edits remote cluster configuration --- .../modules/remote-clusters.asciidoc | 44 +++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc 
index 98b1928f0430e..aefa750a92eb0 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -27,21 +27,16 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster. [float] [[configuring-remote-clusters]] -=== Configuring Remote Clusters +=== Configuring remote clusters -Remote clusters can be specified globally using -<> (which can be updated dynamically), -or local to individual nodes using the `elasticsearch.yml` file. +You can configure remote clusters globally by using +<>, which you can update dynamically. +Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file. -If a remote cluster is configured via `elasticsearch.yml` only the nodes with -that configuration will be able to connect to the remote cluster. In other -words, functionality that relies on remote cluster requests will have to be -driven specifically from those nodes. Remote clusters set via the -<> will be available on every node -in the cluster. - -The `elasticsearch.yml` config file for a node that connects to remote clusters -needs to list the remote clusters that should be connected to, for instance: +If you specify the settings in `elasticsearch.yml` files, only the nodes with +those settings can connect to the remote cluster. In other words, functionality +that relies on remote cluster requests must be driven specifically from those +nodes. For example: [source,yaml] -------------------------------- @@ -50,7 +45,7 @@ cluster: cluster_one: <1> seeds: 127.0.0.1:9300 transport.ping_schedule: 30s <2> - cluster_two: <1> + cluster_two: seeds: 127.0.0.1:9301 transport.compress: true <3> @@ -58,12 +53,13 @@ cluster: <1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing the connection to each cluster. These names are subsequently used to distinguish between local and remote indices. -<2> A keep-alive ping is configured for cluster_one in this example. -<3> Compression is explicitly enabled for requests to cluster_two in this example. +<2> A keep-alive ping is configured for `cluster_one`. +<3> Compression is explicitly enabled for requests to `cluster_two`. + +For more information about the optional transport settings, see +<>. -The equivalent example using the <> to add remote clusters to all nodes in the cluster would look like the -following: +If you use <>, the remote clusters are available on every node in the cluster. For example: [source,js] -------------------------------- @@ -98,10 +94,8 @@ PUT _cluster/settings // TEST[setup:host] // TEST[s/127.0.0.1:9300/\${transport_host}/] -Compression and ping schedule can be dynamically updated. However, seeds must be -re-included in the settings update request. Updating the compression or ping schedule -settings requires that all the existing node connections be closed and re-opened. This -can cause in-flight requests to fail. This example would update these settings: +You can dynamically update the compression and ping schedule settings. However, +you must re-include seeds in the settings update request. For example: [source,js] -------------------------------- @@ -130,6 +124,10 @@ PUT _cluster/settings // CONSOLE // TEST[continued] +NOTE: When the compression or ping schedule settings change, all the existing +node connections must close and re-open, which can cause in-flight requests to +fail. + A remote cluster can be deleted from the cluster settings by setting its seeds to `null`:
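For example, the following request (a minimal sketch that reuses the `cluster_two` alias from the examples above) removes that remote cluster from the persistent settings:

[source,js]
--------------------------------
PUT _cluster/settings
{
  "persistent": {
    "cluster": {
      "remote": {
        "cluster_two": {
          "seeds": null
        }
      }
    }
  }
}
--------------------------------
// CONSOLE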