-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Watcher add stopped listener #43939
Watcher add stopped listener #43939
Changes from all commits
5911077
1b0afbd
2890581
6ca165d
d701006
ddc4032
af03eb4
3807bdc
39b0336
89649d3
41e194e
7e158e6
e46e14f
3a2e31b
2fe0a41
c54c7da
9b9b33c
4b89026
0f7174d
eab50a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
*/ | ||
package org.elasticsearch.xpack.watcher; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.cluster.ClusterChangedEvent; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ClusterStateListener; | ||
|
@@ -25,6 +27,7 @@ | |
|
||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.EnumSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
@@ -35,10 +38,12 @@ | |
|
||
public class WatcherLifeCycleService implements ClusterStateListener { | ||
|
||
private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class); | ||
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED); | ||
private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList()); | ||
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. | ||
private volatile WatcherService watcherService; | ||
private final EnumSet<WatcherState> stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING); | ||
|
||
WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) { | ||
this.watcherService = watcherService; | ||
|
@@ -57,8 +62,10 @@ synchronized void shutDown() { | |
this.state.set(WatcherState.STOPPING); | ||
shutDown = true; | ||
clearAllocationIds(); | ||
watcherService.shutDown(); | ||
this.state.set(WatcherState.STOPPED); | ||
watcherService.shutDown(() -> { | ||
this.state.set(WatcherState.STOPPED); | ||
logger.info("watcher has stopped and shutdown"); | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -88,9 +95,10 @@ public void clusterChanged(ClusterChangedEvent event) { | |
} | ||
|
||
boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); | ||
boolean isStoppedOrStopping = stopStates.contains(this.state.get()); | ||
// if this is not a data node, we need to start it ourselves possibly | ||
if (event.state().nodes().getLocalNode().isDataNode() == false && | ||
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { | ||
isWatcherStoppedManually == false && isStoppedOrStopping) { | ||
this.state.set(WatcherState.STARTING); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we only be able to set the state to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to stay passive with the old behavior. If we kept this to only The pre-existing design allows you "stop" and restart it immediately and re-process the same watch, any inflight watches will finish up. If I didn't allow |
||
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we guard against going into STARTED state from a state other than STARTING? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am trying to avoid changes to the |
||
return; | ||
|
@@ -99,8 +107,20 @@ public void clusterChanged(ClusterChangedEvent event) { | |
if (isWatcherStoppedManually) { | ||
if (this.state.get() == WatcherState.STARTED) { | ||
clearAllocationIds(); | ||
watcherService.stop("watcher manually marked to shutdown by cluster state update"); | ||
this.state.set(WatcherState.STOPPED); | ||
boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING); | ||
if (stopping) { | ||
//waiting to set state to stopped until after all currently running watches are finished | ||
watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { | ||
//only transition from stopping -> stopped (which may not be the case if restarted quickly) | ||
boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); | ||
if (stopped) { | ||
logger.info("watcher has stopped"); | ||
} else { | ||
logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor nit: s/currently in a/in/ |
||
} | ||
|
||
}); | ||
} | ||
} | ||
return; | ||
} | ||
|
@@ -142,7 +162,7 @@ public void clusterChanged(ClusterChangedEvent event) { | |
previousShardRoutings.set(localAffectedShardRoutings); | ||
if (state.get() == WatcherState.STARTED) { | ||
watcherService.reload(event.state(), "new local watcher shard allocation ids"); | ||
} else if (state.get() == WatcherState.STOPPED) { | ||
} else if (isStoppedOrStopping) { | ||
this.state.set(WatcherState.STARTING); | ||
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit: you could move this into the WatcherState enum and just have a method
isStopState()