Skip to content

Commit

Permalink
move to a PingContextProvider implemented by ZenDiscovery
Browse files Browse the repository at this point in the history
  • Loading branch information
bleskes committed Sep 4, 2014
1 parent 1dd4ef9 commit 32083c5
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 111 deletions.
50 changes: 30 additions & 20 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
Expand All @@ -76,7 +77,7 @@
/**
*
*/
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, PingContextProvider {

public final static String SETTING_REJOIN_ON_MASTER_GONE = "discovery.zen.rejoin_on_master_gone";
public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping.timeout";
Expand Down Expand Up @@ -139,6 +140,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen

private volatile boolean rejoinOnMasterGone;

// will be set to true upon the first successful cluster join
private final AtomicBoolean hasJoinedClusterOnce = new AtomicBoolean();

@Nullable
private NodeService nodeService;

Expand Down Expand Up @@ -194,7 +198,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.nodesFD.addListener(new NodeFaultDetectionListener());

this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
this.pingService.setNodesProvider(this);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());

transportService.registerHandler(DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequestHandler());
Expand Down Expand Up @@ -290,6 +294,7 @@ public String nodeDescription() {
return clusterName.value() + "/" + localNode.id();
}

/** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
@Override
public DiscoveryNodes nodes() {
DiscoveryNodes latestNodes = this.latestDiscoNodes;
Expand All @@ -305,6 +310,14 @@ public NodeService nodeService() {
return this.nodeService;
}

@Override
public boolean isFirstClusterJoin() {
return !hasJoinedClusterOnce.get();
}

/** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */


@Override
public void publish(ClusterState clusterState, AckListener ackListener) {
if (!master) {
Expand Down Expand Up @@ -387,6 +400,7 @@ public void onFailure(String source, Throwable t) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
hasJoinedClusterOnce.set(true);
}
});
} else {
Expand All @@ -404,8 +418,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

masterFD.start(masterNode, "initial_join");
// no need to submit the received cluster state, we will get it from the master when it publishes
// the fact that we joined
hasJoinedClusterOnce.set(true);
}
}
}
Expand Down Expand Up @@ -963,39 +976,36 @@ private DiscoveryNode findMaster() {
}
}

Set<DiscoveryNode> possibleMasterNodes = Sets.newHashSet();
// master nodes who has previously been part of the cluster and do not ping for the very first time
Set<DiscoveryNode> alreadyJoinedPossibleMasterNodes = Sets.newHashSet();
// nodes discovered during pinging
Set<DiscoveryNode> activeNodes = Sets.newHashSet();
// nodes discovered who has previously been part of the cluster and do not ping for the very first time
Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
if (localNode.masterNode()) {
possibleMasterNodes.add(localNode);
if (clusterService.state().version() > 0) {
alreadyJoinedPossibleMasterNodes.add(localNode);
activeNodes.add(localNode);
if (hasJoinedClusterOnce.get()) {
joinedOnceActiveNodes.add(localNode);
}
}
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
activeNodes.add(pingResponse.target());
if (!pingResponse.initialJoin()) {
alreadyJoinedPossibleMasterNodes.add(pingResponse.target());
joinedOnceActiveNodes.add(pingResponse.target());
}
}

if (pingMasters.isEmpty()) {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
if (electMaster.hasEnoughMasterNodes(possibleMasterNodes)) {
if (electMaster.hasEnoughMasterNodes(activeNodes)) {
// we give preference to nodes who have previously already joined the cluster. Those will
// have a cluster state in memory, including an up to date routing table (which is not persistent to disk
// by the gateway)

// nocommit
// TODO: this list may contain data and client nodes due to masterElectionFilter* settings. Should we remove that?
DiscoveryNode master = electMaster.electMaster(alreadyJoinedPossibleMasterNodes);
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
if (master != null) {
return master;
}
return electMaster.electMaster(possibleMasterNodes);
return electMaster.electMaster(activeNodes);
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", possibleMasterNodes);
logger.trace("not enough master nodes [{}]", activeNodes);
return null;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.zen.ping;

import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;

/**
*
*/
public interface PingContextProvider extends DiscoveryNodesProvider {

/** return true if this node is joining the cluster for the first time */
boolean isFirstClusterJoin();

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;

import java.io.IOException;

Expand All @@ -40,7 +39,7 @@
*/
public interface ZenPing extends LifecycleComponent<ZenPing> {

void setNodesProvider(DiscoveryNodesProvider nodesProvider);
void setPingContextProvider(PingContextProvider contextProvider);

void ping(PingListener listener, TimeValue timeout) throws ElasticsearchException;

Expand All @@ -50,7 +49,7 @@ public interface PingListener {
}

public static class PingResponse implements Streamable {

public static final PingResponse[] EMPTY = new PingResponse[0];

private ClusterName clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand All @@ -33,7 +33,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
Expand All @@ -55,21 +54,21 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
private volatile ImmutableList<? extends ZenPing> zenPings = ImmutableList.of();

// here for backward comp. with discovery plugins
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NetworkService networkService,
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
this(settings, threadPool, transportService, clusterService, networkService, Version.CURRENT, electMasterService, unicastHostsProviders);
this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, electMasterService, unicastHostsProviders);
}

@Inject
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NetworkService networkService,
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterService, networkService, version));
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterService, version, electMasterService, unicastHostsProviders));
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders));

this.zenPings = zenPingsBuilder.build();
}
Expand All @@ -92,12 +91,12 @@ public void zenPings(ImmutableList<? extends ZenPing> pings) {
}

@Override
public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
public void setPingContextProvider(PingContextProvider contextProvider) {
if (lifecycle.started()) {
throw new ElasticsearchIllegalStateException("Can't set nodes provider when started");
}
for (ZenPing zenPing : zenPings) {
zenPing.setNodesProvider(nodesProvider);
zenPing.setPingContextProvider(contextProvider);
}
}

Expand Down
Loading

0 comments on commit 32083c5

Please sign in to comment.