Skip to content

Commit

Permalink
Do not execute cluster state changes if current node is no longer master
Browse files Browse the repository at this point in the history
When a node steps down from being a master (because, for example, the `minimum_master_nodes` requirement is no longer met), it may
still have cluster state update tasks queued up. Most (but not all) of these are tasks that should no longer be executed, as the
node no longer has the authority to do so. Other cluster state updates, like electing the current node as master, should be
executed even if the current node is not (or is no longer) the master.

This commit makes sure that, by default, a `ClusterStateUpdateTask` is not executed if the node is no longer master. Tasks
that should run on non-master nodes are changed to implement a new interface called `ClusterStateNonMasterUpdateTask`.

Closes #6230
  • Loading branch information
bleskes committed Jul 17, 2014
1 parent a7a9315 commit 8d1ad47
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
@Override
public void onFailure(String source, Throwable t) {
    // Called when the reroute task submitted after the settings update fails.
    // The listener must be notified exactly once, so the two outcomes are mutually exclusive.
    if (t instanceof ClusterService.NoLongerMasterException) {
        // The reroute was skipped because this node is no longer master; that is not treated
        // as a failure of the settings update itself, so the normal response is returned.
        logger.debug("failed to perform reroute after cluster settings were updated - current node is no longer a master");
        listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
    } else {
        // Any other failure is logged and reported to the caller.
        logger.debug("failed to perform [{}]", t, source);
        listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
    }
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/elasticsearch/cluster/ClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,16 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Returns the tasks that are pending.
*/
List<PendingClusterTask> pendingTasks();

/**
* an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as
* the current node is no longer master
*/
public static class NoLongerMasterException extends ElasticsearchIllegalStateException {

public NoLongerMasterException(String msg) {
super(msg);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

/**
 * Marker interface for {@link ClusterStateUpdateTask}s that must be executed
 * even if the local node is not (or is no longer) the master.
 * <p>
 * It declares no methods. A task that does not implement this interface is not
 * executed when the local node is not master; instead its {@code onFailure}
 * callback is invoked with a {@code ClusterService.NoLongerMasterException}.
 */
public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;

/**
 * Combines {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
 * {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} into a single type.
 * <p>
 * It declares no methods of its own. An anonymous class can implement only one interface,
 * so this type lets callers create, in one anonymous class, a task that both receives the
 * processed callback and is executed on a node that is not master.
 */
public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask {
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,6 @@ private void reroute() {
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// double check we are still master, this may have changed.
if (!currentState.nodes().localNodeMaster()) {
return currentState;
}

RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ public void run() {
}
logger.debug("processing [{}]: execute", source);
ClusterState previousClusterState = clusterState;
if (!previousClusterState.nodes().localNodeMaster() && !(updateTask instanceof ClusterStateNonMasterUpdateTask)) {
logger.debug("failing [{}]: local node is no longer master", source);
updateTask.onFailure(source, new NoLongerMasterException("source: " + source));
return;
}
ClusterState newClusterState;
try {
newClusterState = updateTask.execute(previousClusterState);
Expand Down Expand Up @@ -722,5 +727,4 @@ public void onTimeout() {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ protected void doStart() throws ElasticsearchException {
// we are the first master (and the master)
master = true;
final LocalDiscovery master = firstMaster;
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
Expand All @@ -149,7 +149,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
} else if (firstMaster != null) {
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = firstMaster.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
Expand All @@ -165,7 +165,7 @@ public void onFailure(String source, Throwable t) {

// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() {
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
Expand Down Expand Up @@ -225,7 +225,7 @@ protected void doStop() throws ElasticsearchException {
}

final LocalDiscovery master = firstMaster;
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() {
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
Expand Down Expand Up @@ -305,7 +305,7 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() {
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
Expand Down
38 changes: 29 additions & 9 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void innerJoinCluster() {
if (localNode.equals(masterNode)) {
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// Take into account the previous known nodes, if they happen not to be available
Expand All @@ -334,7 +334,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
    // Report the failure once, at error level, together with the task source.
    logger.error("unexpected failure during [{}]", t, source);
}

@Override
Expand Down Expand Up @@ -398,7 +398,12 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
    if (t instanceof ClusterService.NoLongerMasterException) {
        // Expected when this node has stepped down as master: the leave request is dropped
        // and only noted at debug level.
        logger.debug("not processing {} leave request as we are no longer master", node);
    } else {
        logger.error("unexpected failure during [{}]", t, source);
    }
}
});
} else {
Expand Down Expand Up @@ -433,7 +438,12 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
    if (t instanceof ClusterService.NoLongerMasterException) {
        // Expected when this node has stepped down as master: only noted at debug level.
        logger.debug("not processing [{}] as we are no longer master", source);
    } else {
        logger.error("unexpected failure during [{}]", t, source);
    }
}

@Override
Expand Down Expand Up @@ -466,7 +476,12 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
    if (t instanceof ClusterService.NoLongerMasterException) {
        // Expected when this node has stepped down as master: only noted at debug level.
        logger.debug("not processing [{}] as we are no longer master", source);
    } else {
        logger.error("unexpected failure during [{}]", t, source);
    }
}

@Override
Expand All @@ -488,7 +503,7 @@ private void handleMasterGone(final DiscoveryNode masterNode, final String reaso

logger.info("master_left [{}], reason [{}]", masterNode, reason);

clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
Expand Down Expand Up @@ -622,7 +637,7 @@ public void onFailure(String source, Throwable t) {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// we already processed it in a previous event
Expand Down Expand Up @@ -962,7 +977,7 @@ public RejoinClusterRequest newInstance() {

@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
Expand All @@ -975,7 +990,12 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
    // NOTE(review): if this callback belongs to the ClusterStateNonMasterUpdateTask submitted
    // for the rejoin request, the NoLongerMasterException branch may never be taken - confirm.
    if (t instanceof ClusterService.NoLongerMasterException) {
        logger.debug("not processing [{}] as we are no longer master", source);
    } else {
        logger.error("unexpected failure during [{}]", t, source);
    }
}
});
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/elasticsearch/tribe/TribeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder;
Expand Down Expand Up @@ -223,7 +222,7 @@ class TribeClusterStateListener implements ClusterStateListener {
@Override
public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState tribeState = event.state();
Expand Down
53 changes: 53 additions & 0 deletions src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.cluster;

import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
Expand Down Expand Up @@ -256,6 +257,58 @@ public void onFailure(String source, Throwable t) {
assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
}

/**
 * Verifies master-aware execution of cluster state update tasks on a node that is not master:
 * a plain {@link ClusterStateUpdateTask} must be failed with
 * {@link ClusterService.NoLongerMasterException} instead of being executed, while a
 * {@link ClusterStateNonMasterUpdateTask} must be executed.
 */
@Test
public void testMasterAwareExecution() throws Exception {
    Settings settings = settingsBuilder()
            .put("discovery.type", "local")
            .build();

    ListenableFuture<String> master = cluster().startNodeAsync(settings);
    ListenableFuture<String> nonMaster = cluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build());
    master.get();
    ensureGreen(); // make sure we have a cluster

    ClusterService clusterService = cluster().getInstance(ClusterService.class, nonMaster.get());

    // Single-element arrays let the anonymous classes write results; the latch hand-off
    // makes those writes visible to the test thread.
    final boolean[] taskFailed = {false};
    final Throwable[] failure = {null};
    final CountDownLatch latch1 = new CountDownLatch(1);
    clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            latch1.countDown();
            return currentState;
        }

        @Override
        public void onFailure(String source, Throwable t) {
            failure[0] = t;
            taskFailed[0] = true;
            latch1.countDown();
        }
    });

    // Bounded wait: a task that is never processed must fail the test rather than hang the suite.
    assertTrue("timed out waiting for the cluster state update task", latch1.await(10, TimeUnit.SECONDS));
    assertTrue("cluster state update task was executed on a non-master", taskFailed[0]);
    assertTrue("expected a NoLongerMasterException but got [" + failure[0] + "]",
            failure[0] instanceof ClusterService.NoLongerMasterException);

    // Start from "failed" so that only an actual execution can flip the flag back.
    taskFailed[0] = true;
    final CountDownLatch latch2 = new CountDownLatch(1);
    clusterService.submitStateUpdateTask("test", new ClusterStateNonMasterUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            taskFailed[0] = false;
            latch2.countDown();
            return currentState;
        }

        @Override
        public void onFailure(String source, Throwable t) {
            taskFailed[0] = true;
            latch2.countDown();
        }
    });
    assertTrue("timed out waiting for the non-master cluster state update task", latch2.await(10, TimeUnit.SECONDS));
    assertFalse("non-master cluster state update task was not executed", taskFailed[0]);
}

@Test
public void testAckedUpdateTaskNoAckExpected() throws Exception {
Settings settings = settingsBuilder()
Expand Down

0 comments on commit 8d1ad47

Please sign in to comment.