diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 70a4392a82ae7..5e6e11d4cb098 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -139,8 +139,13 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { @Override public void onFailure(String source, Throwable t) { //if the reroute fails we only log - logger.debug("failed to perform [{}]", t, source); - listener.onFailure(new ElasticsearchException("reroute after update settings failed", t)); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master"); + listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build())); + } else { + logger.debug("failed to perform [{}]", t, source); + listener.onFailure(new ElasticsearchException("reroute after update settings failed", t)); + } } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java index 6204599f57d33..f032a0cd06454 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -110,4 +110,16 @@ public interface ClusterService extends LifecycleComponent { * Returns the tasks that are pending. */ List pendingTasks(); + + /** + * an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as + * the current node is no longer master + */ + public static class NoLongerMasterException extends ElasticsearchIllegalStateException { + + public NoLongerMasterException(String msg) { + super(msg); + } + + } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java new file mode 100644 index 0000000000000..2fac718ae2de2 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +/** + * This is a marker interface to indicate that the task should be executed + * even if the current node is not a master. + */ +public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask { +} diff --git a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java new file mode 100644 index 0000000000000..e46a2edc79245 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +/** + * A combination interface between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and + * {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes + */ +public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask { +} diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 059697eb21cfd..b545175895b96 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -141,10 +141,6 @@ private void reroute() { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - // double check we are still master, this may have changed. - if (!currentState.nodes().localNodeMaster()) { - return currentState; - } RoutingAllocation.Result routingResult = allocationService.reroute(currentState); if (!routingResult.changed()) { diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 7cb52df3b6b57..dbe0b4c7ad085 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -325,6 +325,11 @@ public void run() { } logger.debug("processing [{}]: execute", source); ClusterState previousClusterState = clusterState; + if (!previousClusterState.nodes().localNodeMaster() && !(updateTask instanceof ClusterStateNonMasterUpdateTask)) { + logger.debug("failing [{}]: local node is no longer master", source); + updateTask.onFailure(source, new NoLongerMasterException("source: " + source)); + return; + } ClusterState newClusterState; try { newClusterState = updateTask.execute(previousClusterState); @@ -722,5 +727,4 @@ public void onTimeout() { } } } - } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 1a6ffd3a66a2e..065f3b6e45fed 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -123,7 +123,7 @@ protected void doStart() throws ElasticsearchException { // we are the first master (and the master) master = true; final LocalDiscovery master = firstMaster; - clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); @@ -149,7 +149,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } else if (firstMaster != null) { // update as fast as we can the local node state with the new metadata (so we create indices for example) final ClusterState masterState = firstMaster.clusterService.state(); - clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // make sure we have the local node id set, we might need it as a result of the new metadata @@ -165,7 +165,7 @@ public void onFailure(String source, Throwable t) { // tell the master to send the fact that we are here final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() { + firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); @@ -225,7 +225,7 @@ protected void doStop() throws ElasticsearchException { } final LocalDiscovery master = firstMaster; - master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { + master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id()); @@ -305,7 +305,7 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { - discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() { + discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7babee60abe5f..36cb8ee85b78f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -312,7 +312,7 @@ private void innerJoinCluster() { if (localNode.equals(masterNode)) { this.master = true; nodesFD.start(); // start the nodes FD - clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // Take into account the previous known nodes, if they happen not to be available @@ -334,7 +334,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + logger.error("unexpected failure during [{}]", t, source); } @Override @@ -398,7 +398,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing {} leave request as we are no longer master", node); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } }); } else { @@ -433,7 +438,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing [{}] as we are no longer master", source); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } @Override @@ -466,7 +476,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing [{}] as we are no longer master", source); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } @Override @@ -488,7 +503,7 @@ private void handleMasterGone(final DiscoveryNode masterNode, final String reaso logger.info("master_left [{}], reason [{}]", masterNode, reason); - clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (!masterNode.id().equals(currentState.nodes().masterNodeId())) { @@ -622,7 +637,7 @@ public void onFailure(String source, Throwable t) { final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); processNewClusterStates.add(processClusterState); - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // we already processed it in a previous event @@ -962,7 +977,7 @@ public RejoinClusterRequest newInstance() { @Override public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception { - clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { try { @@ -975,7 +990,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + if (t instanceof ClusterService.NoLongerMasterException) { + logger.debug("not processing [{}] as we are no longer master", source); + } + else { + logger.error("unexpected failure during [{}]", t, source); + } } }); } diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java index 0894edccd8c2c..9c1607900a78c 100644 --- a/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.NodeBuilder; @@ -223,7 +222,7 @@ class TribeClusterStateListener implements ClusterStateListener { @Override public void clusterChanged(final ClusterChangedEvent event) { logger.debug("[{}] received cluster event, [{}]", tribeName, event.source()); - clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { ClusterState tribeState = event.state(); diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index dde9eedc4e1ed..1286c62d1668a 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; import com.google.common.base.Predicate; +import com.google.common.util.concurrent.ListenableFuture; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; @@ -256,6 +257,58 @@ public void onFailure(String source, Throwable t) { assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true)); } + @Test + public void testMasterAwareExecution() throws Exception { + Settings settings = settingsBuilder() + .put("discovery.type", "local") + .build(); + + ListenableFuture master = cluster().startNodeAsync(settings); + ListenableFuture nonMaster = cluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build()); + master.get(); + ensureGreen(); // make sure we have a cluster + + ClusterService clusterService = cluster().getInstance(ClusterService.class, nonMaster.get()); + + final boolean[] taskFailed = {false}; + final CountDownLatch latch1 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + latch1.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + taskFailed[0] = true; + latch1.countDown(); + } + }); + + latch1.await(); + assertTrue("cluster state update task was executed on a non-master", taskFailed[0]); + + taskFailed[0] = true; + final CountDownLatch latch2 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test", new ClusterStateNonMasterUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + taskFailed[0] = false; + latch2.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + taskFailed[0] = true; + latch2.countDown(); + } + }); + latch2.await(); + assertFalse("non-master cluster state update task was not executed", taskFailed[0]); + } + @Test public void testAckedUpdateTaskNoAckExpected() throws Exception { Settings settings = settingsBuilder()