Periodically try to reassign unassigned persistent tasks #36069
Conversation
Previously, persistent task assignment was checked in the following situations:

- Persistent tasks are changed
- A node joins or leaves the cluster
- The routing table is changed
- Custom metadata in the cluster state is changed
- A new master node is elected

However, there could be situations where a persistent task that could not be assigned to a node becomes assignable due to some other change, such as memory usage on the nodes. This change adds a timed recheck of persistent task assignment to account for such situations. The timer is suspended while checks triggered by cluster state changes are in-flight, to avoid adding burden to an already busy cluster.

Closes elastic#35792
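As a rough illustration of the scheduling pattern described above, here is a minimal, dependency-free sketch; the class and method names below are hypothetical and are not the code added by this PR, which lives in PersistentTasksClusterService and is driven by cluster state updates, as discussed in the review below.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: a periodic recheck that is suspended while an event-triggered
// check is in flight and rescheduled afterwards if unassigned work remains.
class PeriodicRecheckSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long recheckIntervalMillis = 30_000;        // hypothetical default
    private ScheduledFuture<?> scheduledRecheck;

    // Called when a cluster-state-style event makes reassignment worth checking immediately.
    synchronized void onRelevantEvent() {
        cancelScheduledRecheck();                             // suspend the timer while the triggered check runs
        runAssignmentCheck();
    }

    // Called once the triggered check has completed (i.e. the resulting state was processed).
    synchronized void onCheckCompleted(boolean unassignedTasksRemain) {
        if (unassignedTasksRemain) {
            scheduledRecheck = scheduler.schedule(this::runAssignmentCheck,
                    recheckIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void cancelScheduledRecheck() {
        if (scheduledRecheck != null) {
            scheduledRecheck.cancel(false);
            scheduledRecheck = null;
        }
    }

    private void runAssignmentCheck() {
        // placeholder for "submit a cluster state update that reassigns unassigned tasks"
    }
}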
Pinging @elastic/es-distributed
Force-pushed from 1b0794e to 8ff4e2e
Thanks Robert. I left some comments. Also, can you add an IT test?
    });
    reassignPersistentTasks(event.state().getVersion());
} else {
    periodicRechecker.schedule();
I think you only want to schedule if you have unassigned tasks?
@Override
public ClusterState execute(ClusterState currentState) {
    ClusterState newState = reassignTasks(currentState);
    periodicRechecker.schedule();
I think you only want to do this if you have unassigned tasks
Also, this should be done when the cluster state is published. See ClusterStateUpdateTask.clusterStatePublished
ClusterStateUpdateTask.clusterStatePublished is empty and final, and also only gets called if the cluster state is updated by the execute method.

I think to make this reschedule after publishing or no-op I'd need to change the update class to implement AckedClusterStateTaskListener, right? Then onAllNodesAcked will get called regardless of whether execute changes the cluster state or not.

Any objections before I start on this change?
Argh, you are right. The one you need is clusterStateProcessed. Note that both of these are only called when the cluster state is committed, which is a big difference from when the execute method returns. It means that the master got the other master-eligible nodes to agree on the change.
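For reference, a rough sketch of what overriding clusterStateProcessed on a cluster state update task could look like, assuming the 6.x-era ClusterStateUpdateTask API; the reassignment and recheck calls are placeholders, not this PR's exact code.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;

class ReassignmentUpdateSketch {

    // Sketch: clusterStateProcessed runs once the (possibly unchanged) state has been
    // committed and applied, so it is a safe point to reschedule the periodic recheck.
    static void submitReassignment(ClusterService clusterService, Runnable scheduleRecheck) {
        clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                return currentState; // placeholder for reassignTasks(currentState)
            }

            @Override
            public void onFailure(String source, Exception e) {
                // log and fall through; the periodic recheck can retry later
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                scheduleRecheck.run(); // reschedule only after the state change is committed
            }
        });
    }
}

The point is just that clusterStateProcessed fires after commit even when execute() returns the state unchanged; the actual PR may wire this differently.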
It looks like overriding clusterStateProcessed is non-trivial now. Since https://github.com/elastic/elasticsearch/pull/31241/files#diff-bc0dd060947fa9d8e3209d60f7255f1dR67 it cannot be overridden in a class that extends ClusterStateUpdateTask because it runs in the system context. I don't think running in the system context would be a problem for persistent task allocation decisions, but it creates a question of how to do this without opening up ClusterStateUpdateTask to future mistakes.

I could move most of the functionality of ClusterStateUpdateTask into a new base class called SystemContextClusterStateUpdateTask, and have ClusterStateUpdateTask extend that and just override clusterStateProcessed with its empty final version. Does that sound OK?
@droberts195 I may be missing something, but the way I read the other PR, it is exactly the intention to use clusterStateProcessed, as the thread context is maintained there. See here.
Sorry, I was still looking at clusterStatePublished. clusterStateProcessed makes it easy.
logger.trace("periodic persistent task assignment check running");
ClusterState state = clusterService.state();
final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null && anyTaskNeedsReassignment(tasks, state)) {
Can we unify this logic with clusterChanged?
/**
 * Class to periodically try to reassign unassigned persistent tasks.
 */
private class PeriodicRechecker implements Runnable {
I think it's time to extract a utility class that's shared with org.elasticsearch.index.IndexService.BaseAsyncTask
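For what it's worth, a rough sketch of the shape such a shared base class might take, written against plain java.util.concurrent rather than the actual Elasticsearch utility; the method names mirror the ones exercised in the tests later in this review, but everything else here is illustrative.

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: a self-rescheduling task base class using a private ScheduledExecutorService
// instead of the Elasticsearch ThreadPool.
abstract class AsyncTaskSketch implements Runnable, Closeable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile long intervalMillis;
    private volatile boolean closed;
    private ScheduledFuture<?> future;

    AsyncTaskSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Extra conditions a subclass can add on top of the closed/interval checks.
    protected boolean mustReschedule() {
        return true;
    }

    public synchronized void rescheduleIfNecessary() {
        cancel();
        if (closed == false && intervalMillis > 0 && mustReschedule()) {
            future = scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized boolean isScheduled() {
        return future != null && future.isDone() == false;
    }

    public synchronized void setInterval(long intervalMillis) {
        this.intervalMillis = intervalMillis;
        rescheduleIfNecessary();
    }

    public synchronized void cancel() {
        if (future != null) {
            future.cancel(false);
            future = null;
        }
    }

    public boolean isClosed() {
        return closed;
    }

    @Override
    public synchronized void close() {
        closed = true;
        cancel();
    }

    @Override
    public final void run() {
        try {
            runInternal();
        } finally {
            rescheduleIfNecessary(); // auto-repeat until cancelled, closed, or mustReschedule() says stop
        }
    }

    // The periodic work itself, e.g. the persistent task assignment recheck.
    protected abstract void runInternal();
}

In Elasticsearch itself the scheduling would go through the node's ThreadPool (hence the getThreadPool() override in the tests below) rather than a private executor; the lifecycle of cancel, reschedule, and close is the part being shared.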
this.clusterService = clusterService;
clusterService.addListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
Can you add the settings update consumer last, after the periodicRechecker is instantiated?
(The one that is still to be addressed is the integration test)
I think I've addressed all the feedback - please could you have another look @bleskes?
Looking good - I left some more comments.
}

void setRecheckInterval(TimeValue recheckInterval) {
    this.recheckInterval = recheckInterval;
this feels awkward. Why do we need to store it in two different places?
Good point - we don't
@@ -241,21 +269,47 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {

@Override
public void clusterChanged(ClusterChangedEvent event) {
    periodicRechecker.cancel();
I find it easier to reason about this if we only cancel a scheduled check if this event is really relevant, i.e., if shouldReassignPersistentTasks returns true. This will also avoid the need to have a re-schedule if we didn't end up doing anything.
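Roughly the following structure; a sketch using the method names from this PR's review, with stub bodies elsewhere, not the final code.

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;

// Sketch of the suggested structure: cancel the scheduled recheck only when the event is
// actually relevant, so there is no separate "reschedule because nothing happened" path.
class ClusterChangedSketch implements ClusterStateListener {

    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        if (event.localNodeMaster() && shouldReassignPersistentTasks(event)) {
            cancelPeriodicRecheck();   // only a relevant event suspends the timer
            reassignPersistentTasks(); // reschedules the recheck itself once it completes
        }
    }

    private boolean shouldReassignPersistentTasks(ClusterChangedEvent event) {
        return false; // placeholder for the real relevance check
    }

    private void cancelPeriodicRecheck() {
        // placeholder: periodicRechecker.cancel()
    }

    private void reassignPersistentTasks() {
        // placeholder: submit the cluster state update that reassigns tasks
    }
}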
private boolean anyTaskNeedsReassignment(final PersistentTasksCustomMetaData tasks, final ClusterState state) {
    for (PersistentTask<?> task : tasks.tasks()) {
        if (needsReassignment(task.getAssignment(), state.nodes())) {
            Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), state);
This method has weird naming (semantics?) - it's called anyTaskNeedsReassignment and here we know that needsReassignment returned true, so why don't we return true immediately?
How does anyTaskReassignmentRequired sound? I think in combination with renaming needsReassignment to isAssignedToValidNode it makes the code clearer.
 */
private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks, final ClusterState state) {
    for (PersistentTask<?> task : tasks.tasks()) {
        if (needsReassignment(task.getAssignment(), state.nodes())) {
The method is called isAnyTaskUnassigned but the code checks that a task needsReassignment - can you double-check what we need and rename accordingly?
needsReassignment predates this change. I'll rename it to isAssignedToValidNode, which reflects what it actually does.
public void testManualRepeat() throws InterruptedException {

    final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
I wonder if you want a CyclicBarrier here.
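Roughly what that could look like - an illustrative, self-contained example of the CyclicBarrier pattern, not the actual test code; in the real test the second party would be the repeating task under test rather than a raw Thread.

import java.util.concurrent.CyclicBarrier;

// Illustrative only: a CyclicBarrier(2) lets the test thread and the repeating task rendezvous
// on every iteration, instead of swapping in a fresh CountDownLatch via an AtomicReference each time.
class CyclicBarrierSketch {
    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        Thread task = new Thread(() -> {
            try {
                for (int run = 0; run < 3; run++) {
                    // ... one iteration of the periodic work ...
                    barrier.await(); // rendezvous with the test thread
                }
            } catch (Exception e) {
                throw new AssertionError(e);
            }
        });
        task.start();

        for (int run = 0; run < 3; run++) {
            barrier.await(); // released once per task iteration
        }
        task.join();
    }
}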
@@ -136,7 +136,6 @@ public void testRefreshTaskIsUpdated() throws IOException {
assertNotSame(refreshTask, indexService.getRefreshTask());
assertTrue(refreshTask.isClosed());
assertFalse(refreshTask.isScheduled());
assertFalse(indexService.getRefreshTask().mustReschedule());
why is this changed?
Because checking whether close() has been called is now outside of the control of the derived class. mustReschedule() on the derived class just checks extra conditions over and above whether the task has been closed. (mustReschedule() is protected - it's not meant to be called by external users.) If AbstractAsyncTaskTests is doing its job properly then assertTrue(refreshTask.isClosed()); on line 137 should be enough in this file to confirm that the task will never be scheduled again.
k
@bleskes I think I've addressed your second batch of comments now - please can you have another look?
server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java
    periodicRechecker.cancel();
    reassignPersistentTasks(event.state().getVersion());
} else {
    scheduleRecheckIfUnassignedTasks(event.state());
left over?
Something is needed here. I added a slightly different version back in 610f25e with a comment to make it clearer why.
 * Returns true if any persistent task provided is unassigned,
 * i.e. is not assigned or is assigned to a non-existing node.
 */
private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks, final ClusterState state) {
Still some naming issues here - how about isAnyTasksRequiresAssignment?
@@ -330,7 +400,7 @@ static boolean persistentTasksChanged(final ClusterChangedEvent event) {
}

/** Returns true if the task is not assigned or is assigned to a non-existing node */
public static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) {
public static boolean isAssignedToValidNode(final Assignment assignment, final DiscoveryNodes nodes) {
How about requiresAssignment? ("is assigned" suggests it is, but we also consider unassigned tasks.)
 * Submit a cluster state update to reassign any persistent tasks that need reassigning
 */
private void reassignPersistentTasks(long currentStateVersion) {
    logger.trace("checking task reassignment for cluster state {}", currentStateVersion);
It's weird to have a parameter passed in just for a version log under trace. Can you remove it and leave the logging in the event method?
if (tasks != null && anyTaskReassignmentRequired(tasks, state)) {
    reassignPersistentTasks(state.getVersion());
} else {
    scheduleRecheckIfUnassignedTasks(state);
This is unneeded, no? We already checked in this if, so why check again?
Looking great. I made another pass and left some nits.
    });
    reassignPersistentTasks();
} else if (periodicRechecker.isScheduled() == false &&
    isAnyTaskUnassigned(event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) {
Any time that a task can't be allocated when it should have been, we schedule a periodic check. This means this else clause is not needed. Can you clarify why this is needed? (It also creates a race condition when one cluster change has triggered submission of a cluster state task via reassignPersistentTasks and then another cluster state change comes in and schedules a check.)
The CI for commit 824ffab failed without this extra check: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+pull-request-1/1905/console

shouldReassignPersistentTasks returns false if there are unassigned tasks but the assignment would still be the same for them. createPersistentTask sets the initial assignment for a newly created task, which will then be the same when it goes through shouldReassignPersistentTasks, causing shouldReassignPersistentTasks to return false and bypass the scheduling of the periodic check.

A solution that avoids the race condition you pointed out is to schedule a recheck in the anonymous cluster state update class in createPersistentTask. That's in bcb1daf.
@Override
protected String getThreadPool() {
    return ThreadPool.Names.GENERIC;
This can be SAME, I think.
public void testAutoRepeat() throws InterruptedException {

    final AtomicReference<CountDownLatch> latch1 = new AtomicReference<>(new CountDownLatch(1));
You didn't like the CyclicBarrier?
} catch (InterruptedException e) {
    fail("interrupted");
}
if (randomBoolean()) {
can you sample this in advance (before the task is generated) so it will be reproducible?
assertFalse(task.isScheduled());
task.rescheduleIfNecessary();
barrier.await(10, TimeUnit.SECONDS); // should happen very quickly
We typically wait indefinitely, so if things get stuck we get a suite timeout plus a stack dump and know where things got stuck.
task.setInterval(TimeValue.timeValueMillis(1));
assertTrue(task.isScheduled());
// This should only take 2 milliseconds in ideal conditions, but allow 10 seconds in case of VM stalls
assertTrue(latch.await(10, TimeUnit.SECONDS));
same comment
PersistentTasksClusterService persistentTasksClusterService =
    internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName());
// Speed up rechecks to a rate that is quicker than what settings would allow
persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(10));
This will still make a slow test. How about calling the setTimeInterval method in PersistentTasksClusterService directly and setting it to something much faster?
I changed it to 1ms here and the test runs in 85ms on my laptop now.
LGTM. Thanks @droberts195
run gradle build tests 1
After #36069 the approach for reallocating ML persistent tasks after refreshing job memory requirements can be simplified.