From 32083c55e51dd3b0cef5edd28d5155ec70a77c96 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 3 Sep 2014 20:12:44 +0100 Subject: [PATCH] move to a PingContextProvider implemented by ZenDiscovery --- .../discovery/zen/ZenDiscovery.java | 50 ++++++++++------- .../zen/ping/PingContextProvider.java | 32 +++++++++++ .../discovery/zen/ping/ZenPing.java | 5 +- .../discovery/zen/ping/ZenPingService.java | 17 +++--- .../zen/ping/multicast/MulticastZenPing.java | 55 +++++++++---------- .../zen/ping/unicast/UnicastZenPing.java | 31 +++++------ .../cluster/MinimumMasterNodesTests.java | 3 +- .../ping/multicast/MulticastZenPingTests.java | 36 ++++++++---- .../zen/ping/unicast/UnicastZenPingTests.java | 27 +++++---- .../test/cluster/NoopClusterService.java | 9 +-- 10 files changed, 154 insertions(+), 111 deletions(-) create mode 100644 src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 241a1be21ca2b..ac08c9b08a7ed 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -52,6 +52,7 @@ import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; @@ -76,7 +77,7 @@ /** * */ -public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, DiscoveryNodesProvider { +public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider { public final static String SETTING_REJOIN_ON_MASTER_GONE = "discovery.zen.rejoin_on_master_gone"; public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping.timeout"; @@ -139,6 +140,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private volatile boolean rejoinOnMasterGone; + // will be set to true upon the first successful cluster join + private final AtomicBoolean hasJoinedClusterOnce = new AtomicBoolean(); + @Nullable private NodeService nodeService; @@ -194,7 +198,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa this.nodesFD.addListener(new NodeFaultDetectionListener()); this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName); - this.pingService.setNodesProvider(this); + this.pingService.setPingContextProvider(this); this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); transportService.registerHandler(DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequestHandler()); @@ -290,6 +294,7 @@ public String nodeDescription() { return clusterName.value() + "/" + localNode.id(); } + /** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */ @Override public DiscoveryNodes nodes() { DiscoveryNodes latestNodes = this.latestDiscoNodes; @@ -305,6 +310,14 @@ public NodeService nodeService() { return this.nodeService; } + @Override + public boolean isFirstClusterJoin() { + return !hasJoinedClusterOnce.get(); + } + + /** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */ + + @Override public void publish(ClusterState clusterState, AckListener ackListener) { if (!master) { @@ -387,6 +400,7 @@ public void onFailure(String source, Throwable t) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { sendInitialStateEventIfNeeded(); + hasJoinedClusterOnce.set(true); } }); } else { @@ -404,8 +418,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } masterFD.start(masterNode, "initial_join"); - // no need to submit the received cluster state, we will get it from the master when it publishes - // the fact that we joined + hasJoinedClusterOnce.set(true); } } } @@ -963,39 +976,36 @@ private DiscoveryNode findMaster() { } } - Set possibleMasterNodes = Sets.newHashSet(); - // master nodes who has previously been part of the cluster and do not ping for the very first time - Set alreadyJoinedPossibleMasterNodes = Sets.newHashSet(); + // nodes discovered during pinging + Set activeNodes = Sets.newHashSet(); + // nodes discovered who has previously been part of the cluster and do not ping for the very first time + Set joinedOnceActiveNodes = Sets.newHashSet(); if (localNode.masterNode()) { - possibleMasterNodes.add(localNode); - if (clusterService.state().version() > 0) { - alreadyJoinedPossibleMasterNodes.add(localNode); + activeNodes.add(localNode); + if (hasJoinedClusterOnce.get()) { + joinedOnceActiveNodes.add(localNode); } } for (ZenPing.PingResponse pingResponse : pingResponses) { - possibleMasterNodes.add(pingResponse.target()); + activeNodes.add(pingResponse.target()); if (!pingResponse.initialJoin()) { - alreadyJoinedPossibleMasterNodes.add(pingResponse.target()); + joinedOnceActiveNodes.add(pingResponse.target()); } } if (pingMasters.isEmpty()) { - // if we don't have enough master nodes, we bail, because there are not enough master to elect from - if (electMaster.hasEnoughMasterNodes(possibleMasterNodes)) { + if (electMaster.hasEnoughMasterNodes(activeNodes)) { // we give preference to nodes who have previously already joined the cluster. Those will // have a cluster state in memory, including an up to date routing table (which is not persistent to disk // by the gateway) - - // nocommit - // TODO: this list may contain data and client nodes due to masterElectionFilter* settings. Should we remove that? - DiscoveryNode master = electMaster.electMaster(alreadyJoinedPossibleMasterNodes); + DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); if (master != null) { return master; } - return electMaster.electMaster(possibleMasterNodes); + return electMaster.electMaster(activeNodes); } else { // if we don't have enough master nodes, we bail, because there are not enough master to elect from - logger.trace("not enough master nodes [{}]", possibleMasterNodes); + logger.trace("not enough master nodes [{}]", activeNodes); return null; } } else { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java b/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java new file mode 100644 index 0000000000000..c967a95ec6048 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen.ping; + +import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; + +/** + * + */ +public interface PingContextProvider extends DiscoveryNodesProvider { + + /** return true if this node is joining the cluster for the first time */ + boolean isFirstClusterJoin(); + +} diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java index 836904f6c3545..8179d2b1a0e2d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import java.io.IOException; @@ -40,7 +39,7 @@ */ public interface ZenPing extends LifecycleComponent { - void setNodesProvider(DiscoveryNodesProvider nodesProvider); + void setPingContextProvider(PingContextProvider contextProvider); void ping(PingListener listener, TimeValue timeout) throws ElasticsearchException; @@ -50,7 +49,7 @@ public interface PingListener { } public static class PingResponse implements Streamable { - + public static final PingResponse[] EMPTY = new PingResponse[0]; private ClusterName clusterName; diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index b3ab31bb115e9..aee4990a6850a 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -33,7 +33,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; @@ -55,21 +54,21 @@ public class ZenPingService extends AbstractLifecycleComponent implemen private volatile ImmutableList zenPings = ImmutableList.of(); // here for backward comp. with discovery plugins - public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NetworkService networkService, + public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { - this(settings, threadPool, transportService, clusterService, networkService, Version.CURRENT, electMasterService, unicastHostsProviders); + this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, electMasterService, unicastHostsProviders); } @Inject - public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NetworkService networkService, + public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { super(settings); ImmutableList.Builder zenPingsBuilder = ImmutableList.builder(); if (componentSettings.getAsBoolean("multicast.enabled", true)) { - zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterService, networkService, version)); + zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version)); } // always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast - zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterService, version, electMasterService, unicastHostsProviders)); + zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders)); this.zenPings = zenPingsBuilder.build(); } @@ -92,12 +91,12 @@ public void zenPings(ImmutableList pings) { } @Override - public void setNodesProvider(DiscoveryNodesProvider nodesProvider) { + public void setPingContextProvider(PingContextProvider contextProvider) { if (lifecycle.started()) { throw new ElasticsearchIllegalStateException("Can't set nodes provider when started"); } for (ZenPing zenPing : zenPings) { - zenPing.setNodesProvider(nodesProvider); + zenPing.setPingContextProvider(contextProvider); } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 36d6a47df5980..c2f7faf333392 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -25,8 +25,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.bytes.BytesArray; @@ -42,7 +40,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -76,10 +74,10 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final ThreadPool threadPool; private final TransportService transportService; - private final ClusterService clusterService; + private final ClusterName clusterName; private final NetworkService networkService; private final Version version; - private volatile DiscoveryNodesProvider nodesProvider; + private volatile PingContextProvider contextProvider; private final boolean pingEnabled; @@ -88,15 +86,15 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final AtomicInteger pingIdGenerator = new AtomicInteger(); private final Map> receivedResponses = newConcurrentMap(); - public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterService clusterService, Version version) { - this(EMPTY_SETTINGS, threadPool, transportService, clusterService, new NetworkService(EMPTY_SETTINGS), version); + public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) { + this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version); } - public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NetworkService networkService, Version version) { + public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, Version version) { super(settings); this.threadPool = threadPool; this.transportService = transportService; - this.clusterService = clusterService; + this.clusterName = clusterName; this.networkService = networkService; this.version = version; @@ -114,11 +112,11 @@ public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportServi } @Override - public void setNodesProvider(DiscoveryNodesProvider nodesProvider) { + public void setPingContextProvider(PingContextProvider nodesProvider) { if (lifecycle.started()) { throw new ElasticsearchIllegalStateException("Can't set nodes provider when started"); } - this.nodesProvider = nodesProvider; + this.contextProvider = nodesProvider; } @Override @@ -217,8 +215,8 @@ private void sendPingRequest(int id) { out.writeBytes(INTERNAL_HEADER); Version.writeVersion(version, out); out.writeInt(id); - clusterService.state().getClusterName().writeTo(out); - nodesProvider.nodes().localNode().writeTo(out); + clusterName.writeTo(out); + contextProvider.nodes().localNode().writeTo(out); out.close(); multicastChannel.send(bStream.bytes()); if (logger.isTraceEnabled()) { @@ -363,16 +361,15 @@ private void handleExternalPingRequest(Map externalPingData, XCo return; } - final String clusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null; - final ClusterState state = clusterService.state(); - if (clusterName == null) { + final String requestClusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null; + if (requestClusterName == null) { logger.warn("malformed external ping request, missing 'cluster_name' element within request, from {}, content {}", remoteAddress, externalPingData); return; } - if (!clusterName.equals(state.getClusterName().value())) { + if (!requestClusterName.equals(clusterName.value())) { logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}", - clusterName, state.getClusterName().value(), remoteAddress, externalPingData); + requestClusterName, clusterName.value(), remoteAddress, externalPingData); return; } if (logger.isTraceEnabled()) { @@ -380,16 +377,16 @@ private void handleExternalPingRequest(Map externalPingData, XCo } try { - DiscoveryNode localNode = nodesProvider.nodes().localNode(); + DiscoveryNode localNode = contextProvider.nodes().localNode(); XContentBuilder builder = XContentFactory.contentBuilder(contentType); builder.startObject().startObject("response"); - builder.field("cluster_name", state.getClusterName().value()); + builder.field("cluster_name", clusterName.value()); builder.startObject("version").field("number", version.number()).field("snapshot_build", version.snapshot).endObject(); builder.field("transport_address", localNode.address().toString()); - if (nodesProvider.nodeService() != null) { - for (Map.Entry attr : nodesProvider.nodeService().attributes().entrySet()) { + if (contextProvider.nodeService() != null) { + for (Map.Entry attr : contextProvider.nodeService().attributes().entrySet()) { builder.field(attr.getKey(), attr.getValue()); } } @@ -410,33 +407,33 @@ private void handleExternalPingRequest(Map externalPingData, XCo } } - private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName clusterName) { + private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) { if (!pingEnabled) { return; } - final DiscoveryNodes discoveryNodes = nodesProvider.nodes(); - final ClusterState state = clusterService.state(); + final DiscoveryNodes discoveryNodes = contextProvider.nodes(); final DiscoveryNode requestingNode = requestingNodeX; if (requestingNode.id().equals(discoveryNodes.localNodeId())) { // that's me, ignore return; } - if (!clusterName.equals(state.getClusterName())) { + if (!requestClusterName.equals(clusterName)) { if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", id, requestingNode, clusterName, state.getClusterName()); + logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", + id, requestingNode, requestClusterName.value(), clusterName.value()); } return; } // don't connect between two client nodes, no need for that... if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) { if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, clusterName); + logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, requestClusterName); } return; } final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); multicastPingResponse.id = id; - multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), state.getClusterName(), state.version() <= 0); + multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.isFirstClusterJoin()); if (logger.isTraceEnabled()) { logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 73937e0c334be..bd20e74087d92 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -25,8 +25,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; @@ -39,8 +38,8 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -67,14 +66,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final ThreadPool threadPool; private final TransportService transportService; - private final ClusterService clusterService; + private final ClusterName clusterName; private final ElectMasterService electMasterService; private final int concurrentConnects; private final DiscoveryNode[] configuredTargetNodes; - private volatile DiscoveryNodesProvider nodesProvider; + private volatile PingContextProvider contextProvider; private final AtomicInteger pingIdGenerator = new AtomicInteger(); @@ -85,12 +84,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList<>(); - public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { super(settings); this.threadPool = threadPool; this.transportService = transportService; - this.clusterService = clusterService; + this.clusterName = clusterName; this.electMasterService = electMasterService; if (unicastHostsProviders != null) { @@ -148,8 +147,8 @@ public void removeHostsProvider(UnicastHostsProvider provider) { } @Override - public void setNodesProvider(DiscoveryNodesProvider nodesProvider) { - this.nodesProvider = nodesProvider; + public void setPingContextProvider(PingContextProvider contextProvider) { + this.contextProvider = contextProvider; } /** @@ -251,15 +250,14 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; - DiscoveryNodes discoNodes = nodesProvider.nodes(); - ClusterState state = clusterService.state(); + DiscoveryNodes discoNodes = contextProvider.nodes(); pingRequest.pingResponse = createPingResponse(discoNodes); HashSet nodesToPingSet = new HashSet<>(); for (PingResponse temporalResponse : temporalResponses) { // Only send pings to nodes that have the same cluster name. - if (state.getClusterName().equals(temporalResponse.clusterName())) { + if (clusterName.equals(temporalResponse.clusterName())) { nodesToPingSet.add(temporalResponse.target()); } } @@ -374,13 +372,13 @@ public String executor() { public void handleResponse(UnicastPingResponse response) { logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses)); try { - DiscoveryNodes discoveryNodes = nodesProvider.nodes(); + DiscoveryNodes discoveryNodes = contextProvider.nodes(); for (PingResponse pingResponse : response.pingResponses) { if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) { // that's us, ignore continue; } - if (!pingResponse.clusterName().equals(clusterService.state().getClusterName())) { + if (!pingResponse.clusterName().equals(clusterName)) { // not part of the cluster logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.target(), pingResponse.clusterName().value()); continue; @@ -433,7 +431,7 @@ public void run() { }); List pingResponses = newArrayList(temporalResponses); - pingResponses.add(createPingResponse(nodesProvider.nodes())); + pingResponses.add(createPingResponse(contextProvider.nodes())); UnicastPingResponse unicastPingResponse = new UnicastPingResponse(); @@ -490,8 +488,7 @@ public void writeTo(StreamOutput out) throws IOException { } private PingResponse createPingResponse(DiscoveryNodes discoNodes) { - ClusterState state = clusterService.state(); - return new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), state.getClusterName(), state.version() <= 0); + return new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName, contextProvider.isFirstClusterJoin()); } static class UnicastPingResponse extends TransportResponse { diff --git a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java index 06e6f56e1ccf0..a63fb7f626f0f 100644 --- a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java +++ b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java @@ -201,8 +201,7 @@ public boolean apply(Object obj) { assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); logger.info("--> start two more nodes"); - internalCluster().startNode(settings); - internalCluster().startNode(settings); + internalCluster().startNodesAsync(2, settings).get(); ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); diff --git a/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java index 5fc283f10aa83..6a1382351dbd3 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.logging.Loggers; @@ -30,11 +29,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.test.cluster.NoopClusterService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; @@ -67,16 +65,15 @@ public void testSimplePings() { settings = buildRandomMulticast(settings); ThreadPool threadPool = new ThreadPool("testSimplePings"); - final ClusterState initialJoinState = ClusterState.builder(new ClusterName("test")).build(); - final ClusterState alreadyJoinedState = ClusterState.builder(initialJoinState).version(initialJoinState.version() + 1).build(); + final ClusterName clusterName = new ClusterName("test"); final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT); - MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, new NoopClusterService().state(initialJoinState), Version.CURRENT); - zenPingA.setNodesProvider(new DiscoveryNodesProvider() { + MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); + zenPingA.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build(); @@ -86,11 +83,16 @@ public DiscoveryNodes nodes() { public NodeService nodeService() { return null; } + + @Override + public boolean isFirstClusterJoin() { + return true; + } }); zenPingA.start(); - MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, new NoopClusterService().state(alreadyJoinedState), Version.CURRENT); - zenPingB.setNodesProvider(new DiscoveryNodesProvider() { + MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName, Version.CURRENT); + zenPingB.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().put(nodeB).localNodeId("B").build(); @@ -100,6 +102,11 @@ public DiscoveryNodes nodes() { public NodeService nodeService() { return null; } + + @Override + public boolean isFirstClusterJoin() { + return false; + } }); zenPingB.start(); @@ -131,12 +138,12 @@ public void testExternalPing() throws Exception { settings = buildRandomMulticast(settings); final ThreadPool threadPool = new ThreadPool("testExternalPing"); - final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + final ClusterName clusterName = new ClusterName("test"); final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, new NoopClusterService().state(clusterState), Version.CURRENT); - zenPingA.setNodesProvider(new DiscoveryNodesProvider() { + MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); + zenPingA.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build(); @@ -146,6 +153,11 @@ public DiscoveryNodes nodes() { public NodeService nodeService() { return null; } + + @Override + public boolean isFirstClusterJoin() { + return false; + } }); zenPingA.start(); diff --git a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index 118ad0fe9da6e..e42f18bd02453 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.network.NetworkService; @@ -30,12 +29,11 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.test.cluster.NoopClusterService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; @@ -56,8 +54,7 @@ public void testSimplePings() { settings = ImmutableSettings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build(); ThreadPool threadPool = new ThreadPool(getClass().getName()); - ClusterState initialJoinState = ClusterState.builder(new ClusterName("test")).build(); - ClusterState alreadyJoinedState = ClusterState.builder(initialJoinState).version(initialJoinState.version() + 1).build(); + ClusterName clusterName = new ClusterName("test"); NetworkService networkService = new NetworkService(settings); ElectMasterService electMasterService = new ElectMasterService(settings); @@ -78,9 +75,8 @@ public void testSimplePings() { addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort()) .build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, - new NoopClusterService().state(initialJoinState), Version.CURRENT, electMasterService, null); - zenPingA.setNodesProvider(new DiscoveryNodesProvider() { + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null); + zenPingA.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().put(nodeA).localNodeId("UZP_A").build(); @@ -90,12 +86,16 @@ public DiscoveryNodes nodes() { public NodeService nodeService() { return null; } + + @Override + public boolean isFirstClusterJoin() { + return true; + } }); zenPingA.start(); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, - transportServiceB, new NoopClusterService().state(alreadyJoinedState), Version.CURRENT, electMasterService, null); - zenPingB.setNodesProvider(new DiscoveryNodesProvider() { + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null); + zenPingB.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().put(nodeB).localNodeId("UZP_B").build(); @@ -105,6 +105,11 @@ public DiscoveryNodes nodes() { public NodeService nodeService() { return null; } + + @Override + public boolean isFirstClusterJoin() { + return false; + } }); zenPingB.start(); diff --git a/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java b/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java index 3df5cd0e7fa03..4821ac1f9d7bb 100644 --- a/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -34,8 +34,6 @@ public class NoopClusterService implements ClusterService { - ClusterState state = null; - @Override public DiscoveryNode localNode() { return null; @@ -43,12 +41,7 @@ public DiscoveryNode localNode() { @Override public ClusterState state() { - return state; - } - - public NoopClusterService state(ClusterState state) { - this.state = state; - return this; + return null; } @Override