diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index fca29821bfaa4..f1dd7be196569 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.watcher; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -25,6 +27,7 @@ import java.util.Collections; import java.util.Comparator; +import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -35,10 +38,12 @@ public class WatcherLifeCycleService implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class); private final AtomicReference state = new AtomicReference<>(WatcherState.STARTED); private final AtomicReference> previousShardRoutings = new AtomicReference<>(Collections.emptyList()); private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. private volatile WatcherService watcherService; + private final EnumSet stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING); WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) { this.watcherService = watcherService; @@ -57,8 +62,10 @@ synchronized void shutDown() { this.state.set(WatcherState.STOPPING); shutDown = true; clearAllocationIds(); - watcherService.shutDown(); - this.state.set(WatcherState.STOPPED); + watcherService.shutDown(() -> { + this.state.set(WatcherState.STOPPED); + logger.info("watcher has stopped and shutdown"); + }); } /** @@ -88,9 +95,10 @@ public void clusterChanged(ClusterChangedEvent event) { } boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); + boolean isStoppedOrStopping = stopStates.contains(this.state.get()); // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && - isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { + isWatcherStoppedManually == false && isStoppedOrStopping) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); return; @@ -99,8 +107,20 @@ public void clusterChanged(ClusterChangedEvent event) { if (isWatcherStoppedManually) { if (this.state.get() == WatcherState.STARTED) { clearAllocationIds(); - watcherService.stop("watcher manually marked to shutdown by cluster state update"); - this.state.set(WatcherState.STOPPED); + boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING); + if (stopping) { + //waiting to set state to stopped until after all currently running watches are finished + watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { + //only transition from stopping -> stopped (which may not be the case if restarted quickly) + boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); + if (stopped) { + logger.info("watcher has stopped"); + } else { + logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get()); + } + + }); + } } return; } @@ -142,7 +162,7 @@ public void clusterChanged(ClusterChangedEvent event) { previousShardRoutings.set(localAffectedShardRoutings); if (state.get() == WatcherState.STARTED) { watcherService.reload(event.state(), "new local watcher shard allocation ids"); - } else if (state.get() == WatcherState.STOPPED) { + } else if (isStoppedOrStopping) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index c96203bd64222..32031e78f5e46 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.upgrade.UpgradeField; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.execution.ExecutionService; @@ -144,24 +145,29 @@ public boolean validate(ClusterState state) { } /** - * Stops the watcher service and marks its services as paused + * Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING} + * prior to calling this method. + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ - public void stop(String reason) { + public void stop(String reason, Runnable stoppedListener) { + assert stoppedListener != null; logger.info("stopping watch service, reason [{}]", reason); - executionService.pause(); + executionService.pause(stoppedListener); triggerService.pauseExecution(); } /** * shuts down the trigger service as well to make sure there are no lingering threads - * also no need to check anything, as this is final, we just can go to status STOPPED + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ - void shutDown() { + void shutDown(Runnable stoppedListener) { + assert stoppedListener != null; logger.info("stopping watch service, reason [shutdown initiated]"); - executionService.pause(); + executionService.pause(stoppedListener); triggerService.stop(); stopExecutor(); - logger.debug("watch service has stopped"); } void stopExecutor() { @@ -185,7 +191,7 @@ void reload(ClusterState state, String reason) { processedClusterStateVersion.set(state.getVersion()); triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {}); logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), @@ -256,7 +262,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool */ public void pauseExecution(String reason) { triggerService.pauseExecution(); - int cancelledTaskCount = executionService.pause(); + int cancelledTaskCount = executionService.pause(() -> {}); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index 95ac803003681..9e76cbcffca6b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -5,8 +5,11 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.watcher.WatcherState; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -19,6 +22,7 @@ public final class CurrentExecutions implements Iterable { + private static final Logger logger = LogManager.getLogger(CurrentExecutions.class); private final ConcurrentMap currentExecutions = new ConcurrentHashMap<>(); // the condition of the lock is used to wait and signal the finishing of all executions on shutdown private final ReentrantLock lock = new ReentrantLock(); @@ -63,9 +67,12 @@ public void remove(String id) { * Calling this method makes the class stop accepting new executions and throws and exception instead. * In addition it waits for a certain amount of time for current executions to finish before returning * - * @param maxStopTimeout The maximum wait time to wait to current executions to finish + * @param maxStopTimeout The maximum wait time to wait to current executions to finish + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - void sealAndAwaitEmpty(TimeValue maxStopTimeout) { + void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) { + assert stoppedListener != null; lock.lock(); // We may have current executions still going on. // We should try to wait for the current executions to have completed. @@ -81,6 +88,8 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout) { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { + //fully stop Watcher after all executions are finished + stoppedListener.run(); lock.unlock(); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b1ba8c1522acf..ff1e1f8ec98b5 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.common.stats.Counters; @@ -135,23 +136,31 @@ public void unPause() { * Pausing means, that no new watch executions will be done unless this pausing is explicitly unset. * This is important when watcher is stopped, so that scheduled watches do not accidentally get executed. * This should not be used when we need to reload watcher based on some cluster state changes, then just calling - * {@link #clearExecutionsAndQueue()} is the way to go + * {@link #clearExecutionsAndQueue(Runnable)} is the way to go + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. * * @return the number of tasks that have been removed */ - public int pause() { + public int pause(Runnable stoppedListener) { + assert stoppedListener != null; paused.set(true); - return clearExecutionsAndQueue(); + return clearExecutionsAndQueue(stoppedListener); } /** * Empty the currently queued tasks and wait for current executions to finish. * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. + * * @return the number of tasks that have been removed */ - public int clearExecutionsAndQueue() { + public int clearExecutionsAndQueue(Runnable stoppedListener) { + assert stoppedListener != null; int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); - this.clearExecutions(); + this.clearExecutions(stoppedListener); return cancelledTaskCount; } @@ -278,8 +287,10 @@ public WatchRecord execute(WatchExecutionContext ctx) { ctx.setNodeId(clusterService.localNode().getId()); WatchRecord record = null; final String watchId = ctx.id().watchId(); + //pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove + final CurrentExecutions currentExecutions = this.currentExecutions.get(); try { - boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); @@ -334,7 +345,7 @@ record = createWatchRecord(record, ctx, e); triggeredWatchStore.delete(ctx.id()); } - currentExecutions.get().remove(watchId); + currentExecutions.remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; @@ -577,11 +588,15 @@ public Counters executionTimes() { /** * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - private void clearExecutions() { + private void clearExecutions(Runnable stoppedListener) { + assert stoppedListener != null; final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); // clear old executions in background, no need to wait - genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); + genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener)); } // the watch execution task takes another runnable as parameter diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 548583ac14b7d..cf6c2c5ac6663 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import java.util.Collections; @@ -133,8 +134,8 @@ public void testShutdown() { when(watcherService.validate(clusterState)).thenReturn(true); lifeCycleService.shutDown(); - verify(watcherService, never()).stop(anyString()); - verify(watcherService, times(1)).shutDown(); + verify(watcherService, never()).stop(anyString(), any()); + verify(watcherService, times(1)).shutDown(any()); reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); @@ -175,7 +176,12 @@ public void testManualStartStop() { .build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); - verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update")); + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(watcherService, times(1)) + .stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture()); + assertEquals(WatcherState.STOPPING, lifeCycleService.getState()); + captor.getValue().run(); + assertEquals(WatcherState.STOPPED, lifeCycleService.getState()); // Starting via cluster state update, as the watcher metadata block is removed/set to true reset(watcherService); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index f4ee831266b38..e67512ee694c1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -269,8 +269,8 @@ void stopExecutor() { csBuilder.metaData(MetaData.builder()); service.reload(csBuilder.build(), "whatever"); - verify(executionService).clearExecutionsAndQueue(); - verify(executionService, never()).pause(); + verify(executionService).clearExecutionsAndQueue(any()); + verify(executionService, never()).pause(any()); verify(triggerService).pauseExecution(); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 20c27531509a4..d8f3396206c75 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -520,10 +520,12 @@ protected void stopWatcher() throws Exception { WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get(); assertThat(watcherStatsResponse.hasFailures(), is(false)); List> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream() - .map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState())) - .collect(Collectors.toList()); + .map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")", + response.getWatcherState())).collect(Collectors.toList()); List states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList()); + + logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest); boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED); @@ -548,7 +550,7 @@ protected void stopWatcher() throws Exception { } throw new AssertionError("unexpected state, retrying with next run"); - }); + }, 30, TimeUnit.SECONDS); } public static class NoopEmailService extends EmailService { diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java index 679bc08f01f38..9ec458067dc11 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java @@ -19,6 +19,7 @@ import org.junit.Before; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -108,7 +109,7 @@ public void stopWatcher() throws Exception { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } @Override diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java index 20fd53b72e416..351c5d27fcf7e 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -116,7 +117,7 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); adminClient().performRequest(new Request("DELETE", "/my_test_index")); } diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java index 60866661617f8..7138ed7732259 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -93,7 +94,7 @@ public void stopWatcher() throws Exception { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } @Override diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java index 2dd5cc86a89c6..3a1155d562d9c 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java @@ -14,6 +14,8 @@ import org.junit.After; import org.junit.Before; +import java.util.concurrent.TimeUnit; + import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -90,6 +92,6 @@ public void stopWatcher() throws Exception { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java b/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java index 8f8792f26971e..c95c89a7ba954 100644 --- a/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java b/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java index b9a628f71f972..64de13f8375f0 100644 --- a/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java b/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java index 01eeae442b2e0..a1e2938817bfa 100644 --- a/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public void stopWatcher() throws Exception { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } }