From 45a8acd3d04664311bf8a83780035f342c126c49 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 29 Aug 2014 16:46:53 +0200 Subject: [PATCH] [Cluster] Refactored ClusterStateUpdateTask protection against execution on a non master The previous implementation used a marker interface and had no explicit failure callback for the case where an update task was run on a non-master node (i.e., the master stepped down after the task was submitted). That led to a couple of instanceof checks. This approach moves ClusterStateUpdateTask from an interface to an abstract class, which allows adding a flag to indicate whether it should only run on master nodes (defaults to true). It also adds an explicit onNoLongerMaster callback to allow different error handling for that case. This also removes the need for the NoLongerMasterException. --- .../TransportClusterUpdateSettingsAction.java | 15 ++-- .../action/bench/BenchmarkService.java | 42 ++++++----- .../cluster/AckedClusterStateUpdateTask.java | 4 +- .../elasticsearch/cluster/ClusterService.java | 11 --- .../ClusterStateNonMasterUpdateTask.java | 7 +- .../cluster/ClusterStateUpdateTask.java | 24 ++++++- ...cessedClusterStateNonMasterUpdateTask.java | 9 ++- .../ProcessedClusterStateUpdateTask.java | 4 +- .../TimeoutClusterStateUpdateTask.java | 4 +- .../cluster/routing/RoutingService.java | 7 +- .../service/InternalClusterService.java | 4 +- .../discovery/zen/ZenDiscovery.java | 70 ++++++++++++------- .../cluster/ClusterServiceTests.java | 4 +- 13 files changed, 124 insertions(+), 81 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index a94b322ceb376..edcf833464073 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ 
b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -137,16 +137,17 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build()); } + @Override + public void onNoLongerMaster(String source) { + logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master"); + listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build())); + } + @Override public void onFailure(String source, Throwable t) { //if the reroute fails we only log - if (t instanceof ClusterService.NoLongerMasterException) { - logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master"); - listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build())); - } else { - logger.debug("failed to perform [{}]", t, source); - listener.onFailure(new ElasticsearchException("reroute after update settings failed", t)); - } + logger.debug("failed to perform [{}]", t, source); + listener.onFailure(new ElasticsearchException("reroute after update settings failed", t)); } @Override diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java index 0ebfd47593e0a..5868aa12b5a28 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java @@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent listener) { @@ -171,8 +174,8 @@ public void onFailure(Throwable t) { /** * Executes benchmarks on the cluster * - * @param request Benchmark request - * @param listener Response listener + * 
@param request Benchmark request + * @param listener Response listener */ public void startBenchmark(final BenchmarkRequest request, final ActionListener listener) { @@ -228,7 +231,7 @@ public void onFailure(Throwable t) { listener.onFailure(t); } }, (benchmarkResponse.state() != BenchmarkResponse.State.ABORTED) && - (benchmarkResponse.state() != BenchmarkResponse.State.FAILED))); + (benchmarkResponse.state() != BenchmarkResponse.State.FAILED))); } private final boolean isBenchmarkNode(DiscoveryNode node) { @@ -403,6 +406,7 @@ protected CountDownAsyncHandler(int size) { } public abstract T newInstance(); + protected abstract void sendResponse(); @Override @@ -593,7 +597,7 @@ public ClusterState execute(ClusterState currentState) { if (bmd != null) { for (BenchmarkMetaData.Entry entry : bmd.entries()) { - if (request.benchmarkName().equals(entry.benchmarkId())){ + if (request.benchmarkName().equals(entry.benchmarkId())) { if (entry.state() != BenchmarkMetaData.State.SUCCESS && entry.state() != BenchmarkMetaData.State.FAILED) { throw new ElasticsearchException("A benchmark with ID [" + request.benchmarkName() + "] is already running in state [" + entry.state() + "]"); } @@ -648,7 +652,7 @@ public FinishBenchmarkTask(String reason, String benchmarkId, BenchmarkStateList @Override protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) { BenchmarkMetaData.State state = entry.state(); - assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state(); + assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state(); if (success) { return new BenchmarkMetaData.Entry(entry, BenchmarkMetaData.State.SUCCESS); } else { @@ -661,7 +665,7 @@ public final class AbortBenchmarkTask extends UpdateBenchmarkStateTask { private final String[] patterns; public 
AbortBenchmarkTask(String[] patterns, BenchmarkStateListener listener) { - super("abort_benchmark", null , listener); + super("abort_benchmark", null, listener); this.patterns = patterns; } @@ -675,7 +679,7 @@ protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) { } } - public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask { + public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask { private final String reason; protected final String benchmarkId; @@ -702,7 +706,7 @@ public ClusterState execute(ClusterState currentState) { ImmutableList.Builder builder = new ImmutableList.Builder(); for (BenchmarkMetaData.Entry e : bmd.entries()) { if (benchmarkId == null || match(e)) { - e = process(e) ; + e = process(e); instances.add(e); } // Don't keep finished benchmarks around in cluster state @@ -741,7 +745,7 @@ public String reason() { } } - public abstract class BenchmarkStateChangeAction implements TimeoutClusterStateUpdateTask { + public abstract class BenchmarkStateChangeAction extends TimeoutClusterStateUpdateTask { protected final R request; public BenchmarkStateChangeAction(R request) { diff --git a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java index 7cdee75387382..087bd1c6ad68a 100644 --- a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -28,7 +28,7 @@ * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when * all the nodes have acknowledged a cluster state update request */ -public abstract class AckedClusterStateUpdateTask implements TimeoutClusterStateUpdateTask { +public abstract class AckedClusterStateUpdateTask extends TimeoutClusterStateUpdateTask { private final ActionListener listener; private final AckedRequest request; @@ -40,6 
+40,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener { */ List pendingTasks(); - /** - * an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as - * the current node is no longer master - */ - public static class NoLongerMasterException extends ElasticsearchIllegalStateException { - - public NoLongerMasterException(String msg) { - super(msg); - } - - } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java index 2fac718ae2de2..48afbb8f1fe8f 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java @@ -23,5 +23,10 @@ * This is a marker interface to indicate that the task should be executed * even if the current node is not a master. */ -public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask { +public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask { + + @Override + public boolean runOnlyOnMaster() { + return false; + } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 490a556ab1264..921b6d149ee41 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -19,19 +19,37 @@ package org.elasticsearch.cluster; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; + /** * A task that can update the cluster state. */ -public interface ClusterStateUpdateTask { +abstract public class ClusterStateUpdateTask { /** * Update the cluster state based on the current state. Return the *same instance* if no state * should be changed. 
*/ - ClusterState execute(ClusterState currentState) throws Exception; + abstract public ClusterState execute(ClusterState currentState) throws Exception; /** * A callback called when execute fails. */ - void onFailure(String source, Throwable t); + abstract public void onFailure(String source, @Nullable Throwable t); + + + /** + * indicates whether this task should only run if current node is master + */ + public boolean runOnlyOnMaster() { + return true; + } + + /** + * called when the task was rejected because the local node is no longer master + */ + public void onNoLongerMaster(String source) { + onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]")); + } } diff --git a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java index e46a2edc79245..4af05b43581e0 100644 --- a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java @@ -19,8 +19,13 @@ package org.elasticsearch.cluster; /** - * A combination interface between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and + * A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and * {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes */ -public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask { +abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask { + + @Override + public boolean runOnlyOnMaster() { + return false; + } } diff --git a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java index 
72074965f95ed..58e0237dce99f 100644 --- a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java @@ -23,11 +23,11 @@ * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when * the cluster state update has been processed. */ -public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask { +public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask { /** * Called when the result of the {@link #execute(ClusterState)} have been processed * properly by all listeners. */ - void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState); + abstract public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState); } diff --git a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java index 1083e1ddcbeda..1ae767c6560ae 100644 --- a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java @@ -25,11 +25,11 @@ * An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate * a timeout. 
*/ -public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask { +abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask { /** * If the cluster state update task wasn't processed by the provided timeout, call * {@link #onFailure(String, Throwable)} */ - TimeValue timeout(); + abstract public TimeValue timeout(); } diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 828244494a976..555b8b3ef1b48 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -149,12 +149,15 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).routingResult(routingResult).build(); } + @Override + public void onNoLongerMaster(String source) { + // no biggie + } + @Override public void onFailure(String source, Throwable t) { - if (!(t instanceof ClusterService.NoLongerMasterException)) { ClusterState state = clusterService.state(); logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint()); - } } }); routingTableDirty = false; diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index be4f8d26df79c..c5fe004f8b9b2 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -325,9 +325,9 @@ public void run() { } logger.debug("processing [{}]: execute", source); ClusterState previousClusterState = clusterState; - if (!previousClusterState.nodes().localNodeMaster() && !(updateTask instanceof ClusterStateNonMasterUpdateTask)) { + if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) { 
logger.debug("failing [{}]: local node is no longer master", source); - updateTask.onFailure(source, new NoLongerMasterException("source: " + source)); + updateTask.onNoLongerMaster(source); return; } ClusterState newClusterState; diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index e03bac932e207..06328c4c6b14d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoverySettings; @@ -476,13 +477,14 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).routingResult(routingResult).build(); } + @Override + public void onNoLongerMaster(String source) { + // ignoring (already logged) + } + @Override public void onFailure(String source, Throwable t) { - if (t instanceof ClusterService.NoLongerMasterException) { - logger.debug("not processing {} leave request as we are no longer master", node); - } else { - logger.error("unexpected failure during [{}]", t, source); - } + logger.error("unexpected failure during [{}]", t, source); } }); } else { @@ -515,13 +517,14 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).routingResult(routingResult).build(); } + @Override + public void onNoLongerMaster(String source) { + // already logged + } + @Override public void onFailure(String source, Throwable t) { - if (t instanceof ClusterService.NoLongerMasterException) { - logger.debug("not processing [{}] 
as we are no longer master", source); - } else { - logger.error("unexpected failure during [{}]", t, source); - } + logger.error("unexpected failure during [{}]", t, source); } @Override @@ -552,13 +555,15 @@ public ClusterState execute(ClusterState currentState) { return currentState; } + + @Override + public void onNoLongerMaster(String source) { + // ignoring (already logged) + } + @Override public void onFailure(String source, Throwable t) { - if (t instanceof ClusterService.NoLongerMasterException) { - logger.debug("not processing [{}] as we are no longer master", source); - } else { - logger.error("unexpected failure during [{}]", t, source); - } + logger.error("unexpected failure during [{}]", t, source); } @Override @@ -870,17 +875,27 @@ public ClusterState execute(ClusterState currentState) { } @Override - public void onFailure(String source, Throwable t) { - if (t instanceof ClusterService.NoLongerMasterException) { - logger.debug("not processing [{}] as we are no longer master", source); - } else { - logger.error("unexpected failure during [{}]", t, source); - } + public void onNoLongerMaster(String source) { + Exception e = new EsRejectedExecutionException("no longer master. 
source: [" + source + "]"); + innerOnFailure(e); + } + + void innerOnFailure(Throwable t) { for (Tuple drainedTask : drainedTasks) { - drainedTask.v2().onFailure(t); + try { + drainedTask.v2().onFailure(t); + } catch (Exception e) { + logger.error("error during task failure", e); + } } } + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + innerOnFailure(t); + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { for (Tuple drainedTask : drainedTasks) { @@ -1151,13 +1166,14 @@ public ClusterState execute(ClusterState currentState) { return rejoin(currentState, "received a request to rejoin the cluster from [" + request.fromNodeId + "]"); } + @Override + public void onNoLongerMaster(String source) { + // already logged + } + @Override public void onFailure(String source, Throwable t) { - if (t instanceof ClusterService.NoLongerMasterException) { - logger.debug("not processing [{}] as we are no longer master", source); - } else { - logger.error("unexpected failure during [{}]", t, source); - } + logger.error("unexpected failure during [{}]", t, source); } }); } diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index 52f00035c4ab8..1d0a20386159b 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -708,7 +708,7 @@ public void testPrioritizedTasks() throws Exception { } } - private static class BlockingTask implements ClusterStateUpdateTask { + private static class BlockingTask extends ClusterStateUpdateTask { private final CountDownLatch latch = new CountDownLatch(1); @Override @@ -727,7 +727,7 @@ public void release() { } - private static class PrioritiezedTask implements ClusterStateUpdateTask { + private static class PrioritiezedTask extends 
ClusterStateUpdateTask { private final Priority priority; private final CountDownLatch latch;