Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored ClusterStateUpdateTask protection against execution on a non master #7511

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,17 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
}

@Override
public void onNoLongerMaster(String source) {
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
}

@Override
public void onFailure(String source, Throwable t) {
//if the reroute fails we only log
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
} else {
logger.debug("failed to perform [{}]", t, source);
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
}
logger.debug("failed to perform [{}]", t, source);
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
}

@Override
Expand Down
42 changes: 23 additions & 19 deletions src/main/java/org/elasticsearch/action/bench/BenchmarkService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
/**
* Constructs a service component for running benchmarks
*
* @param settings Settings
* @param clusterService Cluster service
* @param threadPool Thread pool
* @param client Client
* @param transportService Transport service
* @param settings Settings
* @param clusterService Cluster service
* @param threadPool Thread pool
* @param client Client
* @param transportService Transport service
*/
@Inject
public BenchmarkService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
Expand All @@ -86,19 +86,22 @@ public BenchmarkService(Settings settings, ClusterService clusterService, Thread
}

@Override
protected void doStart() throws ElasticsearchException { }
protected void doStart() throws ElasticsearchException {
}

@Override
protected void doStop() throws ElasticsearchException { }
protected void doStop() throws ElasticsearchException {
}

@Override
protected void doClose() throws ElasticsearchException { }
protected void doClose() throws ElasticsearchException {
}

/**
* Lists actively running benchmarks on the cluster
*
* @param request Status request
* @param listener Response listener
* @param request Status request
* @param listener Response listener
*/
public void listBenchmarks(final BenchmarkStatusRequest request, final ActionListener<BenchmarkStatusResponse> listener) {

Expand Down Expand Up @@ -171,8 +174,8 @@ public void onFailure(Throwable t) {
/**
* Executes benchmarks on the cluster
*
* @param request Benchmark request
* @param listener Response listener
* @param request Benchmark request
* @param listener Response listener
*/
public void startBenchmark(final BenchmarkRequest request, final ActionListener<BenchmarkResponse> listener) {

Expand Down Expand Up @@ -228,7 +231,7 @@ public void onFailure(Throwable t) {
listener.onFailure(t);
}
}, (benchmarkResponse.state() != BenchmarkResponse.State.ABORTED) &&
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
}

private final boolean isBenchmarkNode(DiscoveryNode node) {
Expand Down Expand Up @@ -403,6 +406,7 @@ protected CountDownAsyncHandler(int size) {
}

public abstract T newInstance();

protected abstract void sendResponse();

@Override
Expand Down Expand Up @@ -593,7 +597,7 @@ public ClusterState execute(ClusterState currentState) {

if (bmd != null) {
for (BenchmarkMetaData.Entry entry : bmd.entries()) {
if (request.benchmarkName().equals(entry.benchmarkId())){
if (request.benchmarkName().equals(entry.benchmarkId())) {
if (entry.state() != BenchmarkMetaData.State.SUCCESS && entry.state() != BenchmarkMetaData.State.FAILED) {
throw new ElasticsearchException("A benchmark with ID [" + request.benchmarkName() + "] is already running in state [" + entry.state() + "]");
}
Expand Down Expand Up @@ -648,7 +652,7 @@ public FinishBenchmarkTask(String reason, String benchmarkId, BenchmarkStateList
@Override
protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
BenchmarkMetaData.State state = entry.state();
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
if (success) {
return new BenchmarkMetaData.Entry(entry, BenchmarkMetaData.State.SUCCESS);
} else {
Expand All @@ -661,7 +665,7 @@ public final class AbortBenchmarkTask extends UpdateBenchmarkStateTask {
private final String[] patterns;

public AbortBenchmarkTask(String[] patterns, BenchmarkStateListener listener) {
super("abort_benchmark", null , listener);
super("abort_benchmark", null, listener);
this.patterns = patterns;
}

Expand All @@ -675,7 +679,7 @@ protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
}
}

public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask {
public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask {

private final String reason;
protected final String benchmarkId;
Expand All @@ -702,7 +706,7 @@ public ClusterState execute(ClusterState currentState) {
ImmutableList.Builder<BenchmarkMetaData.Entry> builder = new ImmutableList.Builder<BenchmarkMetaData.Entry>();
for (BenchmarkMetaData.Entry e : bmd.entries()) {
if (benchmarkId == null || match(e)) {
e = process(e) ;
e = process(e);
instances.add(e);
}
// Don't keep finished benchmarks around in cluster state
Expand Down Expand Up @@ -741,7 +745,7 @@ public String reason() {
}
}

public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> implements TimeoutClusterStateUpdateTask {
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> extends TimeoutClusterStateUpdateTask {
protected final R request;

public BenchmarkStateChangeAction(R request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {

private final ActionListener<Response> listener;
private final AckedRequest request;
Expand All @@ -40,6 +40,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo

/**
* Called to determine which nodes the acknowledgement is expected from
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
Expand All @@ -50,6 +51,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Throwable t) {
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/org/elasticsearch/cluster/ClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,4 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
List<PendingClusterTask> pendingTasks();

/**
* an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as
* the current node is no longer master
*/
public static class NoLongerMasterException extends ElasticsearchIllegalStateException {

public NoLongerMasterException(String msg) {
super(msg);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@
* This is a marker interface to indicate that the task should be executed
* even if the current node is not a master.
*/
public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {

@Override
public boolean runOnlyOnMaster() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,37 @@

package org.elasticsearch.cluster;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
* A task that can update the cluster state.
*/
public interface ClusterStateUpdateTask {
abstract public class ClusterStateUpdateTask {

/**
* Update the cluster state based on the current state. Return the *same instance* if no state
* should be changed.
*/
ClusterState execute(ClusterState currentState) throws Exception;
abstract public ClusterState execute(ClusterState currentState) throws Exception;

/**
* A callback called when execute fails.
*/
void onFailure(String source, Throwable t);
abstract public void onFailure(String source, @Nullable Throwable t);


/**
* indicates whether this task should only run if current node is master
*/
public boolean runOnlyOnMaster() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can make this name more crisp — maybe masterOnly() or isMasterOnly()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I debated this and decided to opt for clarity at the price of verbosity - these things are complex enough. To me, isMasterOnly or masterOnly raises the question of what that means for an updateTask. I'm fine with changing it if other people feel that masterOnly is more intuitive.

return true;
}

/**
* called when the task was rejected because the local node is no longer master
*/
public void onNoLongerMaster(String source) {
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
package org.elasticsearch.cluster;

/**
* A combination interface between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
* A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
*/
public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask {
abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {

@Override
public boolean runOnlyOnMaster() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* the cluster state update has been processed.
*/
public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {

/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
abstract public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put the visibility modifier first (i.e. `public abstract` rather than `abstract public`)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {

/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}
*/
TimeValue timeout();
abstract public TimeValue timeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,15 @@ public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).routingResult(routingResult).build();
}

@Override
public void onNoLongerMaster(String source) {
// no biggie
}

@Override
public void onFailure(String source, Throwable t) {
if (!(t instanceof ClusterService.NoLongerMasterException)) {
ClusterState state = clusterService.state();
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
}
}
});
routingTableDirty = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,9 @@ public void run() {
}
logger.debug("processing [{}]: execute", source);
ClusterState previousClusterState = clusterState;
if (!previousClusterState.nodes().localNodeMaster() && !(updateTask instanceof ClusterStateNonMasterUpdateTask)) {
if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", source);
updateTask.onFailure(source, new NoLongerMasterException("source: " + source));
updateTask.onNoLongerMaster(source);
return;
}
ClusterState newClusterState;
Expand Down
Loading