Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watcher: Mark watcher as started only after loading watches #30403

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().isDataNode() == false &&
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
watcherService.start(event.state());
this.state.set(WatcherState.STARTED);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
return;
}

Expand Down Expand Up @@ -157,8 +156,8 @@ public void clusterChanged(ClusterChangedEvent event) {
if (state.get() == WatcherState.STARTED) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (state.get() == WatcherState.STOPPED) {
watcherService.start(event.state());
this.state.set(WatcherState.STARTED);
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
}
} else {
clearAllocationIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,40 @@ void reload(ClusterState state, String reason) {
// by checking the cluster state version before and after loading the watches we can potentially just exit without applying the
// changes
processedClusterStateVersion.set(state.getVersion());
pauseExecution(reason);
triggerService.pauseExecution();
int cancelledTaskCount = executionService.reload();
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);

executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
e -> logger.error("error reloading watcher", e)));
}

public void start(ClusterState state) {
/**
* start the watcher service, load watches in the background
*
* @param state the current cluster state
* @param callback the callback to be triggered, when watches where loaded successfully
*/
public void start(ClusterState state, Runnable callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call the callback onStarted ? it is not called on error

executionService.unPause();
processedClusterStateVersion.set(state.getVersion());
executor.execute(wrapWatcherService(() -> reloadInner(state, "starting", true),
executor.execute(wrapWatcherService(() -> {
if (reloadInner(state, "starting", true)) {
callback.run();
}
},
e -> logger.error("error starting watcher", e)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to do something with is error? (not related to this change)

}

/**
* reload the watches and start scheduling them
*
* @param state the current cluster state
* @param reason the reason for reloading, will be logged
* @param loadTriggeredWatches should triggered watches be loaded in this run, not needed for reloading, only for starting
* @return true if no other loading of a newer cluster state happened in parallel, false otherwise
*/
private synchronized void reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
private synchronized boolean reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
// exit early if another thread has come in between
if (processedClusterStateVersion.get() != state.getVersion()) {
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
Expand All @@ -221,9 +238,11 @@ private synchronized void reloadInner(ClusterState state, String reason, boolean
executionService.executeTriggeredWatches(triggeredWatches);
}
logger.debug("watch service has been reloaded, reason [{}]", reason);
return true;
} else {
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
state.getVersion(), processedClusterStateVersion.get());
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,19 @@ public void unPause() {
}

/**
* Pause the execution of the watcher executor
* Pause the execution of the watcher executor, and empty the state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you unpack the java docs to explain what this does? this seems too terse. For example it doesn't mention reload?

* @return the number of tasks that have been removed
*/
public int pause() {
paused.set(true);
return reload();
}

/**
* Empty the currently queued tasks and wait for current executions to finish
* @return the number of tasks that have been removed
*/
public int reload() {
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
this.clearExecutions();
return cancelledTaskCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void testManualStartStop() {
reset(watcherService);
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
verify(watcherService, times(1)).start(eq(clusterState));
verify(watcherService, times(1)).start(eq(clusterState), anyObject());

// no change, keep going
reset(watcherService);
Expand Down Expand Up @@ -423,7 +423,7 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex
when(watcherService.validate(eq(state))).thenReturn(true);

lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
verify(watcherService, times(0)).start(any(ClusterState.class));
verify(watcherService, times(0)).start(any(ClusterState.class), anyObject());
}

public void testWatcherStopsWhenMasterNodeIsMissing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -199,7 +200,7 @@ void stopExecutor() {
when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture);
clearScrollFuture.onResponse(new ClearScrollResponse(true, 1));

service.start(clusterState);
service.start(clusterState, () -> {});

ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
verify(triggerService).start(captor.capture());
Expand Down Expand Up @@ -238,6 +239,27 @@ void stopExecutor() {
verify(triggerEngine).pauseExecution();
}

// if we have to reload the watcher service, the execution service should not be paused, as this might
// result in missing executions
public void testReloadingWatcherDoesNotPauseExecutionService() {
ExecutionService executionService = mock(ExecutionService.class);
TriggerService triggerService = mock(TriggerService.class);
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
executionService, mock(WatchParser.class), mock(Client.class), executorService) {
@Override
void stopExecutor() {
}
};

ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
csBuilder.metaData(MetaData.builder());

service.reload(csBuilder.build(), "whatever");
verify(executionService).reload();
verify(executionService, never()).pause();
verify(triggerService).pauseExecution();
}

private static DiscoveryNode newNode() {
return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
Expand All @@ -36,6 +37,8 @@

public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase {

private String watchId = randomAlphaOfLength(20);

@Override
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = super.pluginTypes();
Expand Down Expand Up @@ -107,7 +110,7 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
public void testVars() throws Exception {
WatcherClient watcherClient = watcherClient();

PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ?")))
.input(simpleInput("value", 5))
.condition(new ScriptCondition(
Expand All @@ -126,7 +129,7 @@ public void testVars() throws Exception {

assertThat(putWatchResponse.isCreated(), is(true));

timeWarp().trigger("_id");
timeWarp().trigger(watchId);

flush();
refresh();
Expand All @@ -135,11 +138,11 @@ public void testVars() throws Exception {
// defaults to match all;
});

assertThat(searchResponse.getHits().getTotalHits(), is(1L));
assertHitCount(searchResponse, 1L);

Map<String, Object> source = searchResponse.getHits().getAt(0).getSourceAsMap();

assertValue(source, "watch_id", is("_id"));
assertValue(source, "watch_id", is(watchId));
assertValue(source, "state", is("executed"));

// we don't store the computed vars in history
Expand Down Expand Up @@ -171,7 +174,7 @@ public void testVars() throws Exception {
public void testVarsManual() throws Exception {
WatcherClient watcherClient = watcherClient();

PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ? 2020")))
.input(simpleInput("value", 5))
.condition(new ScriptCondition(
Expand All @@ -193,13 +196,13 @@ public void testVarsManual() throws Exception {
boolean debug = randomBoolean();

ExecuteWatchResponse executeWatchResponse = watcherClient
.prepareExecuteWatch("_id")
.prepareExecuteWatch(watchId)
.setDebug(debug)
.get();
assertThat(executeWatchResponse.getRecordId(), notNullValue());
XContentSource source = executeWatchResponse.getRecordSource();

assertValue(source, "watch_id", is("_id"));
assertValue(source, "watch_id", is(watchId));
assertValue(source, "state", is("executed"));

if (debug) {
Expand Down