From 2305064649a0a82b48dd8e5684a40472ef978cd7 Mon Sep 17 00:00:00 2001 From: Jasmine Dahilig Date: Thu, 11 Feb 2021 10:46:55 -0800 Subject: [PATCH 1/4] lifecycle: block prestart taskrunners from exiting until allocation ready to exit #9841 --- client/allocrunner/taskrunner/lifecycle.go | 4 +++- client/allocrunner/taskrunner/restarts/restarts.go | 1 + client/allocrunner/taskrunner/task_runner_getters.go | 5 +++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index b812156a846..aff27d6e1db 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -2,6 +2,7 @@ package taskrunner import ( "context" + "fmt" "github.com/hashicorp/nomad/nomad/structs" ) @@ -15,7 +16,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai handle := tr.getDriverHandle() // Check it is running - if handle == nil { + if handle == nil && !tr.IsPrestartTask() { + fmt.Println("!!! task not running, not restarting") return ErrTaskNotRunning } diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go index a117e8d49d1..e572b151ca9 100644 --- a/client/allocrunner/taskrunner/restarts/restarts.go +++ b/client/allocrunner/taskrunner/restarts/restarts.go @@ -168,6 +168,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) { // Hot path if a restart was triggered if r.restartTriggered { + fmt.Println("!!! restart triggered") r.reason = "" return structs.TaskRestarting, 0 } diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index fcf3189b7a1..594207b0881 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -28,6 +28,11 @@ func (tr *TaskRunner) IsLeader() bool { return tr.taskLeader } +// IsPrestartTask returns true if this task is a prestart task in its task group. +func (tr *TaskRunner) IsPrestartTask() bool { + return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPrestart +} + // IsPoststopTask returns true if this task is a poststop task in its task group. func (tr *TaskRunner) IsPoststopTask() bool { return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop From 57d05e42e8272c3f8724510190efcf31e7b1960b Mon Sep 17 00:00:00 2001 From: Jasmine Dahilig Date: Mon, 18 Apr 2022 11:23:17 -0700 Subject: [PATCH 2/4] lifecycle: block prestart taskrunners from exiting until allocation ready to exit #9841 --- client/allocrunner/alloc_runner.go | 1 + client/allocrunner/task_hook_coordinator.go | 5 ++++ client/allocrunner/taskrunner/task_runner.go | 25 +++++++++++++++++++- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 32b8d4e0246..09d08d0aa35 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -278,6 +278,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { DriverManager: ar.driverManager, ServersContactedCh: ar.serversContactedCh, StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), + EndConditionMetCtx: ar.taskHookCoordinator.endConditionForTask(task), ShutdownDelayCtx: ar.shutdownDelayCtx, ServiceRegWrapper: ar.serviceRegWrapper, } diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go index 09f90d22d67..a779692b770 100644 --- a/client/allocrunner/task_hook_coordinator.go +++ b/client/allocrunner/task_hook_coordinator.go @@ -121,6 +121,11 @@ func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan s } } +// Tasks are able to exit the taskrunner Run() loop when poststop tasks are ready to start +func (c *taskHookCoordinator) endConditionForTask(task *structs.Task) <-chan struct{} { + return c.poststopTaskCtx.Done() +} + // This is not thread safe! This must only be called from one thread per alloc runner. func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) { for task := range c.prestartSidecar { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index b8c3b270c32..0addccd8876 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -228,9 +228,16 @@ type TaskRunner struct { // GetClientAllocs has been called in case of a failed restore. serversContactedCh <-chan struct{} - // startConditionMetCtx is done when TR should start the task + // startConditionMetCtx is done when TaskRunner should start the task + // within the allocation lifecycle. This will allow the task to proceed + // runtime execution. startConditionMetCtx <-chan struct{} + // endConditionMetCtx blocks tasks from exiting the taskrunner Run loop + // until the whole allocation is ready to finish. This allows prestart ephemeral + // tasks to be restarted successfully + endConditionMetCtx <-chan struct{} + // waitOnServers defaults to false but will be set true if a restore // fails and the Run method should wait until serversContactedCh is // closed. @@ -299,6 +306,10 @@ type Config struct { // startConditionMetCtx is done when TR should start the task StartConditionMetCtx <-chan struct{} + // EndConditionMetCtx is done when TR can let the task exit the run loop + // i.e. when the whole allocation is ready to finish + EndConditionMetCtx <-chan struct{} + // ShutdownDelayCtx is a context from the alloc runner which will // tell us to exit early from shutdown_delay ShutdownDelayCtx context.Context @@ -364,6 +375,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { maxEvents: defaultMaxEvents, serversContactedCh: config.ServersContactedCh, startConditionMetCtx: config.StartConditionMetCtx, + endConditionMetCtx: config.EndConditionMetCtx, shutdownDelayCtx: config.ShutdownDelayCtx, shutdownDelayCancelFn: config.ShutdownDelayCancelFn, serviceRegWrapper: config.ServiceRegWrapper, @@ -639,6 +651,7 @@ MAIN: tr.logger.Trace("gracefully shutting down during restart delay") return } + } // Ensure handle is cleaned up. Restore could have recovered a task @@ -664,6 +677,16 @@ MAIN: tr.logger.Error("stop failed", "error", err) } + // Block until the allocation is ready to finish + select { + case <-tr.endConditionMetCtx: + tr.logger.Debug("lifecycle end condition has been met, proceeding") + // yay proceed + case <-tr.killCtx.Done(): + case <-tr.shutdownCtx.Done(): + return + } + tr.logger.Debug("task run loop exiting") } From 6b9b47bf366a69f2368d59e67a7a8fe0e551cee1 Mon Sep 17 00:00:00 2001 From: Jasmine Dahilig Date: Thu, 2 Jun 2022 11:38:08 -0700 Subject: [PATCH 3/4] fixed restart for poststart but not prestart --- client/allocrunner/alloc_runner.go | 3 ++ client/allocrunner/task_hook_coordinator.go | 35 ++++++++++++++++++- client/allocrunner/taskrunner/lifecycle.go | 3 +- client/allocrunner/taskrunner/task_runner.go | 27 ++++++++------ .../taskrunner/task_runner_getters.go | 5 +++ 5 files changed, 61 insertions(+), 12 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 09d08d0aa35..46bd073d813 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -1222,6 +1222,9 @@ func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error { } } + // TODO: run before or after the taskRestartHooks? + ar.taskHookCoordinator.RestartTaskHookCoordinator() + return err.ErrorOrNil() } diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go index a779692b770..ae3865c39fa 100644 --- a/client/allocrunner/task_hook_coordinator.go +++ b/client/allocrunner/task_hook_coordinator.go @@ -8,6 +8,15 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// TODO: killed deployment while prestart was stuck in running state>>> +// [ERROR] client.alloc_runner.task_runner.task_hook.stats_hook: +// failed to start stats collection for task: alloc_id=24546e00-09c1-6582-d589-85b63b5b9548 +// task=prestart error="task not found for given id" + +// TODO: investigate when a task is marked as Dead +// - prestart will never get to Dead if it's being blocked +// - this blocks all tasks from being executed + // TaskHookCoordinator helps coordinate when mainTasks start tasks can launch // namely after all Prestart Tasks have run, and after all BlockUntilCompleted have completed type taskHookCoordinator struct { @@ -27,6 +36,9 @@ type taskHookCoordinator struct { poststopTaskCtx context.Context poststopTaskCtxCancel context.CancelFunc + restartAllocCtx context.Context + restartAllocCtxCancel context.CancelFunc + prestartSidecar map[string]struct{} prestartEphemeral map[string]struct{} mainTasksRunning map[string]struct{} // poststop: main tasks running -> finished @@ -40,6 +52,7 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo mainTaskCtx, mainCancelFn := context.WithCancel(context.Background()) poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background()) poststopTaskCtx, poststopTaskCancelFn := context.WithCancel(context.Background()) + restartAllocCtx, restartAllocCancelFn := context.WithCancel(context.Background()) c := &taskHookCoordinator{ logger: logger, @@ -54,6 +67,8 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo poststartTaskCtxCancel: poststartCancelFn, poststopTaskCtx: poststopTaskCtx, poststopTaskCtxCancel: poststopTaskCancelFn, + restartAllocCtx: restartAllocCtx, + restartAllocCtxCancel: restartAllocCancelFn, } c.setTasks(tasks) return c @@ -122,8 +137,10 @@ func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan s } // Tasks are able to exit the taskrunner Run() loop when poststop tasks are ready to start +// TODO: in case of a restart, poststop tasks will not be ready to run +// - need to refresh the taskhookcoordinator so execution flow happens sequentially func (c *taskHookCoordinator) endConditionForTask(task *structs.Task) <-chan struct{} { - return c.poststopTaskCtx.Done() + return c.restartAllocCtx.Done() } // This is not thread safe! This must only be called from one thread per alloc runner. @@ -146,6 +163,7 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt delete(c.prestartEphemeral, task) } + // This unblocks the main tasks for task := range c.mainTasksRunning { st := states[task] @@ -156,6 +174,7 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt delete(c.mainTasksRunning, task) } + // This unblocks the PostStart tasks for task := range c.mainTasksPending { st := states[task] if st == nil || st.StartedAt.IsZero() { @@ -165,16 +184,30 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt delete(c.mainTasksPending, task) } + // This unblocks the Main tasks if !c.hasPrestartTasks() { c.mainTaskCtxCancel() } + // This unblocks the PostStart tasks if !c.hasPendingMainTasks() { c.poststartTaskCtxCancel() } + + // This unblocks the PostStop tasks if !c.hasRunningMainTasks() { c.poststopTaskCtxCancel() + + // TODO: realizing now that the naming is bad, + // you can unblock the endCondition in case of + // regular execution or in case of a restart + c.restartAllocCtxCancel() } + +} + +func (c *taskHookCoordinator) RestartTaskHookCoordinator() { + c.restartAllocCtxCancel() } func (c *taskHookCoordinator) StartPoststopTasks() { diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index aff27d6e1db..2120ec37601 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -16,7 +16,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai handle := tr.getDriverHandle() // Check it is running - if handle == nil && !tr.IsPrestartTask() { + if handle == nil { + //if handle == nil && !(tr.IsPrestartTask() || tr.IsPoststartTask()) { fmt.Println("!!! task not running, not restarting") return ErrTaskNotRunning } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 0addccd8876..88119820297 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -542,6 +542,7 @@ func (tr *TaskRunner) Run() { } select { + // TODO: rename lifecycleStartConditionMetCtx or something case <-tr.startConditionMetCtx: tr.logger.Debug("lifecycle start condition has been met, proceeding") // yay proceed @@ -622,6 +623,19 @@ MAIN: } } + // Block until the allocation is ready to finish + // TODO: this whole design will break prestart tasks + // - if we block prestart tasks from fully executing, they will always be in running state + // - if they never reach dead state, then main tasks will never start :< + select { + case <-tr.endConditionMetCtx: + tr.logger.Debug("lifecycle end condition has been met, proceeding") + // yay proceed + case <-tr.killCtx.Done(): + case <-tr.shutdownCtx.Done(): + return + } + // Clear the handle tr.clearDriverHandle() @@ -669,6 +683,9 @@ MAIN: } } + // TODO: prestart phase depends on all prestart tasks being dead + // - prestart tasks will never reach this code + // - how do we adjust the event // Mark the task as dead tr.UpdateState(structs.TaskStateDead, nil) @@ -677,16 +694,6 @@ MAIN: tr.logger.Error("stop failed", "error", err) } - // Block until the allocation is ready to finish - select { - case <-tr.endConditionMetCtx: - tr.logger.Debug("lifecycle end condition has been met, proceeding") - // yay proceed - case <-tr.killCtx.Done(): - case <-tr.shutdownCtx.Done(): - return - } - tr.logger.Debug("task run loop exiting") } diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index 594207b0881..b6bbb56d461 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPrestartTask() bool { return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPrestart } +// IsPostStartTask returns true if this task is a poststart task in its task group. +func (tr *TaskRunner) IsPoststartTask() bool { + return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststart +} + // IsPoststopTask returns true if this task is a poststop task in its task group. func (tr *TaskRunner) IsPoststopTask() bool { return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop From bf006e01b61ba63bbcb88208e7f739873cfa3e92 Mon Sep 17 00:00:00 2001 From: Jasmine Dahilig Date: Fri, 3 Jun 2022 08:35:47 -0700 Subject: [PATCH 4/4] fixed restart for lifecycle prestart with introduction of Completed task state --- client/allocrunner/task_hook_coordinator.go | 2 +- client/allocrunner/taskrunner/task_runner.go | 2 ++ nomad/structs/structs.go | 9 +++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go index ae3865c39fa..62d6130f451 100644 --- a/client/allocrunner/task_hook_coordinator.go +++ b/client/allocrunner/task_hook_coordinator.go @@ -163,7 +163,7 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt delete(c.prestartEphemeral, task) } - // This unblocks the main tasks + // This unblocks the PostStop tasks for task := range c.mainTasksRunning { st := states[task] diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 88119820297..e914138c3ef 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -623,6 +623,8 @@ MAIN: } } + tr.UpdateState(structs.TaskStateComplete, nil) + // Block until the allocation is ready to finish // TODO: this whole design will break prestart tasks // - if we block prestart tasks from fully executing, they will always be in running state diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ae8a87b5cc0..a8985394e13 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7836,9 +7836,10 @@ func (h *TaskHandle) Copy() *TaskHandle { // Set of possible states for a task. const ( - TaskStatePending = "pending" // The task is waiting to be run. - TaskStateRunning = "running" // The task is currently running. - TaskStateDead = "dead" // Terminal state of task. + TaskStatePending = "pending" // The task is waiting to be run. + TaskStateRunning = "running" // The task is currently running. + TaskStateComplete = "complete" // The task is completed but not terminal + TaskStateDead = "dead" // Terminal state of task. ) // TaskState tracks the current state of a task and events that caused state @@ -7910,7 +7911,7 @@ func (ts *TaskState) Copy() *TaskState { // for batch allocations or ephemeral (non-sidecar) lifecycle tasks part of a // service or system allocation. func (ts *TaskState) Successful() bool { - return ts.State == TaskStateDead && !ts.Failed + return (ts.State == TaskStateDead || ts.State == TaskStateComplete) && !ts.Failed } const (