Rebuild remote connections on profile changes #37678

Merged · merged 18 commits · Feb 19, 2019
Changes from 7 commits
@@ -296,6 +296,55 @@ public void apply(Map<String, Tuple<A, B>> values, Settings current, Settings pr
});
}

/**
* Adds an affix settings consumer that accepts the settings for a group of settings. The consumer is only
* notified if at least one of the settings changes.
* <p>
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
* </p>
*/
public synchronized void addAffixGroupUpdateConsumer(List<Setting.AffixSetting<?>> settings, BiConsumer<String, Settings> consumer) {
Member:

Can you pull this out into a separate change, leaving the usage here for reference? Then we can integrate this back after that has a separate review from the core/infra team.

Contributor Author:

I opened #37679

// One no-op updater per setting; these are only used to detect whether a setting changed.
List<SettingUpdater> affixUpdaters = new ArrayList<>(settings.size());
for (Setting.AffixSetting<?> setting : settings) {
ensureSettingIsRegistered(setting);
affixUpdaters.add(setting.newAffixUpdater((a, b) -> {}, logger, (a, b) -> {}));
}

addSettingsUpdater(new SettingUpdater<Map<String, Settings>>() {

@Override
public boolean hasChanged(Settings current, Settings previous) {
return affixUpdaters.stream().anyMatch(au -> au.hasChanged(current, previous));
}

@Override
public Map<String, Settings> getValue(Settings current, Settings previous) {
Set<String> namespaces = new HashSet<>();
Consumer<String> aConsumer = namespaces::add;
// Apply throwaway updaters to collect the namespaces for which at least one setting changed.
for (Setting.AffixSetting<?> setting : settings) {
SettingUpdater affixUpdaterA = setting.newAffixUpdater((k, v) -> aConsumer.accept(k), logger, (a, b) -> {});
affixUpdaterA.apply(current, previous);
}
Map<String, Settings> namespaceToSettings = new HashMap<>(namespaces.size());
// For each changed namespace, expose only the concrete settings that belong to this group.
for (String namespace : namespaces) {
Set<String> concreteSettings = new HashSet<>(settings.size());
for (Setting.AffixSetting<?> setting : settings) {
concreteSettings.add(setting.getConcreteSettingForNamespace(namespace).getKey());
}
namespaceToSettings.put(namespace, current.filter(concreteSettings::contains));
}
return namespaceToSettings;
}

@Override
public void apply(Map<String, Settings> values, Settings current, Settings previous) {
for (Map.Entry<String, Settings> entry : values.entrySet()) {
consumer.accept(entry.getKey(), entry.getValue());
}
}
});
}
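
For reference, a minimal sketch of how a caller might register a group consumer with this method. The setting keys and the log statement are illustrative only, not part of this change; the assignment to List<Setting.AffixSetting<?>> mirrors the usage in listenForUpdates below.

    Setting.AffixSetting<Boolean> compress = Setting.affixKeySetting("cluster.remote.", "transport.compress",
        key -> Setting.boolSetting(key, false, Setting.Property.Dynamic, Setting.Property.NodeScope));
    Setting.AffixSetting<TimeValue> pingSchedule = Setting.affixKeySetting("cluster.remote.", "transport.ping_schedule",
        key -> Setting.timeSetting(key, TimeValue.MINUS_ONE, Setting.Property.Dynamic, Setting.Property.NodeScope));

    List<Setting.AffixSetting<?>> group = Arrays.asList(compress, pingSchedule);
    // The consumer fires once per changed namespace, with a Settings view restricted to this group.
    clusterSettings.addAffixGroupUpdateConsumer(group, (namespace, nsSettings) ->
        logger.info("remote cluster [{}] settings changed", namespace));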

private void ensureSettingIsRegistered(Setting.AffixSetting<?> setting) {
final Setting<?> registeredSetting = this.complexMatchers.get(setting.getKey());
if (setting != registeredSetting) {
@@ -30,16 +30,13 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable {
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final Transport transport;
private final ThreadPool threadPool;
private final ConnectionProfile defaultProfile;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool);
public ConnectionManager(Settings settings, Transport transport) {
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
}

public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) {
public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
this.transport = transport;
this.threadPool = threadPool;
this.defaultProfile = connectionProfile;
}

@@ -185,35 +180,23 @@ public int size() {

@Override
public void close() {
Transports.assertNotTransportThread("Closing ConnectionManager");
if (isClosed.compareAndSet(false, true)) {
CountDownLatch latch = new CountDownLatch(1);

// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear the maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear the maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});

try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
} finally {
closeLock.writeLock().unlock();
}
}
}
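
With the latch and the generic-pool dispatch removed, close() is now synchronous: it blocks on the write lock until every connection is closed. A caller that must not block is expected to offload the call itself, which is exactly what the RemoteClusterConnection change further down in this diff does:

    // Caller-side pattern (see RemoteClusterConnection.close() below):
    threadPool.generic().execute(connectionManager::close);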
@@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.SettingUpgrader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;

import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -282,21 +283,38 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
return perClusterIndices;
}

void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings);
TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings);
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}

private void updateRemoteCluster(String clusterAlias, Settings settings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings);
List<String> addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
.getConcreteSettingForNamespace(clusterAlias)
.get(settings);

updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}
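
For orientation, a sketch of the kind of dynamic update that flows into this method. The alias and values are illustrative; the keys are my best reading of the affix settings referenced above:

    Settings update = Settings.builder()
        .put("cluster.remote.my_cluster.seeds", "10.0.0.1:9300")          // REMOTE_CLUSTERS_SEEDS
        .put("cluster.remote.my_cluster.transport.compress", true)        // REMOTE_CLUSTER_COMPRESS
        .put("cluster.remote.my_cluster.transport.ping_schedule", "30s")  // REMOTE_CLUSTER_PING_SCHEDULE
        .build();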

/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
TimeValue pingSchedule);
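
A sketch of what an implementation of this contract might look like; in the real code RemoteClusterService provides it, and the body here is illustrative only:

    @Override
    protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress,
                                       boolean compressionEnabled, TimeValue pingSchedule) {
        if (addresses.isEmpty()) {
            // the alias was unregistered: close and remove any existing connection
        } else {
            // (re)build the connection using a profile derived from the new settings
        }
    }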

/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
@@ -119,6 +119,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
createConnectionManager(settings, clusterAlias, transportService));
}

RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress, ConnectionProfile connectionProfile) {
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
createConnectionManager(connectionProfile, transportService));
}

// Public for tests to pass a StubbableConnectionManager
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
@@ -355,13 +362,23 @@ Transport.Connection getConnection() {

@Override
public void close() throws IOException {
IOUtils.close(connectHandler, connectionManager);
IOUtils.close(connectHandler);
// In the ConnectionManager we wait on connections being closed.
threadPool.generic().execute(connectionManager::close);
}

public boolean isClosed() {
return connectHandler.isClosed();
}

public String getProxyAddress() {
return proxyAddress;
}

public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
return seedNodes;
}

/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
@@ -754,7 +771,11 @@ private static ConnectionManager createConnectionManager(Settings settings, Stri
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
.setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings));
return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool);
return createConnectionManager(builder.build(), transportService);
}

private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
return new ConnectionManager(connectionProfile, transportService.transport);
}
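
With the ThreadPool parameter gone from ConnectionManager, rebuilding a manager from a re-resolved profile becomes a one-liner. A hedged sketch, where newConnectionProfile stands in for a profile derived from the updated remote cluster settings:

    // Rebuild the connection manager when the profile changes; no ThreadPool needed anymore.
    ConnectionManager rebuilt = new ConnectionManager(newConnectionProfile, transportService.transport);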

ConnectionManager getConnectionManager() {