From 59110771496e8bca967d31e388d30feccee1d01d Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 3 Jul 2019 14:07:26 -0500 Subject: [PATCH 01/14] introduce a stopped listener --- .../watcher/WatcherLifeCycleService.java | 18 +++++++++---- .../xpack/watcher/WatcherService.java | 25 ++++++++++++------ .../watcher/execution/CurrentExecutions.java | 17 ++++++++++-- .../watcher/execution/ExecutionService.java | 26 ++++++++++++++----- .../watcher/WatcherLifeCycleServiceTests.java | 8 +++--- .../xpack/watcher/WatcherServiceTests.java | 4 +-- 6 files changed, 71 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index fca29821bfaa4..5871541913239 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -57,7 +58,7 @@ synchronized void shutDown() { this.state.set(WatcherState.STOPPING); shutDown = true; clearAllocationIds(); - watcherService.shutDown(); + watcherService.shutDown(null); this.state.set(WatcherState.STOPPED); } @@ -90,7 +91,7 @@ public void clusterChanged(ClusterChangedEvent event) { boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && - isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { + isWatcherStoppedManually == false && (this.state.get() == WatcherState.STOPPED || this.state.get() == WatcherState.STOPPING)) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); return; @@ -98,9 +99,16 @@ public void clusterChanged(ClusterChangedEvent event) { if (isWatcherStoppedManually) { if (this.state.get() == WatcherState.STARTED) { + this.state.set(WatcherState.STOPPING); clearAllocationIds(); - watcherService.stop("watcher manually marked to shutdown by cluster state update"); - this.state.set(WatcherState.STOPPED); + //waiting to set state to stopped until after all currently running watches are finished + watcherService.stop("watcher manually marked to shutdown by cluster state update", a -> { + //ensure that Watcher wasn't restarted between stopping and now. + if (state.get() == WatcherState.STOPPING) { + state.set(WatcherState.STOPPED); + } + + }); } return; } @@ -142,7 +150,7 @@ public void clusterChanged(ClusterChangedEvent event) { previousShardRoutings.set(localAffectedShardRoutings); if (state.get() == WatcherState.STARTED) { watcherService.reload(event.state(), "new local watcher shard allocation ids"); - } else if (state.get() == WatcherState.STOPPED) { + } else if (state.get() == WatcherState.STOPPED || state.get() == WatcherState.STOPPING) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index c96203bd64222..01858aa233db4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -35,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.upgrade.UpgradeField; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.execution.ExecutionService; @@ -144,24 +146,31 @@ public boolean validate(ClusterState state) { } /** - * Stops the watcher service and marks its services as paused + * Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING} + * prior to calling this method. + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ - public void stop(String reason) { + public void stop(String reason, Consumer stoppedListener) { + assert stoppedListener != null; logger.info("stopping watch service, reason [{}]", reason); - executionService.pause(); + executionService.pause(stoppedListener); triggerService.pauseExecution(); } /** * shuts down the trigger service as well to make sure there are no lingering threads * also no need to check anything, as this is final, we just can go to status STOPPED + * + * @param stopListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - void shutDown() { + void shutDown(@Nullable Consumer stopListener) { logger.info("stopping watch service, reason [shutdown initiated]"); - executionService.pause(); + executionService.pause(stopListener); triggerService.stop(); stopExecutor(); - logger.debug("watch service has stopped"); + logger.debug("watch service has been shut down"); } void stopExecutor() { @@ -185,7 +194,7 @@ void reload(ClusterState state, String reason) { processedClusterStateVersion.set(state.getVersion()); triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(null); logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), @@ -256,7 +265,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool */ public void pauseExecution(String reason) { triggerService.pauseExecution(); - int cancelledTaskCount = executionService.pause(); + int cancelledTaskCount = executionService.pause(null); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index 95ac803003681..af8053a415583 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -5,8 +5,12 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.watcher.WatcherState; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -14,11 +18,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; public final class CurrentExecutions implements Iterable { + private static final Logger logger = LogManager.getLogger(CurrentExecutions.class); private final ConcurrentMap currentExecutions = new ConcurrentHashMap<>(); // the condition of the lock is used to wait and signal the finishing of all executions on shutdown private final ReentrantLock lock = new ReentrantLock(); @@ -63,9 +69,11 @@ public void remove(String id) { * Calling this method makes the class stop accepting new executions and throws and exception instead. * In addition it waits for a certain amount of time for current executions to finish before returning * - * @param maxStopTimeout The maximum wait time to wait to current executions to finish + * @param maxStopTimeout The maximum wait time to wait to current executions to finish + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - void sealAndAwaitEmpty(TimeValue maxStopTimeout) { + void sealAndAwaitEmpty(TimeValue maxStopTimeout, @Nullable Consumer stoppedListener) { lock.lock(); // We may have current executions still going on. // We should try to wait for the current executions to have completed. @@ -81,6 +89,11 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout) { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { + //fully stop Watcher after all executions are finished + if(stoppedListener != null) { + stoppedListener.accept(null); + logger.info("watch service has stopped"); + } lock.unlock(); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b1ba8c1522acf..d30dcd0a72437 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -24,6 +24,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Setting; @@ -38,6 +39,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.common.stats.Counters; @@ -79,6 +81,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; @@ -135,23 +138,29 @@ public void unPause() { * Pausing means, that no new watch executions will be done unless this pausing is explicitly unset. * This is important when watcher is stopped, so that scheduled watches do not accidentally get executed. * This should not be used when we need to reload watcher based on some cluster state changes, then just calling - * {@link #clearExecutionsAndQueue()} is the way to go + * {@link #clearExecutionsAndQueue(Consumer)} is the way to go + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. * * @return the number of tasks that have been removed */ - public int pause() { + public int pause(@Nullable Consumer stoppedListener) { paused.set(true); - return clearExecutionsAndQueue(); + return clearExecutionsAndQueue(stoppedListener); } /** * Empty the currently queued tasks and wait for current executions to finish. * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. + * * @return the number of tasks that have been removed */ - public int clearExecutionsAndQueue() { + public int clearExecutionsAndQueue(@Nullable Consumer stoppedListener) { int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); - this.clearExecutions(); + this.clearExecutions(stoppedListener); return cancelledTaskCount; } @@ -577,11 +586,14 @@ public Counters executionTimes() { /** * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - private void clearExecutions() { + private void clearExecutions(@Nullable Consumer stoppedListener) { final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); // clear old executions in background, no need to wait - genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); + genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener)); } // the watch execution task takes another runnable as parameter diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 548583ac14b7d..dcf94f8f7c2f9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -54,6 +55,7 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNotNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -133,8 +135,8 @@ public void testShutdown() { when(watcherService.validate(clusterState)).thenReturn(true); lifeCycleService.shutDown(); - verify(watcherService, never()).stop(anyString()); - verify(watcherService, times(1)).shutDown(); + verify(watcherService, never()).stop(anyString(), any()); + verify(watcherService, times(1)).shutDown(any()); reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); @@ -175,7 +177,7 @@ public void testManualStartStop() { .build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); - verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update")); + verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update"), (Consumer) isNotNull()); // Starting via cluster state update, as the watcher metadata block is removed/set to true reset(watcherService); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index f4ee831266b38..8dc0755425164 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -269,8 +269,8 @@ void stopExecutor() { csBuilder.metaData(MetaData.builder()); service.reload(csBuilder.build(), "whatever"); - verify(executionService).clearExecutionsAndQueue(); - verify(executionService, never()).pause(); + verify(executionService).clearExecutionsAndQueue(null); + verify(executionService, never()).pause(null); verify(triggerService).pauseExecution(); } From 1b0afbdb4571ac9b0222d8bbfe8cd025ff7eb814 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 3 Jul 2019 14:16:07 -0500 Subject: [PATCH 02/14] fix checkstyle --- .../elasticsearch/xpack/watcher/WatcherLifeCycleService.java | 1 - .../xpack/watcher/WatcherLifeCycleServiceTests.java | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 5871541913239..d3cab81cdd4b9 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index dcf94f8f7c2f9..dade5a784ef27 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -177,7 +177,8 @@ public void testManualStartStop() { .build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); - verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update"), (Consumer) isNotNull()); + verify(watcherService, times(1)) + .stop(eq("watcher manually marked to shutdown by cluster state update"), (Consumer) isNotNull()); // Starting via cluster state update, as the watcher metadata block is removed/set to true reset(watcherService); From 28905818b1a8a433bfabcfb40e779c476946a4ed Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 3 Jul 2019 14:17:53 -0500 Subject: [PATCH 03/14] fix ordering --- .../elasticsearch/xpack/watcher/WatcherLifeCycleService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index d3cab81cdd4b9..0faebdf15866c 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -98,8 +98,8 @@ public void clusterChanged(ClusterChangedEvent event) { if (isWatcherStoppedManually) { if (this.state.get() == WatcherState.STARTED) { - this.state.set(WatcherState.STOPPING); clearAllocationIds(); + this.state.set(WatcherState.STOPPING); //waiting to set state to stopped until after all currently running watches are finished watcherService.stop("watcher manually marked to shutdown by cluster state update", a -> { //ensure that Watcher wasn't restarted between stopping and now. From 6ca165d2021257ec706ce611f124f10c5f86d6cd Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 3 Jul 2019 14:45:41 -0500 Subject: [PATCH 04/14] assert state transition --- .../xpack/watcher/WatcherLifeCycleServiceTests.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index dade5a784ef27..2c57ec264f353 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -34,6 +34,9 @@ import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; import org.mockito.stubbing.Answer; import java.util.Collections; @@ -71,8 +74,12 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { private WatcherService watcherService; private WatcherLifeCycleService lifeCycleService; + @Captor + private ArgumentCaptor> voidConsumerCaptor; + @Before public void prepareServices() { + MockitoAnnotations.initMocks(this); ClusterService clusterService = mock(ClusterService.class); Answer answer = invocationOnMock -> { AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; @@ -178,7 +185,10 @@ public void testManualStartStop() { lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); verify(watcherService, times(1)) - .stop(eq("watcher manually marked to shutdown by cluster state update"), (Consumer) isNotNull()); + .stop(eq("watcher manually marked to shutdown by cluster state update"), voidConsumerCaptor.capture()); + assertEquals(WatcherState.STOPPING, lifeCycleService.getState()); + voidConsumerCaptor.getValue().accept(null); + assertEquals(WatcherState.STOPPED, lifeCycleService.getState()); // Starting via cluster state update, as the watcher metadata block is removed/set to true reset(watcherService); From d701006f7151850b672dec78d62e81b102ea31e0 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 3 Jul 2019 14:56:53 -0500 Subject: [PATCH 05/14] bump up the timeout for Watcher stops --- .../watcher/test/AbstractWatcherIntegrationTestCase.java | 2 +- .../SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java | 3 ++- .../smoketest/SmokeTestWatcherWithSecurityIT.java | 3 ++- .../elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java | 3 ++- .../test/java/org/elasticsearch/smoketest/WatcherRestIT.java | 4 +++- .../elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java | 3 ++- .../smoketest/WatcherPagerDutyYamlTestSuiteIT.java | 3 ++- .../elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java | 3 ++- 8 files changed, 16 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 7ad5b5e98bf4a..903d092c4e52b 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -546,7 +546,7 @@ protected void stopWatcher() throws Exception { } throw new AssertionError("unexpected state, retrying with next run"); - }); + }, 30, TimeUnit.SECONDS); } public static class NoopEmailService extends EmailService { diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java index 679bc08f01f38..9ec458067dc11 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java @@ -19,6 +19,7 @@ import org.junit.Before; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -108,7 +109,7 @@ public void stopWatcher() throws Exception { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } @Override diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java index 20fd53b72e416..351c5d27fcf7e 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -116,7 +117,7 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); adminClient().performRequest(new Request("DELETE", "/my_test_index")); } diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java index 60866661617f8..7138ed7732259 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -93,7 +94,7 @@ public void stopWatcher() throws Exception { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } @Override diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java index 2dd5cc86a89c6..3a1155d562d9c 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java @@ -14,6 +14,8 @@ import org.junit.After; import org.junit.Before; +import java.util.concurrent.TimeUnit; + import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -90,6 +92,6 @@ public void stopWatcher() throws Exception { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java b/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java index 8f8792f26971e..c95c89a7ba954 100644 --- a/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java b/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java index b9a628f71f972..64de13f8375f0 100644 --- a/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java b/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java index 01eeae442b2e0..a1e2938817bfa 100644 --- a/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } From ddc4032e0133e24f9fb5ef735ee63617b892b658 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 3 Jul 2019 15:00:55 -0500 Subject: [PATCH 06/14] fix checkstyle again --- .../xpack/watcher/WatcherLifeCycleServiceTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 2c57ec264f353..8d1ccce6d02ac 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -58,7 +58,6 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNotNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; From 39b0336b38555d3a2590c063068cd595ee327210 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 8 Jul 2019 13:02:56 -0500 Subject: [PATCH 07/14] consumer->runnable and other review changes --- .../watcher/WatcherLifeCycleService.java | 19 +++++++++---------- .../xpack/watcher/WatcherService.java | 11 ++++++----- .../watcher/execution/CurrentExecutions.java | 11 +++++------ .../watcher/execution/ExecutionService.java | 19 ++++++++++--------- .../watcher/WatcherLifeCycleServiceTests.java | 11 +++-------- .../xpack/watcher/WatcherServiceTests.java | 4 ++-- 6 files changed, 35 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 0faebdf15866c..60a7af732429f 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -57,7 +57,7 @@ synchronized void shutDown() { this.state.set(WatcherState.STOPPING); shutDown = true; clearAllocationIds(); - watcherService.shutDown(null); + watcherService.shutDown(() -> {}); this.state.set(WatcherState.STOPPED); } @@ -88,9 +88,11 @@ public void clusterChanged(ClusterChangedEvent event) { } boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); + boolean isStoppedOrStopping = this.state.get() == WatcherState.STOPPED || + this.state.get() == WatcherState.STOPPING; // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && - isWatcherStoppedManually == false && (this.state.get() == WatcherState.STOPPED || this.state.get() == WatcherState.STOPPING)) { + isWatcherStoppedManually == false && isStoppedOrStopping) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); return; @@ -99,14 +101,11 @@ public void clusterChanged(ClusterChangedEvent event) { if (isWatcherStoppedManually) { if (this.state.get() == WatcherState.STARTED) { clearAllocationIds(); - this.state.set(WatcherState.STOPPING); + this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING); //waiting to set state to stopped until after all currently running watches are finished - watcherService.stop("watcher manually marked to shutdown by cluster state update", a -> { - //ensure that Watcher wasn't restarted between stopping and now. - if (state.get() == WatcherState.STOPPING) { - state.set(WatcherState.STOPPED); - } - + watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { + //only transition from stopping -> stopped (which may not be the case if restarted quickly) + state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); }); } return; @@ -149,7 +148,7 @@ public void clusterChanged(ClusterChangedEvent event) { previousShardRoutings.set(localAffectedShardRoutings); if (state.get() == WatcherState.STARTED) { watcherService.reload(event.state(), "new local watcher shard allocation ids"); - } else if (state.get() == WatcherState.STOPPED || state.get() == WatcherState.STOPPING) { + } else if (isStoppedOrStopping) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 01858aa233db4..1bd1b06554920 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -151,7 +151,7 @@ public boolean validate(ClusterState state) { * * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ - public void stop(String reason, Consumer stoppedListener) { + public void stop(String reason, Runnable stoppedListener) { assert stoppedListener != null; logger.info("stopping watch service, reason [{}]", reason); executionService.pause(stoppedListener); @@ -162,12 +162,13 @@ public void stop(String reason, Consumer stoppedListener) { * shuts down the trigger service as well to make sure there are no lingering threads * also no need to check anything, as this is final, we just can go to status STOPPED * - * @param stopListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - void shutDown(@Nullable Consumer stopListener) { + void shutDown(Runnable stoppedListener) { + assert stoppedListener != null; logger.info("stopping watch service, reason [shutdown initiated]"); - executionService.pause(stopListener); + executionService.pause(stoppedListener); triggerService.stop(); stopExecutor(); logger.debug("watch service has been shut down"); @@ -194,7 +195,7 @@ void reload(ClusterState state, String reason) { processedClusterStateVersion.set(state.getVersion()); triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(null); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {}); logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index af8053a415583..11aaaa05efe1d 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -70,10 +70,11 @@ public void remove(String id) { * In addition it waits for a certain amount of time for current executions to finish before returning * * @param maxStopTimeout The maximum wait time to wait to current executions to finish - * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - void sealAndAwaitEmpty(TimeValue maxStopTimeout, @Nullable Consumer stoppedListener) { + void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) { + assert stoppedListener != null; lock.lock(); // We may have current executions still going on. // We should try to wait for the current executions to have completed. @@ -90,10 +91,8 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout, @Nullable Consumer stoppe Thread.currentThread().interrupt(); } finally { //fully stop Watcher after all executions are finished - if(stoppedListener != null) { - stoppedListener.accept(null); - logger.info("watch service has stopped"); - } + stoppedListener.run(); + logger.info("watch service has stopped"); lock.unlock(); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index d30dcd0a72437..bb334365fb42b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -24,7 +24,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Setting; @@ -81,7 +80,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; @@ -138,14 +136,15 @@ public void unPause() { * Pausing means, that no new watch executions will be done unless this pausing is explicitly unset. * This is important when watcher is stopped, so that scheduled watches do not accidentally get executed. * This should not be used when we need to reload watcher based on some cluster state changes, then just calling - * {@link #clearExecutionsAndQueue(Consumer)} is the way to go + * {@link #clearExecutionsAndQueue(Runnable)} is the way to go * - * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. * * @return the number of tasks that have been removed */ - public int pause(@Nullable Consumer stoppedListener) { + public int pause(Runnable stoppedListener) { + assert stoppedListener != null; paused.set(true); return clearExecutionsAndQueue(stoppedListener); } @@ -153,12 +152,13 @@ public int pause(@Nullable Consumer stoppedListener) { /** * Empty the currently queued tasks and wait for current executions to finish. * - * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. * * @return the number of tasks that have been removed */ - public int clearExecutionsAndQueue(@Nullable Consumer stoppedListener) { + public int clearExecutionsAndQueue(Runnable stoppedListener) { + assert stoppedListener != null; int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); this.clearExecutions(stoppedListener); return cancelledTaskCount; @@ -587,10 +587,11 @@ public Counters executionTimes() { * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea * - * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be {@code null} assuming the + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - private void clearExecutions(@Nullable Consumer stoppedListener) { + private void clearExecutions(Runnable stoppedListener) { + assert stoppedListener != null; final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); // clear old executions in background, no need to wait genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 8d1ccce6d02ac..a515b776ac21a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -35,14 +35,12 @@ import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.mockito.Captor; import org.mockito.MockitoAnnotations; import org.mockito.stubbing.Answer; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -73,12 +71,8 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { private WatcherService watcherService; private WatcherLifeCycleService lifeCycleService; - @Captor - private ArgumentCaptor> voidConsumerCaptor; - @Before public void prepareServices() { - MockitoAnnotations.initMocks(this); ClusterService clusterService = mock(ClusterService.class); Answer answer = invocationOnMock -> { AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; @@ -183,10 +177,11 @@ public void testManualStartStop() { .build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); verify(watcherService, times(1)) - .stop(eq("watcher manually marked to shutdown by cluster state update"), voidConsumerCaptor.capture()); + .stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture()); assertEquals(WatcherState.STOPPING, lifeCycleService.getState()); - voidConsumerCaptor.getValue().accept(null); + captor.getValue().run(); assertEquals(WatcherState.STOPPED, lifeCycleService.getState()); // Starting via cluster state update, as the watcher metadata block is removed/set to true diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 8dc0755425164..e67512ee694c1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -269,8 +269,8 @@ void stopExecutor() { csBuilder.metaData(MetaData.builder()); service.reload(csBuilder.build(), "whatever"); - verify(executionService).clearExecutionsAndQueue(null); - verify(executionService, never()).pause(null); + verify(executionService).clearExecutionsAndQueue(any()); + verify(executionService, never()).pause(any()); verify(triggerService).pauseExecution(); } From 89649d315d2ef53284d2d2f8d9c9c4c00ff28680 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 8 Jul 2019 13:53:49 -0500 Subject: [PATCH 08/14] fix null assertion --- .../java/org/elasticsearch/xpack/watcher/WatcherService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 1bd1b06554920..eaba8d1d3800b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -266,7 +266,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool */ public void pauseExecution(String reason) { triggerService.pauseExecution(); - int cancelledTaskCount = executionService.pause(null); + int cancelledTaskCount = executionService.pause(() -> {}); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } From 41e194e52643f1bd673e2a7e315628ff5d3e4832 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 8 Jul 2019 14:05:15 -0500 Subject: [PATCH 09/14] fix checkstyle --- .../java/org/elasticsearch/xpack/watcher/WatcherService.java | 1 - .../xpack/watcher/execution/CurrentExecutions.java | 2 -- .../xpack/watcher/WatcherLifeCycleServiceTests.java | 1 - 3 files changed, 4 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index eaba8d1d3800b..faae54cc1c689 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index 11aaaa05efe1d..4f7cdf6c5786e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -8,7 +8,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.watcher.WatcherState; @@ -18,7 +17,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index a515b776ac21a..cf6c2c5ac6663 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.mockito.MockitoAnnotations; import org.mockito.stubbing.Answer; import java.util.Collections; From 7e158e6e59998aa27851a065314a67db27dbae6f Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 8 Jul 2019 15:20:43 -0500 Subject: [PATCH 10/14] minor logging updates --- .../xpack/watcher/WatcherLifeCycleService.java | 11 ++++++++++- .../xpack/watcher/execution/CurrentExecutions.java | 1 - .../test/AbstractWatcherIntegrationTestCase.java | 6 ++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 60a7af732429f..9a6f2d8324a97 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.watcher; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -35,6 +37,7 @@ public class WatcherLifeCycleService implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class); private final AtomicReference state = new AtomicReference<>(WatcherState.STARTED); private final AtomicReference> previousShardRoutings = new AtomicReference<>(Collections.emptyList()); private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. @@ -105,7 +108,13 @@ public void clusterChanged(ClusterChangedEvent event) { //waiting to set state to stopped until after all currently running watches are finished watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { //only transition from stopping -> stopped (which may not be the case if restarted quickly) - state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); + boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); + if (stopped) { + logger.info("watcher has stopped"); + } else { + logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get()); + } + }); } return; diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index 4f7cdf6c5786e..9e76cbcffca6b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -90,7 +90,6 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) { } finally { //fully stop Watcher after all executions are finished stoppedListener.run(); - logger.info("watch service has stopped"); lock.unlock(); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 903d092c4e52b..1e84a76083246 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -518,10 +518,12 @@ protected void stopWatcher() throws Exception { WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get(); assertThat(watcherStatsResponse.hasFailures(), is(false)); List> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream() - .map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState())) - .collect(Collectors.toList()); + .map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")", + response.getWatcherState())).collect(Collectors.toList()); List states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList()); + + logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest); boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED); From e46e14f6000ad42895c297f4fdc550106ac3896f Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 8 Jul 2019 19:53:11 -0500 Subject: [PATCH 11/14] fix potential issue with class member swap --- .../xpack/watcher/execution/ExecutionService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index bb334365fb42b..ff1e1f8ec98b5 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -287,8 +287,10 @@ public WatchRecord execute(WatchExecutionContext ctx) { ctx.setNodeId(clusterService.localNode().getId()); WatchRecord record = null; final String watchId = ctx.id().watchId(); + //pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove + final CurrentExecutions currentExecutions = this.currentExecutions.get(); try { - boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); @@ -343,7 +345,7 @@ record = createWatchRecord(record, ctx, e); triggeredWatchStore.delete(ctx.id()); } - currentExecutions.get().remove(watchId); + currentExecutions.remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; From 3a2e31b0625f68dc081a212afeeb2dece3a4e431 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 10 Jul 2019 09:00:42 -0500 Subject: [PATCH 12/14] protect against multiple calls to watcherService.stop --- .../watcher/WatcherLifeCycleService.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 9a6f2d8324a97..9be2a75f3b860 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -104,18 +104,20 @@ public void clusterChanged(ClusterChangedEvent event) { if (isWatcherStoppedManually) { if (this.state.get() == WatcherState.STARTED) { clearAllocationIds(); - this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING); - //waiting to set state to stopped until after all currently running watches are finished - watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { - //only transition from stopping -> stopped (which may not be the case if restarted quickly) - boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); - if (stopped) { - logger.info("watcher has stopped"); - } else { - logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get()); - } - - }); + boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING); + if (stopping) { + //waiting to set state to stopped until after all currently running watches are finished + watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { + //only transition from stopping -> stopped (which may not be the case if restarted quickly) + boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); + if (stopped) { + logger.info("watcher has stopped"); + } else { + logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get()); + } + + }); + } } return; } From c54c7da61bd6ce96ac4dda3f79084a0e8886815e Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 15 Jul 2019 09:23:56 -0500 Subject: [PATCH 13/14] delay stopped status in shutdown --- .../xpack/watcher/WatcherLifeCycleService.java | 6 ++++-- .../org/elasticsearch/xpack/watcher/WatcherService.java | 5 +---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 9be2a75f3b860..742a1ceb189b1 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -60,8 +60,10 @@ synchronized void shutDown() { this.state.set(WatcherState.STOPPING); shutDown = true; clearAllocationIds(); - watcherService.shutDown(() -> {}); - this.state.set(WatcherState.STOPPED); + watcherService.shutDown(() -> { + this.state.set(WatcherState.STOPPED); + logger.info("watcher has stopped and shutdown"); + }); } /** diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index faae54cc1c689..32031e78f5e46 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -159,10 +159,8 @@ public void stop(String reason, Runnable stoppedListener) { /** * shuts down the trigger service as well to make sure there are no lingering threads - * also no need to check anything, as this is final, we just can go to status STOPPED * - * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the - * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ void shutDown(Runnable stoppedListener) { assert stoppedListener != null; @@ -170,7 +168,6 @@ void shutDown(Runnable stoppedListener) { executionService.pause(stoppedListener); triggerService.stop(); stopExecutor(); - logger.debug("watch service has been shut down"); } void stopExecutor() { From eab50a08ab1a73ed16e93324e930741f17efc823 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 14 Aug 2019 15:08:19 -0500 Subject: [PATCH 14/14] fix minor potential of race by only checking state once in conditional --- .../elasticsearch/xpack/watcher/WatcherLifeCycleService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 742a1ceb189b1..f1dd7be196569 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.Comparator; +import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +43,7 @@ public class WatcherLifeCycleService implements ClusterStateListener { private final AtomicReference> previousShardRoutings = new AtomicReference<>(Collections.emptyList()); private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. private volatile WatcherService watcherService; + private final EnumSet stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING); WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) { this.watcherService = watcherService; @@ -93,8 +95,7 @@ public void clusterChanged(ClusterChangedEvent event) { } boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); - boolean isStoppedOrStopping = this.state.get() == WatcherState.STOPPED || - this.state.get() == WatcherState.STOPPING; + boolean isStoppedOrStopping = stopStates.contains(this.state.get()); // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && isWatcherStoppedManually == false && isStoppedOrStopping) {