Watcher add stopped listener #43939

Merged: 20 commits, Aug 16, 2019

Changes from 13 commits
@@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@@ -35,6 +37,7 @@

public class WatcherLifeCycleService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class);
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
@@ -57,7 +60,7 @@ synchronized void shutDown() {
this.state.set(WatcherState.STOPPING);
shutDown = true;
clearAllocationIds();
watcherService.shutDown();
watcherService.shutDown(() -> {});
Contributor Author:

@martijnvg pointed out that we may want to push the STOPPED state to inside the callback. That sounds right, but will need to ensure that delaying the STOPPED state won't cause issues.

Contributor Author:

this has been implemented in c54c7da
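
A minimal sketch of what moving the STOPPED transition into the shutdown callback could look like (an assumption about c54c7da, which is not part of this 13-commit view):

    // sketch only: transition to STOPPED once the shutdown callback has actually run
    watcherService.shutDown(() -> this.state.set(WatcherState.STOPPED));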

this.state.set(WatcherState.STOPPED);
}

@@ -88,9 +91,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}

boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
boolean isStoppedOrStopping = this.state.get() == WatcherState.STOPPED ||
Contributor:

this.state.get() could change between those two calls?

Contributor Author:

thanks! addressed in eab50a0

this.state.get() == WatcherState.STOPPING;
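
A minimal sketch of the single-read variant presumably applied in eab50a0 (that commit is not shown in this view), so that both comparisons observe the same value:

    WatcherState currentState = this.state.get();
    boolean isStoppedOrStopping = currentState == WatcherState.STOPPED ||
        currentState == WatcherState.STOPPING;
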
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().isDataNode() == false &&
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
isWatcherStoppedManually == false && isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
Member:

Shouldn't we only be able to set the state to STARTING if the current state is STOPPED?

Contributor Author:

This is to stay passive with the old behavior. If we kept this to only STOPPED with this change, it would mean you cannot restart watcher while any watches are currently in flight.

The pre-existing design allows you to "stop" and restart it immediately and re-process the same watch; any in-flight watches will finish up. If I didn't allow STOPPING here, you would have to wait for all in-flight watches to finish, and if one were stuck (for whatever reason) it could become impossible to ever reach a fully STOPPED state.

watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
Member:

Should we guard against going into STARTED state from a state other than STARTING?
(this.state.compareAndSet(WatcherState.STARTING, WatcherState.STARTED);)

Contributor Author:

I am trying to avoid changes to the START* states/behavior. This change should only impact the STOPPED state for a manual (e.g. via the API) request to stop.

return;
@@ -99,8 +104,18 @@ public void clusterChanged(ClusterChangedEvent event) {
if (isWatcherStoppedManually) {
if (this.state.get() == WatcherState.STARTED) {
clearAllocationIds();
watcherService.stop("watcher manually marked to shutdown by cluster state update");
this.state.set(WatcherState.STOPPED);
this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING);
//waiting to set state to stopped until after all currently running watches are finished
watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> {
//only transition from stopping -> stopped (which may not be the case if restarted quickly)
boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED);
if (stopped) {
logger.info("watcher has stopped");
} else {
logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get());
}

});
}
return;
}
@@ -142,7 +157,7 @@ public void clusterChanged(ClusterChangedEvent event) {
previousShardRoutings.set(localAffectedShardRoutings);
if (state.get() == WatcherState.STARTED) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (state.get() == WatcherState.STOPPED) {
} else if (isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
}
@@ -35,6 +35,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
@@ -144,24 +145,32 @@ public boolean validate(ClusterState state) {
}

/**
* Stops the watcher service and marks its services as paused
* Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING}
* prior to calling this method.
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
*/
public void stop(String reason) {
public void stop(String reason, Runnable stoppedListener) {
assert stoppedListener != null;
logger.info("stopping watch service, reason [{}]", reason);
executionService.pause();
executionService.pause(stoppedListener);
triggerService.pauseExecution();
}
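
For reference, the caller side of this contract, as added to WatcherLifeCycleService earlier in this diff, looks roughly like:

    this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING);
    watcherService.stop("reason", () -> state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED));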

/**
* shuts down the trigger service as well to make sure there are no lingering threads
* also no need to check anything, as this is final, we just can go to status STOPPED
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
void shutDown() {
void shutDown(Runnable stoppedListener) {
assert stoppedListener != null;
logger.info("stopping watch service, reason [shutdown initiated]");
executionService.pause();
executionService.pause(stoppedListener);
triggerService.stop();
stopExecutor();
logger.debug("watch service has stopped");
logger.debug("watch service has been shut down");
}

void stopExecutor() {
@@ -185,7 +194,7 @@ void reload(ClusterState state, String reason) {
processedClusterStateVersion.set(state.getVersion());

triggerService.pauseExecution();
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {});
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);

executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
@@ -256,7 +265,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool
*/
public void pauseExecution(String reason) {
triggerService.pauseExecution();
int cancelledTaskCount = executionService.pause();
int cancelledTaskCount = executionService.pause(() -> {});
logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
}

@@ -5,8 +5,11 @@
*/
package org.elasticsearch.xpack.watcher.execution;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.watcher.WatcherState;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@@ -19,6 +22,7 @@

public final class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {

private static final Logger logger = LogManager.getLogger(CurrentExecutions.class);
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>();
// the condition of the lock is used to wait and signal the finishing of all executions on shutdown
private final ReentrantLock lock = new ReentrantLock();
@@ -63,9 +67,12 @@ public void remove(String id) {
* Calling this method makes the class stop accepting new executions and throws and exception instead.
* In addition it waits for a certain amount of time for current executions to finish before returning
*
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) {
assert stoppedListener != null;
lock.lock();
// We may have current executions still going on.
// We should try to wait for the current executions to have completed.
@@ -81,6 +88,8 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
//fully stop Watcher after all executions are finished
stoppedListener.run();
lock.unlock();
}
}
@@ -38,6 +38,7 @@
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
@@ -135,23 +136,31 @@ public void unPause() {
* Pausing means, that no new watch executions will be done unless this pausing is explicitly unset.
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
* {@link #clearExecutionsAndQueue()} is the way to go
* {@link #clearExecutionsAndQueue(Runnable)} is the way to go
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*
* @return the number of tasks that have been removed
*/
public int pause() {
public int pause(Runnable stoppedListener) {
assert stoppedListener != null;
paused.set(true);
return clearExecutionsAndQueue();
return clearExecutionsAndQueue(stoppedListener);
}

/**
* Empty the currently queued tasks and wait for current executions to finish.
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*
* @return the number of tasks that have been removed
*/
public int clearExecutionsAndQueue() {
public int clearExecutionsAndQueue(Runnable stoppedListener) {
assert stoppedListener != null;
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
this.clearExecutions();
this.clearExecutions(stoppedListener);
return cancelledTaskCount;
}

@@ -278,8 +287,10 @@ public WatchRecord execute(WatchExecutionContext ctx) {
ctx.setNodeId(clusterService.localNode().getId());
WatchRecord record = null;
final String watchId = ctx.id().watchId();
//pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove
final CurrentExecutions currentExecutions = this.currentExecutions.get();
try {
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
if (executionAlreadyExists) {
logger.trace("not executing watch [{}] because it is already queued", watchId);
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
@@ -334,7 +345,7 @@ record = createWatchRecord(record, ctx, e);

triggeredWatchStore.delete(ctx.id());
}
currentExecutions.get().remove(watchId);
currentExecutions.remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
}
return record;
@@ -577,11 +588,15 @@ public Counters executionTimes() {
/**
* This clears out the current executions and sets new empty current executions
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
private void clearExecutions() {
private void clearExecutions(Runnable stoppedListener) {
assert stoppedListener != null;
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
// clear old executions in background, no need to wait
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener));
}

// the watch execution task takes another runnable as parameter
@@ -34,6 +34,7 @@
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;

import java.util.Collections;
@@ -133,8 +134,8 @@ public void testShutdown() {
when(watcherService.validate(clusterState)).thenReturn(true);

lifeCycleService.shutDown();
verify(watcherService, never()).stop(anyString());
verify(watcherService, times(1)).shutDown();
verify(watcherService, never()).stop(anyString(), any());
verify(watcherService, times(1)).shutDown(any());

reset(watcherService);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
@@ -175,7 +176,12 @@ public void testManualStartStop() {
.build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update"));
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(watcherService, times(1))
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
captor.getValue().run();
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());

// Starting via cluster state update, as the watcher metadata block is removed/set to true
reset(watcherService);
@@ -269,8 +269,8 @@ void stopExecutor() {
csBuilder.metaData(MetaData.builder());

service.reload(csBuilder.build(), "whatever");
verify(executionService).clearExecutionsAndQueue();
verify(executionService, never()).pause();
verify(executionService).clearExecutionsAndQueue(any());
verify(executionService, never()).pause(any());
verify(triggerService).pauseExecution();
}

@@ -518,10 +518,12 @@ protected void stopWatcher() throws Exception {
WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get();
assertThat(watcherStatsResponse.hasFailures(), is(false));
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream()
.map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState()))
.collect(Collectors.toList());
.map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")",
response.getWatcherState())).collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());



logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);

boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
@@ -546,7 +548,7 @@
}

throw new AssertionError("unexpected state, retrying with next run");
});
}, 30, TimeUnit.SECONDS);
}

public static class NoopEmailService extends EmailService {
@@ -19,6 +19,7 @@
import org.junit.Before;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@@ -108,7 +109,7 @@ public void stopWatcher() throws Exception {
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}

@Override
@@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -116,7 +117,7 @@ public void stopWatcher() throws Exception {
} catch (IOException e) {
throw new AssertionError(e);
}
});
}, 30, TimeUnit.SECONDS);

adminClient().performRequest(new Request("DELETE", "/my_test_index"));
}
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -93,7 +94,7 @@ public void stopWatcher() throws Exception {
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}

@Override
@@ -14,6 +14,8 @@
import org.junit.After;
import org.junit.Before;

import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@@ -90,6 +92,6 @@ public void stopWatcher() throws Exception {
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}
}