From 8d1ad47e1663f6035c101b7965826849f1232660 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 16 May 2014 22:09:39 +0200 Subject: [PATCH] Do not execute cluster state changes if current node is no longer master When a node steps down from being a master (because, for example, min_master_node is breached), it may still have cluster state update tasks queued up. Most (but not all) are tasks that should no longer be executed as the node no longer has authority to do so. Other cluster state updates, like electing the current node as master, should be executed even if the current node is no longer master. This commit makes sure that, by default, `ClusterStateUpdateTask` is not executed if the node is no longer master. Tasks that should run on non masters are changed to implement a new interface called `ClusterStateNonMasterUpdateTask` Closes #6230 --- .../TransportClusterUpdateSettingsAction.java | 9 +++- .../elasticsearch/cluster/ClusterService.java | 12 +++++ .../ClusterStateNonMasterUpdateTask.java | 27 ++++++++++ ...cessedClusterStateNonMasterUpdateTask.java | 26 +++++++++ .../cluster/routing/RoutingService.java | 4 -- .../service/InternalClusterService.java | 6 ++- .../discovery/local/LocalDiscovery.java | 10 ++-- .../discovery/zen/ZenDiscovery.java | 38 +++++++++---- .../org/elasticsearch/tribe/TribeService.java | 3 +- .../cluster/ClusterServiceTests.java | 53 +++++++++++++++++++ 10 files changed, 165 insertions(+), 23 deletions(-) create mode 100644 src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java create mode 100644 src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 70a4392a82ae7..5e6e11d4cb098 100644 --- 
a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -139,8 +139,13 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { @Override public void onFailure(String source, Throwable t) { //if the reroute fails we only log - logger.debug("failed to perform [{}]", t, source); - listener.onFailure(new ElasticsearchException("reroute after update settings failed", t)); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master"); + listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build())); + } else { + logger.debug("failed to perform [{}]", t, source); + listener.onFailure(new ElasticsearchException("reroute after update settings failed", t)); + } } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java index 6204599f57d33..f032a0cd06454 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -110,4 +110,16 @@ public interface ClusterService extends LifecycleComponent { * Returns the tasks that are pending. 
*/ List pendingTasks(); + + /** + * an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as + * the current node is no longer master + */ + public static class NoLongerMasterException extends ElasticsearchIllegalStateException { + + public NoLongerMasterException(String msg) { + super(msg); + } + + } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java new file mode 100644 index 0000000000000..2fac718ae2de2 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +/** + * This is a marker interface to indicate that the task should be executed + * even if the current node is not a master. 
+ */ +public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask { +} diff --git a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java new file mode 100644 index 0000000000000..e46a2edc79245 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.cluster; + +/** + * A combination interface between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and + * {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes + */ +public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask { +} diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 059697eb21cfd..b545175895b96 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -141,10 +141,6 @@ private void reroute() { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - // double check we are still master, this may have changed. 
- if (!currentState.nodes().localNodeMaster()) { - return currentState; - } RoutingAllocation.Result routingResult = allocationService.reroute(currentState); if (!routingResult.changed()) { diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 7cb52df3b6b57..dbe0b4c7ad085 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -325,6 +325,11 @@ public void run() { } logger.debug("processing [{}]: execute", source); ClusterState previousClusterState = clusterState; + if (!previousClusterState.nodes().localNodeMaster() && !(updateTask instanceof ClusterStateNonMasterUpdateTask)) { + logger.debug("failing [{}]: local node is no longer master", source); + updateTask.onFailure(source, new NoLongerMasterException("source: " + source)); + return; + } ClusterState newClusterState; try { newClusterState = updateTask.execute(previousClusterState); @@ -722,5 +727,4 @@ public void onTimeout() { } } } - } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 1a6ffd3a66a2e..065f3b6e45fed 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -123,7 +123,7 @@ protected void doStart() throws ElasticsearchException { // we are the first master (and the master) master = true; final LocalDiscovery master = firstMaster; - clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState 
currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); @@ -149,7 +149,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } else if (firstMaster != null) { // update as fast as we can the local node state with the new metadata (so we create indices for example) final ClusterState masterState = firstMaster.clusterService.state(); - clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // make sure we have the local node id set, we might need it as a result of the new metadata @@ -165,7 +165,7 @@ public void onFailure(String source, Throwable t) { // tell the master to send the fact that we are here final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() { + firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); @@ -225,7 +225,7 @@ protected void doStop() throws ElasticsearchException { } final LocalDiscovery master = firstMaster; - master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { + master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id()); @@ -305,7 +305,7 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final 
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { - discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() { + discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7babee60abe5f..36cb8ee85b78f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -312,7 +312,7 @@ private void innerJoinCluster() { if (localNode.equals(masterNode)) { this.master = true; nodesFD.start(); // start the nodes FD - clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // Take into account the previous known nodes, if they happen not to be available @@ -334,7 +334,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + logger.error("unexpected failure during [{}]", t, source); } @Override @@ -398,7 +398,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void 
onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing {} leave request as we are no longer master", node); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } }); } else { @@ -433,7 +438,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing [{}] as we are no longer master", source); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } @Override @@ -466,7 +476,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing [{}] as we are no longer master", source); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } @Override @@ -488,7 +503,7 @@ private void handleMasterGone(final DiscoveryNode masterNode, final String reaso logger.info("master_left [{}], reason [{}]", masterNode, reason); - clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (!masterNode.id().equals(currentState.nodes().masterNodeId())) { @@ -622,7 +637,7 @@ public void onFailure(String source, Throwable t) { final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); 
processNewClusterStates.add(processClusterState); - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // we already processed it in a previous event @@ -962,7 +977,7 @@ public RejoinClusterRequest newInstance() { @Override public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception { - clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { try { @@ -975,7 +990,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing [{}] as we are no longer master", source); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } }); } diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java index 0894edccd8c2c..9c1607900a78c 100644 --- a/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.NodeBuilder; @@ -223,7 +222,7 @@ class TribeClusterStateListener implements ClusterStateListener { @Override public void clusterChanged(final ClusterChangedEvent event) { logger.debug("[{}] received cluster event, [{}]", tribeName, event.source()); - clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { ClusterState tribeState = event.state(); diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index dde9eedc4e1ed..1286c62d1668a 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; import com.google.common.base.Predicate; +import com.google.common.util.concurrent.ListenableFuture; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; @@ -256,6 +257,58 @@ public void onFailure(String source, Throwable t) { assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true)); } + @Test + public void testMasterAwareExecution() throws Exception { + Settings settings = settingsBuilder() + .put("discovery.type", "local") + .build(); + + ListenableFuture master = cluster().startNodeAsync(settings); + ListenableFuture nonMaster = 
cluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build()); + master.get(); + ensureGreen(); // make sure we have a cluster + + ClusterService clusterService = cluster().getInstance(ClusterService.class, nonMaster.get()); + + final boolean[] taskFailed = {false}; + final CountDownLatch latch1 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + latch1.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + taskFailed[0] = true; + latch1.countDown(); + } + }); + + latch1.await(); + assertTrue("cluster state update task was executed on a non-master", taskFailed[0]); + + taskFailed[0] = true; + final CountDownLatch latch2 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test", new ClusterStateNonMasterUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + taskFailed[0] = false; + latch2.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + taskFailed[0] = true; + latch2.countDown(); + } + }); + latch2.await(); + assertFalse("non-master cluster state update task was not executed", taskFailed[0]); + } + @Test public void testAckedUpdateTaskNoAckExpected() throws Exception { Settings settings = settingsBuilder()