Lifecycle Restart #12939

Closed
wants to merge 4 commits into from
4 changes: 4 additions & 0 deletions client/allocrunner/alloc_runner.go
@@ -278,6 +278,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
EndConditionMetCtx: ar.taskHookCoordinator.endConditionForTask(task),
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
}
@@ -1221,6 +1222,9 @@ func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
}
}

// TODO: run before or after the taskRestartHooks?
ar.taskHookCoordinator.RestartTaskHookCoordinator()

return err.ErrorOrNil()
}

38 changes: 38 additions & 0 deletions client/allocrunner/task_hook_coordinator.go
@@ -8,6 +8,15 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// TODO: killing a deployment while prestart was stuck in the running state produced:
// [ERROR] client.alloc_runner.task_runner.task_hook.stats_hook:
// failed to start stats collection for task: alloc_id=24546e00-09c1-6582-d589-85b63b5b9548
// task=prestart error="task not found for given id"

// TODO: investigate when a task is marked as Dead
// - prestart will never get to Dead if it's being blocked
// - this blocks all tasks from being executed

// TaskHookCoordinator helps coordinate when main tasks can launch, namely
// after all prestart tasks have run and all BlockUntilCompleted tasks have completed
type taskHookCoordinator struct {
@@ -27,6 +36,9 @@ type taskHookCoordinator struct {
poststopTaskCtx context.Context
poststopTaskCtxCancel context.CancelFunc

restartAllocCtx context.Context
restartAllocCtxCancel context.CancelFunc

prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
mainTasksRunning map[string]struct{} // poststop: main tasks running -> finished
@@ -40,6 +52,7 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo
mainTaskCtx, mainCancelFn := context.WithCancel(context.Background())
poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background())
poststopTaskCtx, poststopTaskCancelFn := context.WithCancel(context.Background())
restartAllocCtx, restartAllocCancelFn := context.WithCancel(context.Background())

c := &taskHookCoordinator{
logger: logger,
@@ -54,6 +67,8 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo
poststartTaskCtxCancel: poststartCancelFn,
poststopTaskCtx: poststopTaskCtx,
poststopTaskCtxCancel: poststopTaskCancelFn,
restartAllocCtx: restartAllocCtx,
restartAllocCtxCancel: restartAllocCancelFn,
}
c.setTasks(tasks)
return c
@@ -121,6 +136,13 @@ func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan s
}
}

// Tasks are able to exit the taskrunner Run() loop when poststop tasks are ready to start
// TODO: in case of a restart, poststop tasks will not be ready to run
// - need to refresh the taskhookcoordinator so execution flow happens sequentially
func (c *taskHookCoordinator) endConditionForTask(task *structs.Task) <-chan struct{} {
return c.restartAllocCtx.Done()
}

// This is not thread safe! This must only be called from one thread per alloc runner.
func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) {
for task := range c.prestartSidecar {
@@ -141,6 +163,7 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.prestartEphemeral, task)
}

// This unblocks the PostStop tasks
for task := range c.mainTasksRunning {
st := states[task]

@@ -151,6 +174,7 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.mainTasksRunning, task)
}

// This unblocks the PostStart tasks
for task := range c.mainTasksPending {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
@@ -160,16 +184,30 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.mainTasksPending, task)
}

// This unblocks the Main tasks
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}

// This unblocks the PostStart tasks
if !c.hasPendingMainTasks() {
c.poststartTaskCtxCancel()
}

// This unblocks the PostStop tasks
if !c.hasRunningMainTasks() {
c.poststopTaskCtxCancel()

// TODO: realizing now that the naming is bad,
// you can unblock the endCondition in case of
// regular execution or in case of a restart
c.restartAllocCtxCancel()
}

}

func (c *taskHookCoordinator) RestartTaskHookCoordinator() {
c.restartAllocCtxCancel()
}

func (c *taskHookCoordinator) StartPoststopTasks() {
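The coordinator changes above all lean on one pattern: each lifecycle gate is a `context.Context`, tasks block on its `Done()` channel, and the coordinator cancels the context once the gate's condition is met (here, when all main tasks have finished or a restart is requested). The sketch below is a minimal, standalone illustration of that pattern, not the PR code; `demoCoordinator` and its methods are invented names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoCoordinator is a toy stand-in for taskHookCoordinator: it owns one
// context per lifecycle gate and cancels that context when the gate's
// condition is met.
type demoCoordinator struct {
	endCtx    context.Context
	endCancel context.CancelFunc
}

func newDemoCoordinator() *demoCoordinator {
	ctx, cancel := context.WithCancel(context.Background())
	return &demoCoordinator{endCtx: ctx, endCancel: cancel}
}

// endConditionForTask mirrors the shape of endConditionForTask above:
// callers get a channel that closes when tasks may leave their Run loop.
func (c *demoCoordinator) endConditionForTask() <-chan struct{} {
	return c.endCtx.Done()
}

// restart releases everything blocked on the end condition, the way
// RestartTaskHookCoordinator cancels restartAllocCtx.
func (c *demoCoordinator) restart() {
	c.endCancel()
}

func main() {
	coord := newDemoCoordinator()
	var wg sync.WaitGroup

	for _, name := range []string{"prestart", "main"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			fmt.Println(name, "finished its work, waiting on the end condition")
			<-coord.endConditionForTask() // blocks until the coordinator cancels
			fmt.Println(name, "unblocked, exiting its Run loop")
		}(name)
	}

	time.Sleep(100 * time.Millisecond) // let both tasks reach the gate
	coord.restart()                    // e.g. triggered by allocRunner.RestartAll
	wg.Wait()
}
```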
3 changes: 3 additions & 0 deletions client/allocrunner/taskrunner/lifecycle.go
@@ -2,6 +2,7 @@ package taskrunner

import (
"context"
"fmt"

"github.com/hashicorp/nomad/nomad/structs"
)
@@ -16,6 +17,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai

// Check it is running
if handle == nil {
//if handle == nil && !(tr.IsPrestartTask() || tr.IsPoststartTask()) {
fmt.Println("!!! task not running, not restarting")
return ErrTaskNotRunning
}

1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/restarts/restarts.go
@@ -168,6 +168,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {

// Hot path if a restart was triggered
if r.restartTriggered {
fmt.Println("!!! restart triggered")
r.reason = ""
return structs.TaskRestarting, 0
}
34 changes: 33 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
@@ -228,9 +228,16 @@ type TaskRunner struct {
// GetClientAllocs has been called in case of a failed restore.
serversContactedCh <-chan struct{}

// startConditionMetCtx is done when TR should start the task
// startConditionMetCtx is done when TaskRunner should start the task
// within the allocation lifecycle. This allows the task to proceed with
// runtime execution.
startConditionMetCtx <-chan struct{}

// endConditionMetCtx blocks tasks from exiting the taskrunner Run loop
// until the whole allocation is ready to finish. This allows prestart ephemeral
// tasks to be restarted successfully
endConditionMetCtx <-chan struct{}

// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
@@ -299,6 +306,10 @@ type Config struct {
// startConditionMetCtx is done when TR should start the task
StartConditionMetCtx <-chan struct{}

// EndConditionMetCtx is done when TR can let the task exit the run loop
// i.e. when the whole allocation is ready to finish
EndConditionMetCtx <-chan struct{}

// ShutdownDelayCtx is a context from the alloc runner which will
// tell us to exit early from shutdown_delay
ShutdownDelayCtx context.Context
@@ -364,6 +375,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCtx: config.StartConditionMetCtx,
endConditionMetCtx: config.EndConditionMetCtx,
shutdownDelayCtx: config.ShutdownDelayCtx,
shutdownDelayCancelFn: config.ShutdownDelayCancelFn,
serviceRegWrapper: config.ServiceRegWrapper,
@@ -530,6 +542,7 @@ func (tr *TaskRunner) Run() {
}

select {
// TODO: rename lifecycleStartConditionMetCtx or something
case <-tr.startConditionMetCtx:
tr.logger.Debug("lifecycle start condition has been met, proceeding")
// yay proceed
@@ -610,6 +623,21 @@ MAIN:
}
}

tr.UpdateState(structs.TaskStateComplete, nil)

// Block until the allocation is ready to finish
// TODO: this whole design will break prestart tasks
// - if we block prestart tasks from fully executing, they will always be in running state
// - if they never reach dead state, then main tasks will never start :<
select {
case <-tr.endConditionMetCtx:
tr.logger.Debug("lifecycle end condition has been met, proceeding")
// yay proceed
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
}

// Clear the handle
tr.clearDriverHandle()

@@ -639,6 +667,7 @@ MAIN:
tr.logger.Trace("gracefully shutting down during restart delay")
return
}

}

// Ensure handle is cleaned up. Restore could have recovered a task
Expand All @@ -656,6 +685,9 @@ MAIN:
}
}

// TODO: prestart phase depends on all prestart tasks being dead
// - prestart tasks will never reach this code
// - how do we adjust the event
// Mark the task as dead
tr.UpdateState(structs.TaskStateDead, nil)

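On the consumer side, the Run loop now has two gates instead of one: the existing start-condition select at the top and the new end-condition select before the driver handle is cleared, both preempted by the kill and shutdown contexts. Below is a simplified, self-contained sketch of that control flow, not the actual TaskRunner code; `runTask` and its parameters are illustrative names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runTask sketches the two gates in TaskRunner.Run(): the existing
// start-condition select at the top, and the new end-condition select that
// holds a finished task in the Run loop until the allocation may finish.
// Both gates are preempted by the kill and shutdown contexts.
func runTask(startCond, endCond <-chan struct{}, kill, shutdown context.Context, work func()) {
	select {
	case <-startCond:
		fmt.Println("lifecycle start condition met, proceeding")
	case <-kill.Done():
		return
	case <-shutdown.Done():
		return
	}

	work() // stands in for the MAIN loop: start the driver, wait for the task to exit

	select {
	case <-endCond:
		fmt.Println("lifecycle end condition met, proceeding to cleanup")
	case <-kill.Done():
		// killed: fall through to cleanup without waiting for the end condition
	case <-shutdown.Done():
		return
	}

	fmt.Println("clearing driver handle, marking terminal state, running stop hooks")
}

func main() {
	start := make(chan struct{})
	end := make(chan struct{})
	kill, cancelKill := context.WithCancel(context.Background())
	shutdown, cancelShutdown := context.WithCancel(context.Background())
	defer cancelKill()
	defer cancelShutdown()

	go func() {
		close(start)                      // coordinator: prestart done, start the task
		time.Sleep(50 * time.Millisecond) // task runs
		close(end)                        // coordinator: allocation is ready to finish
	}()

	runTask(start, end, kill, shutdown, func() { fmt.Println("task body ran") })
}
```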
10 changes: 10 additions & 0 deletions client/allocrunner/taskrunner/task_runner_getters.go
@@ -28,6 +28,16 @@ func (tr *TaskRunner) IsLeader() bool {
return tr.taskLeader
}

// IsPrestartTask returns true if this task is a prestart task in its task group.
func (tr *TaskRunner) IsPrestartTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPrestart
}

// IsPoststartTask returns true if this task is a poststart task in its task group.
func (tr *TaskRunner) IsPoststartTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststart
}

// IsPoststopTask returns true if this task is a poststop task in its task group.
func (tr *TaskRunner) IsPoststopTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
9 changes: 5 additions & 4 deletions nomad/structs/structs.go
@@ -7836,9 +7836,10 @@ func (h *TaskHandle) Copy() *TaskHandle {

// Set of possible states for a task.
const (
TaskStatePending = "pending" // The task is waiting to be run.
TaskStateRunning = "running" // The task is currently running.
TaskStateDead = "dead" // Terminal state of task.
TaskStatePending = "pending" // The task is waiting to be run.
TaskStateRunning = "running" // The task is currently running.
TaskStateComplete = "complete" // The task is completed but not terminal
TaskStateDead = "dead" // Terminal state of task.
)

// TaskState tracks the current state of a task and events that caused state
@@ -7910,7 +7911,7 @@ func (ts *TaskState) Copy() *TaskState {
// for batch allocations or ephemeral (non-sidecar) lifecycle tasks part of a
// service or system allocation.
func (ts *TaskState) Successful() bool {
return ts.State == TaskStateDead && !ts.Failed
return (ts.State == TaskStateDead || ts.State == TaskStateComplete) && !ts.Failed
}

const (
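The `Successful()` change is easiest to see in isolation: a task in the new, non-terminal `complete` state now counts as successful, exactly like `dead` without a failure. Here is a standalone reproduction of the predicate with a few inputs; the trimmed-down `TaskState` carries only the two fields the predicate reads, so this is a sketch rather than the real struct.

```go
package main

import "fmt"

// Trimmed-down copies of the constants and predicate from this diff, so the
// behaviour change is easy to check in isolation.
const (
	TaskStateRunning  = "running"
	TaskStateComplete = "complete" // finished its work, still held in the Run loop
	TaskStateDead     = "dead"     // terminal
)

// TaskState here carries only the two fields Successful() reads.
type TaskState struct {
	State  string
	Failed bool
}

func (ts *TaskState) Successful() bool {
	return (ts.State == TaskStateDead || ts.State == TaskStateComplete) && !ts.Failed
}

func main() {
	for _, ts := range []TaskState{
		{State: TaskStateRunning},            // not successful: still running
		{State: TaskStateComplete},           // successful under the new rule
		{State: TaskStateDead},               // successful, as before
		{State: TaskStateDead, Failed: true}, // not successful: failed
	} {
		fmt.Printf("state=%-8s failed=%-5v successful=%v\n", ts.State, ts.Failed, ts.Successful())
	}
}
```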