From 0b0fcdede516827d901aeb7ae58558328b6ffbb4 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 15 Aug 2022 19:44:55 -0400 Subject: [PATCH 01/14] allocrunner: handle lifecycle when all tasks die When all tasks die the Coordinator must transition to its terminal state, coordinatorStatePoststop, to unblock poststop tasks. Since this could happen at any time (for example, a prestart task dies), all states must be able to transition to this terminal state. --- .../allocrunner/tasklifecycle/coordinator.go | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/client/allocrunner/tasklifecycle/coordinator.go b/client/allocrunner/tasklifecycle/coordinator.go index 60b798ed504..25b602e5890 100644 --- a/client/allocrunner/tasklifecycle/coordinator.go +++ b/client/allocrunner/tasklifecycle/coordinator.go @@ -16,7 +16,7 @@ const ( coordinatorStatePrestart coordinatorStateMain coordinatorStatePoststart - coordinatorStateWaitMain + coordinatorStateWaitAlloc coordinatorStatePoststop ) @@ -30,8 +30,8 @@ func (s coordinatorState) String() string { return "main" case coordinatorStatePoststart: return "poststart" - case coordinatorStateWaitMain: - return "wait_main" + case coordinatorStateWaitAlloc: + return "wait_alloc" case coordinatorStatePoststop: return "poststart" } @@ -151,6 +151,13 @@ func (c *Coordinator) TaskStateUpdated(states map[string]*structs.TaskState) { // current internal state and the received states of the tasks. // The currentStateLock must be held before calling this method. func (c *Coordinator) nextStateLocked(states map[string]*structs.TaskState) coordinatorState { + + // coordinatorStatePoststop is the terminal state of the FSM, and can be + // reached at any time. + if c.isAllocDone(states) { + return coordinatorStatePoststop + } + switch c.currentState { case coordinatorStateInit: if !c.isInitDone(states) { @@ -174,11 +181,11 @@ func (c *Coordinator) nextStateLocked(states map[string]*structs.TaskState) coor if !c.isPoststartDone(states) { return coordinatorStatePoststart } - return coordinatorStateWaitMain + return coordinatorStateWaitAlloc - case coordinatorStateWaitMain: - if !c.isWaitMainDone(states) { - return coordinatorStateWaitMain + case coordinatorStateWaitAlloc: + if !c.isAllocDone(states) { + return coordinatorStateWaitAlloc } return coordinatorStatePoststop @@ -233,7 +240,7 @@ func (c *Coordinator) enterStateLocked(state coordinatorState) { c.allow(lifecycleStagePoststartEphemeral) c.allow(lifecycleStagePoststartSidecar) - case coordinatorStateWaitMain: + case coordinatorStateWaitAlloc: c.block(lifecycleStagePrestartEphemeral) c.block(lifecycleStagePoststartEphemeral) c.block(lifecycleStagePoststop) @@ -321,9 +328,9 @@ func (c *Coordinator) isPoststartDone(states map[string]*structs.TaskState) bool return true } -// isWaitMainDone returns true when the following conditions are met: -// - all tasks that are not poststop are in the "dead" state. -func (c *Coordinator) isWaitMainDone(states map[string]*structs.TaskState) bool { +// isAllocDone returns true when the following conditions are met: +// - all non-poststop tasks are in the "dead" state. +func (c *Coordinator) isAllocDone(states map[string]*structs.TaskState) bool { for lifecycle, tasks := range c.tasksByLifecycle { if lifecycle == lifecycleStagePoststop { continue From 7e3efaeb5d6393cdff75e464eca349aa9a62c11c Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 11 Aug 2022 17:23:58 -0400 Subject: [PATCH 02/14] allocrunner: implement different alloc restarts Add a new alloc restart mode where all tasks are restarted, even if they have already exited. Also unifies the alloc restart logic to use the implementation that restarts tasks concurrently and ignores ErrTaskNotRunning errors since those are expected when restarting the allocation. --- client/allocrunner/alloc_runner.go | 78 ++++++++++++------- .../allocrunner/tasklifecycle/coordinator.go | 9 +++ client/client.go | 1 + command/agent/consul/check_watcher.go | 2 +- 4 files changed, 59 insertions(+), 31 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 33bc57a8f61..6c655f0eb7c 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -28,7 +28,6 @@ import ( cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" - agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" @@ -648,7 +647,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { break } - // Kill the rest non-sidecar or poststop tasks concurrently + // Kill the rest non-sidecar and non-poststop tasks concurrently wg := sync.WaitGroup{} for name, tr := range ar.tasks { // Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed @@ -1205,19 +1204,55 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH return nil } -// RestartTask signalls the task runner for the provided task to restart. -func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error { +// Restart satisfies the WorkloadRestarter interface and restarts all tasks +// that are currently running. Only the TaskRestartRunningSignal event type may +// be used. +func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + if event.Type != structs.TaskRestartRunningSignal { + return fmt.Errorf("Invalid event %s for alloc restart request", event.Type) + } + return ar.restartTasks(ctx, event, failure) +} + +// RestartTask restarts the provided task. Only TaskRestartSignal event type +// may be used. +func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error { + if event.Type != structs.TaskRestartSignal { + return fmt.Errorf("Invalid event %s for task restart request", event.Type) + } + tr, ok := ar.tasks[taskName] if !ok { return fmt.Errorf("Could not find task runner for task: %s", taskName) } - return tr.Restart(context.TODO(), taskEvent, false) + return tr.Restart(context.TODO(), event, false) } -// Restart satisfies the WorkloadRestarter interface restarts all task runners -// concurrently -func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { +// RestartRunning restarts all tasks that are currently running. Only the +// TaskRestartRunningSignal event type may be used. +func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error { + if event.Type != structs.TaskRestartRunningSignal { + return fmt.Errorf("Invalid event %s for running tasks restart request", event.Type) + } + return ar.restartTasks(context.TODO(), event, false) +} + +// RestartAll restarts all tasks in the allocation, including dead ones. They +// will restart following their lifecycle order. Only the TaskRestartAllSignal +// event type may be used. +func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error { + if event.Type != structs.TaskRestartAllSignal { + return fmt.Errorf("Invalid event %s for all tasks restart request", event.Type) + } + + // Restart the taskCoordinator to allow dead tasks to run again. + ar.taskCoordinator.Restart() + return ar.restartTasks(context.TODO(), event, false) +} + +// restartTasks restarts all task runners concurrently. +func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool) error { waitCh := make(chan struct{}) var err *multierror.Error var errMutex sync.Mutex @@ -1230,10 +1265,12 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa defer close(waitCh) for tn, tr := range ar.tasks { wg.Add(1) - go func(taskName string, r agentconsul.WorkloadRestarter) { + go func(taskName string, taskRunner *taskrunner.TaskRunner) { defer wg.Done() - e := r.Restart(ctx, event, failure) - if e != nil { + e := taskRunner.Restart(ctx, event.Copy(), failure) + // Ignore ErrTaskNotRunning errors since tasks that are not + // running are expected to not be restarted. + if e != nil && e != taskrunner.ErrTaskNotRunning { errMutex.Lock() defer errMutex.Unlock() err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e)) @@ -1251,25 +1288,6 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa return err.ErrorOrNil() } -// RestartAll signalls all task runners in the allocation to restart and passes -// a copy of the task event to each restart event. -// Returns any errors in a concatenated form. -func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error { - var err *multierror.Error - - // run alloc task restart hooks - ar.taskRestartHooks() - - for tn := range ar.tasks { - rerr := ar.RestartTask(tn, taskEvent.Copy()) - if rerr != nil { - err = multierror.Append(err, rerr) - } - } - - return err.ErrorOrNil() -} - // Signal sends a signal request to task runners inside an allocation. If the // taskName is empty, then it is sent to all tasks. func (ar *allocRunner) Signal(taskName, signal string) error { diff --git a/client/allocrunner/tasklifecycle/coordinator.go b/client/allocrunner/tasklifecycle/coordinator.go index 25b602e5890..e6cb78f191e 100644 --- a/client/allocrunner/tasklifecycle/coordinator.go +++ b/client/allocrunner/tasklifecycle/coordinator.go @@ -109,6 +109,15 @@ func NewCoordinator(logger hclog.Logger, tasks []*structs.Task, shutdownCh <-cha return c } +// Restart sets the Coordinator state back to "init" and is used to coordinate +// a full alloc restart. Since all tasks will run again they need to be pending +// before they are allowed to proceed. +func (c *Coordinator) Restart() { + c.currentStateLock.Lock() + defer c.currentStateLock.Unlock() + c.enterStateLocked(coordinatorStateInit) +} + // Restore is used to set the Coordinator FSM to the correct state when an // alloc is restored. Must be called before the allocrunner is running. func (c *Coordinator) Restore(states map[string]*structs.TaskState) { diff --git a/client/client.go b/client/client.go index cd4925b40bb..1b37fcaa8d2 100644 --- a/client/client.go +++ b/client/client.go @@ -160,6 +160,7 @@ type AllocRunner interface { PersistState() error RestartTask(taskName string, taskEvent *structs.TaskEvent) error + RestartRunning(taskEvent *structs.TaskEvent) error RestartAll(taskEvent *structs.TaskEvent) error Reconnect(update *structs.Allocation) error diff --git a/command/agent/consul/check_watcher.go b/command/agent/consul/check_watcher.go index ee2e18a287b..926a735d207 100644 --- a/command/agent/consul/check_watcher.go +++ b/command/agent/consul/check_watcher.go @@ -103,7 +103,7 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) // Tell TaskRunner to restart due to failure reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName) - event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason) + event := structs.NewTaskEvent(structs.TaskRestartRunningSignal).SetRestartReason(reason) go asyncRestart(ctx, c.logger, c.task, event) return true } From d704f8417c749c39670f64984240c89b0654f4ef Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 15 Aug 2022 20:56:07 -0400 Subject: [PATCH 03/14] allocrunner: allow tasks to run again Prevent the task runner Run() method from exiting to allow a dead task to run again. When the task runner is signaled to restart, the function will jump back to the MAIN loop and run it again. The task runner determines if a task needs to run again based on two new task events that were added to differentiate between a request to restart a specific task, the tasks that are currently running, or all tasks that have already run. --- .../allocrunner/tasklifecycle/coordinator.go | 4 +- client/allocrunner/taskrunner/lifecycle.go | 47 +++++++++++--- client/allocrunner/taskrunner/task_runner.go | 61 ++++++++++++++----- nomad/structs/structs.go | 25 +++++++- 4 files changed, 112 insertions(+), 25 deletions(-) diff --git a/client/allocrunner/tasklifecycle/coordinator.go b/client/allocrunner/tasklifecycle/coordinator.go index e6cb78f191e..90a8e7fc097 100644 --- a/client/allocrunner/tasklifecycle/coordinator.go +++ b/client/allocrunner/tasklifecycle/coordinator.go @@ -284,7 +284,7 @@ func (c *Coordinator) isInitDone(states map[string]*structs.TaskState) bool { // isPrestartDone returns true when the following conditions are met: // - there is at least one prestart task -// - all ephemeral prestart tasks are in the "dead" state. +// - all ephemeral prestart tasks are successful. // - no ephemeral prestart task has failed. // - all prestart sidecar tasks are running. func (c *Coordinator) isPrestartDone(states map[string]*structs.TaskState) bool { @@ -293,7 +293,7 @@ func (c *Coordinator) isPrestartDone(states map[string]*structs.TaskState) bool } for _, task := range c.tasksByLifecycle[lifecycleStagePrestartEphemeral] { - if states[task].State != structs.TaskStateDead || states[task].Failed { + if !states[task].Successful() { return false } } diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index b812156a846..beb03bd34f6 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -9,25 +9,56 @@ import ( // Restart a task. Returns immediately if no task is running. Blocks until // existing task exits or passed-in context is canceled. func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { - tr.logger.Trace("Restart requested", "failure", failure) + tr.logger.Trace("Restart requested", "failure", failure, "event", event.GoString()) - // Grab the handle - handle := tr.getDriverHandle() + // Check if the task is able to restart based on its state and the type of + // restart event that was triggered. + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } - // Check it is running - if handle == nil { + switch taskState.State { + case structs.TaskStatePending: + // Tasks that are "pending" are never allowed to restart. return ErrTaskNotRunning + case structs.TaskStateDead: + // Tasks that are "dead" are only allowed to restart when restarting + // all tasks in the alloc, otherwise the taskCoordinator will prevent + // it from running again. + if event.Type != structs.TaskRestartAllSignal { + return ErrTaskNotRunning + } } // Emit the event since it may take a long time to kill tr.EmitEvent(event) - // Run the pre-kill hooks prior to restarting the task - tr.preKill() - // Tell the restart tracker that a restart triggered the exit tr.restartTracker.SetRestartTriggered(failure) + // Signal a restart to unblock tasks that are in the "dead" state, but + // don't block since the channel is buffered. Only one signal is enough to + // notify the tr.Run() loop. + // The channel must be signaled after SetRestartTriggered is called so the + // tr.Run() loop runs again. + if taskState.State == structs.TaskStateDead { + select { + case tr.restartCh <- struct{}{}: + default: + } + } + + // Grab the handle to see if the task is still running and needs to be + // killed. + handle := tr.getDriverHandle() + if handle == nil { + return nil + } + + // Run the pre-kill hooks prior to restarting the task + tr.preKill() + // Grab a handle to the wait channel that will timeout with context cancelation // _before_ killing the task. waitCh, err := handle.WaitCh(ctx) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index ecfb2e90503..44d7f13d3a9 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -62,6 +62,11 @@ const ( // updates have come in since the last one was handled, we only need to // handle the last one. triggerUpdateChCap = 1 + + // restartChCap is the capacity for the restartCh used for triggering task + // restarts. It should be exactly 1 as even if multiple restarts have come + // we only need to handle the last one. + restartChCap = 1 ) type TaskRunner struct { @@ -95,6 +100,9 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB + // restartCh is used to signal that the task should restart. + restartCh chan struct{} + // shutdownCtx is used to exit the TaskRunner *without* affecting task state. shutdownCtx context.Context @@ -367,6 +375,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { shutdownCtx: trCtx, shutdownCtxCancel: trCancel, triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + restartCh: make(chan struct{}, restartChCap), waitCh: make(chan struct{}), csiManager: config.CSIManager, cpusetCgroupPathGetter: config.CpusetCgroupPathGetter, @@ -505,12 +514,14 @@ func (tr *TaskRunner) Run() { var result *drivers.ExitResult tr.stateLock.RLock() - dead := tr.state.State == structs.TaskStateDead + restoredDead := tr.state.State == structs.TaskStateDead tr.stateLock.RUnlock() - // if restoring a dead task, ensure that task is cleared and all post hooks - // are called without additional state updates - if dead { + // If restoring a dead task, ensure that task is cleared and all post hooks + // are called without additional state updates. + // If the alloc is not terminal we must proceed until the ALLOC_RESTART + // loop to allow the task to run again in case the alloc is restarted. + if restoredDead && tr.Alloc().TerminalStatus() { // do cleanup functions without emitting any additional events/work // to handle cases where we restored a dead task where client terminated // after task finished before completing post-run actions. @@ -544,27 +555,27 @@ func (tr *TaskRunner) Run() { // Set the initial task state. tr.stateUpdater.TaskStateUpdated() - select { - case <-tr.startConditionMetCh: - tr.logger.Debug("lifecycle start condition has been met, proceeding") - // yay proceed - case <-tr.killCtx.Done(): - case <-tr.shutdownCtx.Done(): - return - } - timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT defer stop() MAIN: for !tr.shouldShutdown() { + if restoredDead { + // Break early when restoring a dead task and reset the flag so the + // loop runs again if the task is restarted. + restoredDead = false + break + } + select { case <-tr.killCtx.Done(): break MAIN case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return - default: + case <-tr.startConditionMetCh: + tr.logger.Debug("lifecycle start condition has been met, proceeding") + // yay proceed } // Run the prestart hooks @@ -674,6 +685,28 @@ MAIN: // Mark the task as dead tr.UpdateState(structs.TaskStateDead, nil) + // Wait here in case the allocation is restarted. Poststop tasks will never + // run again so, skip them to avoid blocking forever. + if !tr.Task().IsPoststop() { + ALLOC_RESTART: + // Run in a loop to handle cases where restartCh is triggered but the + // task runner doesn't need to restart. + for { + select { + case <-tr.killCtx.Done(): + break ALLOC_RESTART + case <-tr.shutdownCtx.Done(): + return + case <-tr.restartCh: + // Restart without delay since the task is not running anymore. + restart, _ := tr.shouldRestart() + if restart { + goto MAIN + } + } + } + } + // Run the stop hooks if err := tr.stop(); err != nil { tr.logger.Error("stop failed", "error", err) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e23b6cef9ea..2f58b342e43 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8017,10 +8017,18 @@ const ( // restarted because it has exceeded its restart policy. TaskNotRestarting = "Not Restarting" - // TaskRestartSignal indicates that the task has been signalled to be + // TaskRestartSignal indicates that the task has been signaled to be // restarted TaskRestartSignal = "Restart Signaled" + // TaskRestartRunningSignal indicates that all tasks in the allocation that + // are currently running have been signaled to be restarted. + TaskRestartRunningSignal = "Restart Running Signaled" + + // TaskRestartAllSignal indicates that all tasks in the allocation have + // been signaled to be restarted, even the ones that have already run. + TaskRestartAllSignal = "Restart All Signaled" + // TaskSignaling indicates that the task is being signalled. TaskSignaling = "Signaling" @@ -8278,6 +8286,18 @@ func (e *TaskEvent) PopulateEventDisplayMessage() { } else { desc = "Task signaled to restart" } + case TaskRestartRunningSignal: + if e.RestartReason != "" { + desc = e.RestartReason + } else { + desc = "Running tasks signaled to restart" + } + case TaskRestartAllSignal: + if e.RestartReason != "" { + desc = e.RestartReason + } else { + desc = "All tasks signaled to restart" + } case TaskDriverMessage: desc = e.DriverMessage case TaskLeaderDead: @@ -8294,6 +8314,9 @@ func (e *TaskEvent) PopulateEventDisplayMessage() { } func (e *TaskEvent) GoString() string { + if e == nil { + return "" + } return fmt.Sprintf("%v - %v", e.Time, e.Type) } From be625293c942de6c75da99d4e7dde003c0bb0381 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 15 Aug 2022 22:07:26 -0400 Subject: [PATCH 04/14] api/cli: add support for all tasks alloc restart Implement the new -all-tasks alloc restart CLI flag and its API counterpar, AllTasks. The client endpoint calls the appropriate restart method from the allocrunner depending on the restart parameters used. --- api/allocations.go | 11 +++++ client/alloc_endpoint.go | 2 +- client/client.go | 21 +++++++-- command/agent/alloc_endpoint.go | 4 ++ command/alloc_restart.go | 46 ++++++++++++++----- nomad/structs/structs.go | 1 + website/content/api-docs/allocations.mdx | 7 +++ .../content/docs/commands/alloc/restart.mdx | 16 +++++-- 8 files changed, 86 insertions(+), 22 deletions(-) diff --git a/api/allocations.go b/api/allocations.go index f1fce2c6d23..741ee1fde39 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -156,6 +156,16 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption return err } +func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error { + req := AllocationRestartRequest{ + AllTasks: true, + } + + var resp struct{} + _, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q) + return err +} + // Stop stops an allocation. // // Note: for cluster topologies where API consumers don't have network access to @@ -447,6 +457,7 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) { type AllocationRestartRequest struct { TaskName string + AllTasks bool } type AllocSignalRequest struct { diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 2be7dfb41bb..52ab8f414e6 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -102,7 +102,7 @@ func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstruct return nstructs.ErrPermissionDenied } - return a.c.RestartAllocation(args.AllocID, args.TaskName) + return a.c.RestartAllocation(args.AllocID, args.TaskName, args.AllTasks) } // Stats is used to collect allocation statistics diff --git a/client/client.go b/client/client.go index 1b37fcaa8d2..4932c858ed3 100644 --- a/client/client.go +++ b/client/client.go @@ -926,20 +926,31 @@ func (c *Client) CollectAllAllocs() { c.garbageCollector.CollectAll() } -func (c *Client) RestartAllocation(allocID, taskName string) error { +func (c *Client) RestartAllocation(allocID, taskName string, allTasks bool) error { + if allTasks && taskName != "" { + return fmt.Errorf("task name cannot be set when restarting all tasks") + } + ar, err := c.getAllocRunner(allocID) if err != nil { return err } - event := structs.NewTaskEvent(structs.TaskRestartSignal). - SetRestartReason("User requested restart") - if taskName != "" { + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested task to restart") return ar.RestartTask(taskName, event) } - return ar.RestartAll(event) + if allTasks { + event := structs.NewTaskEvent(structs.TaskRestartAllSignal). + SetRestartReason("User requested all tasks to restart") + return ar.RestartAll(event) + } + + event := structs.NewTaskEvent(structs.TaskRestartRunningSignal). + SetRestartReason("User requested running tasks to restart") + return ar.RestartRunning(event) } // Node returns the locally registered node diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index e7e1e1d092b..c043fe0240e 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -283,6 +283,7 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req // Explicitly parse the body separately to disallow overriding AllocID in req Body. var reqBody struct { TaskName string + AllTasks bool } err := json.NewDecoder(req.Body).Decode(&reqBody) if err != nil && err != io.EOF { @@ -291,6 +292,9 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req if reqBody.TaskName != "" { args.TaskName = reqBody.TaskName } + if reqBody.AllTasks { + args.AllTasks = reqBody.AllTasks + } // Determine the handler to use useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID) diff --git a/command/alloc_restart.go b/command/alloc_restart.go index 9c5dcb3be9c..909e5666ea7 100644 --- a/command/alloc_restart.go +++ b/command/alloc_restart.go @@ -18,8 +18,11 @@ func (c *AllocRestartCommand) Help() string { Usage: nomad alloc restart [options] Restart an existing allocation. This command is used to restart a specific alloc - and its tasks. If no task is provided then all of the allocation's tasks will - be restarted. + and its tasks. If no task is provided then all of the allocation's tasks that + are currently running will be restarted. + + Use the option '-all-tasks' to restart tasks that have already run, such as + non-sidecar prestart and poststart tasks. When ACLs are enabled, this command requires a token with the 'alloc-lifecycle', 'read-job', and 'list-jobs' capabilities for the @@ -31,9 +34,14 @@ General Options: Restart Specific Options: + -all-tasks + If set, all tasks in the allocation will be restarted, even the ones that + already ran. This option cannot be used if a task is defined. + -task - Specify the individual task to restart. If task name is given with both an + Specify the individual task to restart. If task name is given with both an argument and the '-task' option, preference is given to the '-task' option. + This option cannot be used with '-all-tasks'. -verbose Show full information. @@ -44,11 +52,12 @@ Restart Specific Options: func (c *AllocRestartCommand) Name() string { return "alloc restart" } func (c *AllocRestartCommand) Run(args []string) int { - var verbose bool + var allTasks, verbose bool var task string flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&allTasks, "all-tasks", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.StringVar(&task, "task", "", "") @@ -66,6 +75,17 @@ func (c *AllocRestartCommand) Run(args []string) int { allocID := args[0] + // If -task isn't provided fallback to reading the task name + // from args. + if task == "" && len(args) >= 2 { + task = args[1] + } + + if allTasks && task != "" { + c.Ui.Error("The -all-tasks option is not allowed when restarting a specific task.") + return 1 + } + // Truncate the id unless full length is requested length := shortId if verbose { @@ -113,12 +133,6 @@ func (c *AllocRestartCommand) Run(args []string) int { return 1 } - // If -task isn't provided fallback to reading the task name - // from args. - if task == "" && len(args) >= 2 { - task = args[1] - } - if task != "" { err := validateTaskExistsInAllocation(task, alloc) if err != nil { @@ -127,9 +141,17 @@ func (c *AllocRestartCommand) Run(args []string) int { } } - err = client.Allocations().Restart(alloc, task, nil) + if allTasks { + err = client.Allocations().RestartAllTasks(alloc, nil) + } else { + err = client.Allocations().Restart(alloc, task, nil) + } if err != nil { - c.Ui.Error(fmt.Sprintf("Failed to restart allocation:\n\n%s", err.Error())) + target := "allocation" + if task != "" { + target = "task" + } + c.Ui.Error(fmt.Sprintf("Failed to restart %s:\n\n%s", target, err.Error())) return 1 } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 2f58b342e43..e61e0134073 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1028,6 +1028,7 @@ type AllocsGetRequest struct { type AllocRestartRequest struct { AllocID string TaskName string + AllTasks bool QueryOptions } diff --git a/website/content/api-docs/allocations.mdx b/website/content/api-docs/allocations.mdx index 2afe3322f22..128dbf6609a 100644 --- a/website/content/api-docs/allocations.mdx +++ b/website/content/api-docs/allocations.mdx @@ -731,6 +731,13 @@ The table below shows this endpoint's support for must be the full UUID, not the short 8-character one. This is specified as part of the path. +- `TaskName` `(string: "")` - Specifies the individual task to restart. Cannot + be used with `AllTasks` set to `true`. + +- `AllTasks` `(bool: false)` - If set to `true` all tasks in the allocation + will be restarted, even the ones that already ran. Cannot be set to `true` if + `TaskName` is defined. + ### Sample Payload ```json diff --git a/website/content/docs/commands/alloc/restart.mdx b/website/content/docs/commands/alloc/restart.mdx index a07820567c9..ece6fde933f 100644 --- a/website/content/docs/commands/alloc/restart.mdx +++ b/website/content/docs/commands/alloc/restart.mdx @@ -18,12 +18,16 @@ nomad alloc restart [options] This command accepts a single allocation ID and a task name. The task name must be part of the allocation and the task must be currently running. The task name -is optional and if omitted every task in the allocation will be restarted. +is optional and if omitted all tasks that are currently running will be +restarted. -Task name may also be specified using the `-task` option rather than a command -argument. If task name is given with both an argument and the `-task` option, +Task name may also be specified using the `-task` option rather than a command +argument. If task name is given with both an argument and the `-task` option, preference is given to the `-task` option. +Use the option `-all-tasks` to restart tasks that have already run, such as +non-sidecar prestart and poststart tasks. + When ACLs are enabled, this command requires a token with the `alloc-lifecycle`, `read-job`, and `list-jobs` capabilities for the allocation's namespace. @@ -34,7 +38,11 @@ allocation's namespace. ## Restart Options -- `-task`: Specify the individual task to restart. +- `-all-tasks`: If set, all tasks in the allocation will be restarted, even the + ones that already ran. This option cannot be used if a task is defined. + +- `-task`: Specify the individual task to restart. This option cannot be used + with `-all-tasks`. - `-verbose`: Display verbose output. From e9a4035c6e61ee61468f9923f4f1d0146fe3ba3e Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 16 Aug 2022 13:41:50 -0400 Subject: [PATCH 05/14] test: fix tasklifecycle Coordinator test --- .../allocrunner/tasklifecycle/coordinator_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/client/allocrunner/tasklifecycle/coordinator_test.go b/client/allocrunner/tasklifecycle/coordinator_test.go index 17e42e96e41..3f86dcc99f3 100644 --- a/client/allocrunner/tasklifecycle/coordinator_test.go +++ b/client/allocrunner/tasklifecycle/coordinator_test.go @@ -58,6 +58,9 @@ func TestCoordinator_PrestartRunsBeforeMain(t *testing.T) { sideTask := tasks[1] initTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, initTask} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -161,6 +164,9 @@ func TestCoordinator_MainRunsAfterManyInitTasks(t *testing.T) { init1Task := tasks[1] init2Task := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, init1Task, init2Task} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -227,6 +233,9 @@ func TestCoordinator_FailedInitTask(t *testing.T) { init1Task := tasks[1] init2Task := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, init1Task, init2Task} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -292,6 +301,9 @@ func TestCoordinator_SidecarNeverStarts(t *testing.T) { sideTask := tasks[1] initTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, initTask} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -356,6 +368,9 @@ func TestCoordinator_PoststartStartsAfterMain(t *testing.T) { sideTask := tasks[1] postTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, postTask} + // Make the the third task is a poststart hook postTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart From 34be19707fd03382e3ae3c929a72ec9e4275004a Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 16 Aug 2022 14:12:41 -0400 Subject: [PATCH 06/14] allocrunner: kill taskrunners if all tasks are dead When all non-poststop tasks are dead we need to kill the taskrunners so we don't leak their goroutines, which are blocked in the alloc restart loop. This also ensures the allocrunner exits on its own. --- client/allocrunner/alloc_runner.go | 77 ++++++++++++++-------- client/allocrunner/taskrunner/lifecycle.go | 7 +- 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 6c655f0eb7c..96ea74524e4 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -546,40 +546,63 @@ func (ar *allocRunner) handleTaskStateUpdates() { } } - // if all live runners are sidecars - kill alloc - if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) { - killEvent = structs.NewTaskEvent(structs.TaskMainDead) - } - - // If there's a kill event set and live runners, kill them - if killEvent != nil && len(liveRunners) > 0 { - - // Log kill reason - switch killEvent.Type { - case structs.TaskLeaderDead: - ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) - case structs.TaskMainDead: - ar.logger.Debug("main tasks dead, destroying all sidecar tasks") - default: - ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + if len(liveRunners) > 0 { + // if all live runners are sidecars - kill alloc + if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) { + killEvent = structs.NewTaskEvent(structs.TaskMainDead) } - // Emit kill event for live runners - for _, tr := range liveRunners { - tr.EmitEvent(killEvent) - } + // If there's a kill event set and live runners, kill them + if killEvent != nil { + + // Log kill reason + switch killEvent.Type { + case structs.TaskLeaderDead: + ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) + case structs.TaskMainDead: + ar.logger.Debug("main tasks dead, destroying all sidecar tasks") + default: + ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + } + + // Emit kill event for live runners + for _, tr := range liveRunners { + tr.EmitEvent(killEvent) + } - // Kill 'em all - states = ar.killTasks() + // Kill 'em all + states = ar.killTasks() + + // Wait for TaskRunners to exit before continuing to + // prevent looping before TaskRunners have transitioned + // to Dead. + for _, tr := range liveRunners { + ar.logger.Info("killing task", "task", tr.Task().Name) + select { + case <-tr.WaitCh(): + case <-ar.waitCh: + } + } + } + } else { + // If there are no live runners left kill all non-poststop task + // runners to unblock them from the alloc restart loop. + for _, tr := range ar.tasks { + if tr.IsPoststopTask() { + continue + } - // Wait for TaskRunners to exit before continuing to - // prevent looping before TaskRunners have transitioned - // to Dead. - for _, tr := range liveRunners { - ar.logger.Info("killing task", "task", tr.Task().Name) select { case <-tr.WaitCh(): case <-ar.waitCh: + default: + // Kill task runner without setting an event because the + // task is already dead, it's just waiting in the alloc + // restart loop. + err := tr.Kill(context.TODO(), nil) + if err != nil { + ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err) + } } } } diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index beb03bd34f6..66577d6035b 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -100,14 +100,17 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { // Kill a task. Blocks until task exits or context is canceled. State is set to // dead. func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { - tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason) + tr.logger.Trace("Kill requested") // Cancel the task runner to break out of restart delay or the main run // loop. tr.killCtxCancel() // Emit kill event - tr.EmitEvent(event) + if event != nil { + tr.logger.Trace("Kill event", "event_type", event.Type, "event_reason", event.KillReason) + tr.EmitEvent(event) + } select { case <-tr.WaitCh(): From 275819a04c9289d3cdd38cb3af5e72c5f399eb4d Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 16 Aug 2022 21:38:51 -0400 Subject: [PATCH 07/14] taskrunner: fix tests that waited on WaitCh Now that "dead" tasks may run again, the taskrunner Run() method will not return when the task finishes running, so tests must wait for the task state to be "dead" instead of using the WaitCh, since it won't be closed until the taskrunner is killed. --- client/allocrunner/alloc_runner_test.go | 12 +- .../allocrunner/taskrunner/sids_hook_test.go | 7 +- .../taskrunner/task_runner_test.go | 138 ++++++++---------- 3 files changed, 70 insertions(+), 87 deletions(-) diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 0494c938a7d..637c58f0e20 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1213,7 +1213,11 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar.Run() defer destroy(ar) - require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus) + testutil.WaitForResult(func() (bool, error) { + return ar.AllocState().ClientStatus == structs.AllocClientStatusComplete, nil + }, func(_ error) { + t.Fatalf("expected alloc to be complete") + }) // Step 2. Modify its directory task := alloc.Job.TaskGroups[0].Tasks[0] @@ -1241,7 +1245,11 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar2.Run() defer destroy(ar2) - require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus) + testutil.WaitForResult(func() (bool, error) { + return ar.AllocState().ClientStatus == structs.AllocClientStatusComplete, nil + }, func(_ error) { + t.Fatalf("expected alloc to be complete") + }) // Ensure that data from ar was moved to ar2 dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") diff --git a/client/allocrunner/taskrunner/sids_hook_test.go b/client/allocrunner/taskrunner/sids_hook_test.go index d5951ed017e..c8f7657417c 100644 --- a/client/allocrunner/taskrunner/sids_hook_test.go +++ b/client/allocrunner/taskrunner/sids_hook_test.go @@ -22,7 +22,6 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" ) @@ -297,11 +296,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) { go tr.Run() // wait for task runner to finish running - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - r.Fail("timed out waiting for task runner") - } + testWaitForTaskToDie(t, tr) // assert task exited un-successfully finalState := tr.TaskState() diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 85de7b2eb99..235f1c7ee6e 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -335,7 +335,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) { defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for new task runner to exit when the process does - <-newTR.WaitCh() + testWaitForTaskToDie(t, newTR) // Assert that the process was only started once started := 0 @@ -603,11 +603,7 @@ func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { defer cleanup() // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - require.Fail("timeout waiting for task to exit") - } + testWaitForTaskToDie(t, tr) // Get the mock driver plugin driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) @@ -654,7 +650,9 @@ func TestTaskRunner_TaskEnv_Chroot(t *testing.T) { go tr.Run() defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Wait for task to exit + // Wait for task to exit and kill the task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) timeout := 15 * time.Second if testutil.IsCI() { timeout = 120 * time.Second @@ -703,7 +701,9 @@ func TestTaskRunner_TaskEnv_Image(t *testing.T) { tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - // Wait for task to exit + // Wait for task to exit and kill task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(15 * time.Second): @@ -750,7 +750,9 @@ func TestTaskRunner_TaskEnv_None(t *testing.T) { %s `, root, taskDir, taskDir, os.Getenv("PATH")) - // Wait for task to exit + // Wait for task to exit and kill the task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(15 * time.Second): @@ -818,10 +820,7 @@ func TestTaskRunner_DevicePropogation(t *testing.T) { defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - } + testWaitForTaskToDie(t, tr) // Get the mock driver plugin driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) @@ -1306,15 +1305,15 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { "Received", "Task Setup", "Started", - "Restart Signaled", + "Restart Running Signaled", "Terminated", "Restarting", "Started", - "Restart Signaled", + "Restart Running Signaled", "Terminated", "Restarting", "Started", - "Restart Signaled", + "Restart Running Signaled", "Terminated", "Not Restarting", } @@ -1328,11 +1327,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { // Wait until the task exits. Don't simply wait for it to run as it may // get restarted and terminated before the test is able to observe it // running. - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timeout") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() actualEvents := make([]string, len(state.Events)) @@ -1421,11 +1416,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) { // task runner should exit now that it has been unblocked and it is a batch // job with a zero sleep time - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - r.Fail("timed out waiting for batch task to exist") - } + testWaitForTaskToDie(t, tr) // assert task exited successfully finalState := tr.TaskState() @@ -1478,11 +1469,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) { go tr.Run() // assert task runner blocks on SI token - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - r.Fail("timed out waiting for task runner") - } + testWaitForTaskToDie(t, tr) // assert task exited successfully finalState := tr.TaskState() @@ -1598,11 +1585,7 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) { // TR should exit now that it's unblocked by vault as its a batch job // with 0 sleeping. - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - require.Fail(t, "timed out waiting for batch task to exit") - } + testWaitForTaskToDie(t, tr) // Assert task exited successfully finalState := tr.TaskState() @@ -1615,6 +1598,14 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) { require.NoError(t, err) require.Equal(t, token, string(data)) + // Kill task runner to trigger stop hooks + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timed out waiting for task runner to exit") + } + // Check the token was revoked testutil.WaitForResult(func() (bool, error) { if len(vaultClient.StoppedTokens()) != 1 { @@ -1661,17 +1652,21 @@ func TestTaskRunner_DeriveToken_Retry(t *testing.T) { defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) go tr.Run() - // Wait for TR to exit and check its state + // Wait for TR to die and check its state + testWaitForTaskToDie(t, tr) + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.False(t, state.Failed) + + // Kill task runner to trigger stop hooks + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): require.Fail(t, "timed out waiting for task runner to exit") } - state := tr.TaskState() - require.Equal(t, structs.TaskStateDead, state.State) - require.False(t, state.Failed) - require.Equal(t, 1, count) // Check that the token is on disk @@ -1771,11 +1766,7 @@ func TestTaskRunner_Download_ChrootExec(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1816,11 +1807,7 @@ func TestTaskRunner_Download_RawExec(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1851,11 +1838,7 @@ func TestTaskRunner_Download_List(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1902,11 +1885,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) { tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2100,6 +2079,8 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { case <-time.After(1 * time.Second): } + require.Equal(t, structs.TaskStatePending, tr.TaskState().State) + // Send a signal and restart err = tr.Signal(structs.NewTaskEvent("don't panic"), "QUIT") require.EqualError(t, err, ErrTaskNotRunning.Error()) @@ -2110,12 +2091,7 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { // Unblock and let it finish waitCh <- struct{}{} - - select { - case <-tr.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "timed out waiting for task to complete") - } + testWaitForTaskToDie(t, tr) // Assert the task ran and never restarted state := tr.TaskState() @@ -2153,11 +2129,7 @@ func TestTaskRunner_Run_RecoverableStartError(t *testing.T) { tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2202,11 +2174,7 @@ func TestTaskRunner_Template_Artifact(t *testing.T) { go tr.Run() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2536,7 +2504,9 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { tr, err := NewTaskRunner(conf) require.NoError(t, err) defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - tr.Run() + go tr.Run() + + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2562,7 +2532,17 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { ts := tr.TaskState() - return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State) + return ts.State == structs.TaskStateRunning, fmt.Errorf("expected task to be runnig, got %v", ts.State) + }, func(err error) { + require.NoError(t, err) + }) +} + +// testWaitForTaskToDie waits for the task to die or fails the test +func testWaitForTaskToDie(t *testing.T, tr *TaskRunner) { + testutil.WaitForResult(func() (bool, error) { + ts := tr.TaskState() + return ts.State == structs.TaskStateDead, fmt.Errorf("expected task to be dead, got %v", ts.State) }, func(err error) { require.NoError(t, err) }) From 0781b7569674023809319a9ecaabf54ef605f751 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 17 Aug 2022 19:40:28 -0400 Subject: [PATCH 08/14] tests: add tests for all tasks alloc restart --- client/alloc_endpoint_test.go | 39 ++ client/allocrunner/alloc_runner_test.go | 463 ++++++++++++++++++ client/allocrunner/alloc_runner_unix_test.go | 17 +- .../taskrunner/task_runner_test.go | 54 ++ nomad/mock/mock.go | 44 ++ 5 files changed, 609 insertions(+), 8 deletions(-) diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go index c8f2560d02b..f3c6e3e2bfd 100644 --- a/client/alloc_endpoint_test.go +++ b/client/alloc_endpoint_test.go @@ -68,6 +68,45 @@ func TestAllocations_Restart(t *testing.T) { }) } +func TestAllocations_RestartAllTasks(t *testing.T) { + ci.Parallel(t) + + require := require.New(t) + client, cleanup := TestClient(t, nil) + defer cleanup() + + alloc := mock.LifecycleAlloc() + require.Nil(client.addAlloc(alloc, "")) + + // Can't restart all tasks while specifying a task name. + req := &nstructs.AllocRestartRequest{ + AllocID: alloc.ID, + AllTasks: true, + TaskName: "web", + } + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.Error(err) + + // Good request. + req = &nstructs.AllocRestartRequest{ + AllocID: alloc.ID, + AllTasks: true, + } + + testutil.WaitForResult(func() (bool, error) { + var resp2 nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp2) + if err != nil && strings.Contains(err.Error(), "not running") { + return false, err + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestAllocations_Restart_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t) diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 637c58f0e20..68576539326 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/consul/api" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allochealth" "github.com/hashicorp/nomad/client/allocrunner/tasklifecycle" @@ -483,6 +484,468 @@ func TestAllocRunner_Lifecycle_Poststop(t *testing.T) { } +func TestAllocRunner_Lifecycle_Restart(t *testing.T) { + ci.Parallel(t) + + // test cases can use this default or override w/ taskDefs param + alloc := mock.LifecycleAllocFromTasks([]mock.LifecycleTaskDef{ + {"main", "100s", 0, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }) + alloc.Job.Type = structs.JobTypeService + rp := &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 1 * time.Nanosecond, + Mode: structs.RestartPolicyModeFail, + } + + testCases := []struct { + name string + taskDefs []mock.LifecycleTaskDef + isBatch bool + hasLeader bool + action func(*allocRunner, *structs.Allocation) error + expectedErr string + expectedAfter map[string]structs.TaskState + }{ + { + name: "restart entire allocation", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart only running tasks", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + ev := &structs.TaskEvent{Type: structs.TaskRestartRunningSignal} + return ar.RestartRunning(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "batch job restart entire allocation", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "100s", 1, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + isBatch: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "batch job restart only running tasks ", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "100s", 1, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + isBatch: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + ev := &structs.TaskEvent{Type: structs.TaskRestartRunningSignal} + return ar.RestartRunning(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart entire allocation with leader", + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "stop from server", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + stopAlloc := alloc.Copy() + stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(stopAlloc) + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "restart main task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + return ar.RestartTask("main", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart leader main task", + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("main", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "main task fails and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "2s", 1, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "leader main task fails and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "2s", 1, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "main stopped unexpectedly and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "2s", 0, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "leader main stopped unexpectedly and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "2s", 0, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "failed main task cannot be restarted", + taskDefs: []mock.LifecycleTaskDef{ + {"main", "2s", 1, "", false}, + {"prestart-oneshot", "1s", 0, "prestart", false}, + {"prestart-sidecar", "100s", 0, "prestart", true}, + {"poststart-oneshot", "1s", 0, "poststart", false}, + {"poststart-sidecar", "100s", 0, "poststart", true}, + {"poststop", "1s", 0, "poststop", false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + // make sure main task has had a chance to restart once on its + // own and fail again before we try to manually restart it + time.Sleep(5 * time.Second) + return ar.RestartTask("main", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + }, + expectedErr: "Task not running", + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "restart prestart-sidecar task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("prestart-sidecar", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart poststart-sidecar task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("poststart-sidecar", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ci.Parallel(t) + + alloc := alloc.Copy() + alloc.Job.TaskGroups[0].RestartPolicy = rp + if tc.taskDefs != nil { + alloc = mock.LifecycleAllocFromTasks(tc.taskDefs) + alloc.Job.Type = structs.JobTypeService + } + for _, task := range alloc.Job.TaskGroups[0].Tasks { + task.RestartPolicy = rp // tasks inherit the group policy + } + if tc.hasLeader { + for _, task := range alloc.Job.TaskGroups[0].Tasks { + if task.Name == "main" { + task.Leader = true + } + } + } + if tc.isBatch { + alloc.Job.Type = structs.JobTypeBatch + } + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + go ar.Run() + + upd := conf.StateUpdater.(*MockStateUpdater) + + // assert our "before" states: + // - all one-shot tasks should be dead but not failed + // - all main tasks and sidecars should be running + // - no tasks should have restarted + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("no update") + } + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf( + "expected alloc to be running not %s", last.ClientStatus) + } + var errs *multierror.Error + + expectedBefore := map[string]string{ + "main": "running", + "prestart-oneshot": "dead", + "prestart-sidecar": "running", + "poststart-oneshot": "dead", + "poststart-sidecar": "running", + "poststop": "pending", + } + + for task, expected := range expectedBefore { + got, ok := last.TaskStates[task] + if !ok { + continue + } + if got.State != expected { + errs = multierror.Append(errs, fmt.Errorf( + "expected initial state of task %q to be %q not %q", + task, expected, got.State)) + } + if got.Restarts != 0 { + errs = multierror.Append(errs, fmt.Errorf( + "expected no initial restarts of task %q, not %q", + task, got.Restarts)) + } + if expected == "dead" && got.Failed { + errs = multierror.Append(errs, fmt.Errorf( + "expected ephemeral task %q to be dead but not failed", + task)) + } + + } + if errs.ErrorOrNil() != nil { + return false, errs.ErrorOrNil() + } + return true, nil + }, func(err error) { + require.NoError(t, err, "error waiting for initial state") + }) + + // perform the action + err = tc.action(ar, alloc.Copy()) + if tc.expectedErr != "" { + require.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + } + + // assert our "after" states + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("no update") + } + var errs *multierror.Error + for task, expected := range tc.expectedAfter { + got, ok := last.TaskStates[task] + if !ok { + errs = multierror.Append(errs, fmt.Errorf( + "no final state found for task %q", task, + )) + } + if got.State != expected.State { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to be %q not %q", + task, expected.State, got.State)) + } + if expected.State == "dead" { + if got.FinishedAt.IsZero() || got.StartedAt.IsZero() { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to have start and finish time", task)) + } + if len(got.Events) < 2 { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to include at least 2 tasks", task)) + } + } + + if got.Restarts != expected.Restarts { + errs = multierror.Append(errs, fmt.Errorf( + "expected final restarts of task %q to be %v not %v", + task, expected.Restarts, got.Restarts)) + } + } + if errs.ErrorOrNil() != nil { + return false, errs.ErrorOrNil() + } + return true, nil + }, func(err error) { + require.NoError(t, err, "error waiting for final state") + }) + }) + } +} + func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { ci.Parallel(t) diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index ab3c777dd20..7afb6faeaa5 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -154,9 +154,10 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { alloc := mock.Alloc() alloc.Job.Type = structs.JobTypeBatch task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" + task.Driver = "raw_exec" task.Config = map[string]interface{}{ - "run_for": "2ms", + "command": "sleep", + "args": []string{"2"}, } conf, cleanup := testAllocRunnerConfig(t, alloc.Copy()) @@ -207,18 +208,18 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { go ar2.Run() defer destroy(ar2) - // AR waitCh must be closed even when task doesn't run again + // AR waitCh must be open as the task waits for a possible alloc restart. select { case <-ar2.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "alloc.waitCh wasn't closed") + require.Fail(t, "alloc.waitCh was closed") + default: } - // TR waitCh must be closed too! + // TR waitCh must be open too! select { case <-ar2.tasks[task.Name].WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "tr.waitCh wasn't closed") + require.Fail(t, "tr.waitCh was closed") + default: } // Assert that events are unmodified, which they would if task re-run diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 235f1c7ee6e..18cacaf418d 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -349,6 +349,60 @@ func TestTaskRunner_Restore_Running(t *testing.T) { assert.Equal(t, 1, started) } +// TestTaskRunner_Restore_Dead asserts restoring a dead task will cause it to +// block in alloc restart loop and it will be able to restart on request. +func TestTaskRunner_Restore_Dead(t *testing.T) { + ci.Parallel(t) + + alloc := mock.BatchAlloc() + alloc.Job.TaskGroups[0].Count = 1 + task := alloc.Job.TaskGroups[0].Tasks[0] + // use raw_exec because mock_driver restore doesn't behave as expected. + task.Driver = "raw_exec" + task.Config = map[string]interface{}{ + "command": "sleep", + "args": []string{"2"}, + } + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners + defer cleanup() + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(t, err) + go origTR.Run() + defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for it to be dead + testWaitForTaskToDie(t, origTR) + + // Cause TR to exit without shutting down task + origTR.Shutdown() + + // Start a new TaskRunner and make sure it does not rerun the task + newTR, err := NewTaskRunner(conf) + require.NoError(t, err) + + // Do the Restore + require.NoError(t, newTR.Restore()) + + go newTR.Run() + defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Verify that the TaskRunner is still active + select { + case <-newTR.WaitCh(): + require.Fail(t, "WaitCh is not blocking") + default: + } + + // Verify that we can restart task + ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} + err = newTR.Restart(context.Background(), ev, false) + require.NoError(t, err) + testWaitForTaskToStart(t, newTR) +} + // setupRestoreFailureTest starts a service, shuts down the task runner, and // kills the task before restarting a new TaskRunner. The new TaskRunner is // returned once it is running and waiting in pending along with a cleanup diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 63c7132c5fd..d5df8d285fe 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -700,6 +700,50 @@ func LifecycleAlloc() *structs.Allocation { return alloc } +type LifecycleTaskDef struct { + Name string + RunFor string + ExitCode int + Hook string + IsSidecar bool +} + +// LifecycleAllocFromTasks generates an Allocation with mock tasks that have +// the provided lifecycles. +func LifecycleAllocFromTasks(tasks []LifecycleTaskDef) *structs.Allocation { + alloc := LifecycleAlloc() + alloc.Job.TaskGroups[0].Tasks = []*structs.Task{} + for _, task := range tasks { + var lc *structs.TaskLifecycleConfig + if task.Hook != "" { + // TODO: task coordinator doesn't treat nil and empty structs the same + lc = &structs.TaskLifecycleConfig{ + Hook: task.Hook, + Sidecar: task.IsSidecar, + } + } + + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, + &structs.Task{ + Name: task.Name, + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": task.RunFor, + "exit_code": task.ExitCode}, + Lifecycle: lc, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{CPU: 100, MemoryMB: 256}, + }, + ) + alloc.TaskResources[task.Name] = &structs.Resources{CPU: 100, MemoryMB: 256} + alloc.AllocatedResources.Tasks[task.Name] = &structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{CpuShares: 100}, + Memory: structs.AllocatedMemoryResources{MemoryMB: 256}, + } + } + return alloc +} + func LifecycleJobWithPoststopDeploy() *structs.Job { job := &structs.Job{ Region: "global", From 4400c3730865fc8cc216bc58bbd8b6950cab8ee0 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 17 Aug 2022 20:55:43 -0400 Subject: [PATCH 09/14] changelog: add entry for #14127 --- .changelog/14127.txt | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changelog/14127.txt diff --git a/.changelog/14127.txt b/.changelog/14127.txt new file mode 100644 index 00000000000..dc4b042f218 --- /dev/null +++ b/.changelog/14127.txt @@ -0,0 +1,7 @@ +```release-note:improvement +client: add option to restart all tasks of an allocation, even if the task already run, such as non-sidecar prestart and poststart tasks. +``` + +```release-note:improvement +client: only start poststop tasks after poststart tasks are done. +``` From b97b1227aa03f026b2e41b3b0f2680a97bd42fd7 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 22 Aug 2022 11:25:35 -0400 Subject: [PATCH 10/14] taskrunner: fix restore logic. The first implementation of the task runner restore process relied on server data (`tr.Alloc().TerminalStatus()`) which may not be available to the client at the time of restore. It also had the incorrect code path. When restoring a dead task the driver handle always needs to be clear cleanly using `clearDriverHandle` otherwise, after exiting the MAIN loop, the task may be killed by `tr.handleKill`. The fix is to store the state of the Run() loop in the task runner local client state: if the task runner ever exits this loop cleanly (not with a shutdown) it will never be able to run again. So if the Run() loops starts with this local state flag set, it must exit early. This local state flag is also being checked on task restart requests. If the task is "dead" and its Run() loop is not active it will never be able to run again. --- client/allocrunner/alloc_runner_unix_test.go | 5 +- client/allocrunner/taskrunner/lifecycle.go | 11 +++- client/allocrunner/taskrunner/state/state.go | 6 ++ client/allocrunner/taskrunner/task_runner.go | 32 +++++++---- .../taskrunner/task_runner_test.go | 55 ++++++++++++++----- 5 files changed, 80 insertions(+), 29 deletions(-) diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index 7afb6faeaa5..0859569101e 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -154,10 +154,9 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { alloc := mock.Alloc() alloc.Job.Type = structs.JobTypeBatch task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "raw_exec" + task.Driver = "mock_driver" task.Config = map[string]interface{}{ - "command": "sleep", - "args": []string{"2"}, + "run_for": "2ms", } conf, cleanup := testAllocRunnerConfig(t, alloc.Copy()) diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index 66577d6035b..6f3599dce81 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -18,6 +18,13 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai return ErrTaskNotRunning } + tr.stateLock.Lock() + localState := tr.localState.Copy() + tr.stateLock.Unlock() + if localState == nil { + return ErrTaskNotRunning + } + switch taskState.State { case structs.TaskStatePending: // Tasks that are "pending" are never allowed to restart. @@ -25,8 +32,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai case structs.TaskStateDead: // Tasks that are "dead" are only allowed to restart when restarting // all tasks in the alloc, otherwise the taskCoordinator will prevent - // it from running again. - if event.Type != structs.TaskRestartAllSignal { + // it from running again, and if their Run method is still running. + if event.Type != structs.TaskRestartAllSignal || localState.RunComplete { return ErrTaskNotRunning } } diff --git a/client/allocrunner/taskrunner/state/state.go b/client/allocrunner/taskrunner/state/state.go index 5f83c476c80..a4bc26b0f0c 100644 --- a/client/allocrunner/taskrunner/state/state.go +++ b/client/allocrunner/taskrunner/state/state.go @@ -16,6 +16,11 @@ type LocalState struct { // TaskHandle is the handle used to reattach to the task during recovery TaskHandle *drivers.TaskHandle + + // RunComplete is set to true when the TaskRunner.Run() method finishes. + // It is used to distinguish between a dead task that could be restarted + // and one that will never run again. + RunComplete bool } func NewLocalState() *LocalState { @@ -52,6 +57,7 @@ func (s *LocalState) Copy() *LocalState { Hooks: make(map[string]*HookState, len(s.Hooks)), DriverNetwork: s.DriverNetwork.Copy(), TaskHandle: s.TaskHandle.Copy(), + RunComplete: s.RunComplete, } // Copy the hook state diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 44d7f13d3a9..c6899d1f938 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -514,23 +514,26 @@ func (tr *TaskRunner) Run() { var result *drivers.ExitResult tr.stateLock.RLock() - restoredDead := tr.state.State == structs.TaskStateDead + dead := tr.state.State == structs.TaskStateDead + runComplete := tr.localState.RunComplete tr.stateLock.RUnlock() // If restoring a dead task, ensure that task is cleared and all post hooks // are called without additional state updates. // If the alloc is not terminal we must proceed until the ALLOC_RESTART // loop to allow the task to run again in case the alloc is restarted. - if restoredDead && tr.Alloc().TerminalStatus() { + if dead { // do cleanup functions without emitting any additional events/work // to handle cases where we restored a dead task where client terminated // after task finished before completing post-run actions. tr.clearDriverHandle() tr.stateUpdater.TaskStateUpdated() - if err := tr.stop(); err != nil { - tr.logger.Error("stop failed on terminal task", "error", err) + if runComplete { + if err := tr.stop(); err != nil { + tr.logger.Error("stop failed on terminal task", "error", err) + } + return } - return } // Updates are handled asynchronously with the other hooks but each @@ -560,10 +563,7 @@ func (tr *TaskRunner) Run() { MAIN: for !tr.shouldShutdown() { - if restoredDead { - // Break early when restoring a dead task and reset the flag so the - // loop runs again if the task is restarted. - restoredDead = false + if dead { break } @@ -701,12 +701,22 @@ MAIN: // Restart without delay since the task is not running anymore. restart, _ := tr.shouldRestart() if restart { + // Set runner as not dead to allow the MAIN loop to run. + dead = false goto MAIN } } } } + tr.stateLock.Lock() + tr.localState.RunComplete = true + err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState) + if err != nil { + tr.logger.Warn("error persisting task state on run loop exit", "error", err) + } + tr.stateLock.Unlock() + // Run the stop hooks if err := tr.stop(); err != nil { tr.logger.Error("stop failed", "error", err) @@ -1233,8 +1243,10 @@ func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { tr.stateLock.Lock() defer tr.stateLock.Unlock() + tr.logger.Trace("setting task state", "state", state) + if event != nil { - tr.logger.Trace("setting task state", "state", state, "event", event.Type) + tr.logger.Trace("appending task event", "state", state, "event", event.Type) // Append the event tr.appendEvent(event) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 18cacaf418d..febd534f750 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -349,19 +349,18 @@ func TestTaskRunner_Restore_Running(t *testing.T) { assert.Equal(t, 1, started) } -// TestTaskRunner_Restore_Dead asserts restoring a dead task will cause it to -// block in alloc restart loop and it will be able to restart on request. +// TestTaskRunner_Restore_Dead asserts that restoring a dead task will place it +// back in the correct state. If the task was waiting for an alloc restart it +// must be able to be restarted after restore, otherwise a restart must fail. func TestTaskRunner_Restore_Dead(t *testing.T) { ci.Parallel(t) alloc := mock.BatchAlloc() alloc.Job.TaskGroups[0].Count = 1 task := alloc.Job.TaskGroups[0].Tasks[0] - // use raw_exec because mock_driver restore doesn't behave as expected. - task.Driver = "raw_exec" + task.Driver = "mock_driver" task.Config = map[string]interface{}{ - "command": "sleep", - "args": []string{"2"}, + "run_for": "2s", } conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners @@ -379,28 +378,56 @@ func TestTaskRunner_Restore_Dead(t *testing.T) { // Cause TR to exit without shutting down task origTR.Shutdown() - // Start a new TaskRunner and make sure it does not rerun the task + // Start a new TaskRunner and do the Restore newTR, err := NewTaskRunner(conf) require.NoError(t, err) - - // Do the Restore require.NoError(t, newTR.Restore()) go newTR.Run() defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Verify that the TaskRunner is still active + // Verify that the TaskRunner is still active since it was recovered after + // a forced shutdown. select { case <-newTR.WaitCh(): require.Fail(t, "WaitCh is not blocking") default: } - // Verify that we can restart task - ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} - err = newTR.Restart(context.Background(), ev, false) - require.NoError(t, err) + // Verify that we can restart task. + // Retry a few times as the newTR.Run() may not have started yet. + testutil.WaitForResult(func() (bool, error) { + ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} + err = newTR.Restart(context.Background(), ev, false) + return err == nil, err + }, func(err error) { + require.NoError(t, err) + }) testWaitForTaskToStart(t, newTR) + + // Kill task to verify that it's restored as dead and not able to restart. + newTR.Kill(context.Background(), nil) + testutil.WaitForResult(func() (bool, error) { + select { + case <-newTR.WaitCh(): + return true, nil + default: + return false, fmt.Errorf("task still running") + } + }, func(err error) { + require.NoError(t, err) + }) + + newTR2, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR2.Restore()) + + go newTR2.Run() + defer newTR2.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} + err = newTR2.Restart(context.Background(), ev, false) + require.Equal(t, err, ErrTaskNotRunning) } // setupRestoreFailureTest starts a service, shuts down the task runner, and From c56cfdb8ebcafeedc80e64b19c100dcfd4e81083 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 22 Aug 2022 14:03:30 -0400 Subject: [PATCH 11/14] address code review requests --- .changelog/14127.txt | 2 +- api/allocations.go | 10 ++++++- client/allocrunner/alloc_runner.go | 35 +++++++++++++++---------- client/allocrunner/alloc_runner_test.go | 12 ++------- client/allocrunner/testing.go | 12 +++++++++ command/alloc_restart.go | 3 ++- 6 files changed, 47 insertions(+), 27 deletions(-) diff --git a/.changelog/14127.txt b/.changelog/14127.txt index dc4b042f218..61c0368774e 100644 --- a/.changelog/14127.txt +++ b/.changelog/14127.txt @@ -1,5 +1,5 @@ ```release-note:improvement -client: add option to restart all tasks of an allocation, even if the task already run, such as non-sidecar prestart and poststart tasks. +client: add option to restart all tasks of an allocation, regardless of lifecycle type or state. ``` ```release-note:improvement diff --git a/api/allocations.go b/api/allocations.go index 741ee1fde39..dc2ebb27900 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -141,7 +141,9 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { return err } -// Restart restarts an allocation. +// Restart restarts the tasks that are currently running or a specific task if +// taskName is provided. An error is returned if the task to be restarted is +// not running. // // Note: for cluster topologies where API consumers don't have network access to // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid @@ -156,6 +158,12 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption return err } +// RestartAllTasks restarts all tasks in the allocation, regardless of +// lifecycle type or state. Tasks will restart following their lifecycle order. +// +// Note: for cluster topologies where API consumers don't have network access to +// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid +// long pauses on this API call. func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error { req := AllocationRestartRequest{ AllTasks: true, diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 96ea74524e4..f39f6e1f583 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -548,7 +548,8 @@ func (ar *allocRunner) handleTaskStateUpdates() { if len(liveRunners) > 0 { // if all live runners are sidecars - kill alloc - if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) { + hasLiveSidecars := hasSidecars && !hasNonSidecarTasks(liveRunners) + if killEvent == nil && hasLiveSidecars { killEvent = structs.NewTaskEvent(structs.TaskMainDead) } @@ -573,11 +574,11 @@ func (ar *allocRunner) handleTaskStateUpdates() { // Kill 'em all states = ar.killTasks() - // Wait for TaskRunners to exit before continuing to - // prevent looping before TaskRunners have transitioned - // to Dead. + // Wait for TaskRunners to exit before continuing. This will + // prevent looping before TaskRunners have transitioned to + // Dead. for _, tr := range liveRunners { - ar.logger.Info("killing task", "task", tr.Task().Name) + ar.logger.Info("waiting for task to exit", "task", tr.Task().Name) select { case <-tr.WaitCh(): case <-ar.waitCh: @@ -1228,20 +1229,24 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH } // Restart satisfies the WorkloadRestarter interface and restarts all tasks -// that are currently running. Only the TaskRestartRunningSignal event type may -// be used. +// that are currently running. +// +// The event type will be set to TaskRestartRunningSignal to comply with +// internal restart logic requirements. func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { if event.Type != structs.TaskRestartRunningSignal { - return fmt.Errorf("Invalid event %s for alloc restart request", event.Type) + event.Type = structs.TaskRestartRunningSignal } return ar.restartTasks(ctx, event, failure) } -// RestartTask restarts the provided task. Only TaskRestartSignal event type -// may be used. +// RestartTask restarts the provided task. +// +// The event type will be set to TaskRestartSignal to comply with internal +// restart logic requirements. func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error { if event.Type != structs.TaskRestartSignal { - return fmt.Errorf("Invalid event %s for task restart request", event.Type) + event.Type = structs.TaskRestartSignal } tr, ok := ar.tasks[taskName] @@ -1252,11 +1257,13 @@ func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) er return tr.Restart(context.TODO(), event, false) } -// RestartRunning restarts all tasks that are currently running. Only the -// TaskRestartRunningSignal event type may be used. +// RestartRunning restarts all tasks that are currently running. +// +// The event type will be set to TaskRestartRunningSignal to comply with +// internal restart logic requirements. func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error { if event.Type != structs.TaskRestartRunningSignal { - return fmt.Errorf("Invalid event %s for running tasks restart request", event.Type) + event.Type = structs.TaskRestartRunningSignal } return ar.restartTasks(context.TODO(), event, false) } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 68576539326..a1e2272b6c5 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1676,11 +1676,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar.Run() defer destroy(ar) - testutil.WaitForResult(func() (bool, error) { - return ar.AllocState().ClientStatus == structs.AllocClientStatusComplete, nil - }, func(_ error) { - t.Fatalf("expected alloc to be complete") - }) + WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Step 2. Modify its directory task := alloc.Job.TaskGroups[0].Tasks[0] @@ -1708,11 +1704,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar2.Run() defer destroy(ar2) - testutil.WaitForResult(func() (bool, error) { - return ar.AllocState().ClientStatus == structs.AllocClientStatusComplete, nil - }, func(_ error) { - t.Fatalf("expected alloc to be complete") - }) + WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Ensure that data from ar was moved to ar2 dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 6f2fd7b03df..44e3eb52479 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -4,6 +4,7 @@ package allocrunner import ( + "fmt" "sync" "testing" @@ -20,6 +21,7 @@ import ( "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -104,3 +106,13 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation) (*allocRu return ar, cleanup } + +func WaitForClientState(t *testing.T, ar *allocRunner, state string) { + testutil.WaitForResult(func() (bool, error) { + got := ar.AllocState().ClientStatus + return got == state, + fmt.Errorf("expected alloc runner to be in state %s, got %s", state, got) + }, func(err error) { + require.NoError(t, err) + }) +} diff --git a/command/alloc_restart.go b/command/alloc_restart.go index 909e5666ea7..cc6d9165497 100644 --- a/command/alloc_restart.go +++ b/command/alloc_restart.go @@ -36,7 +36,8 @@ Restart Specific Options: -all-tasks If set, all tasks in the allocation will be restarted, even the ones that - already ran. This option cannot be used if a task is defined. + already ran. This option cannot be used with '-task' or the '' + argument. -task Specify the individual task to restart. If task name is given with both an From 647f071cb7e3dba8c1171c50ba31c652ea630a41 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 23 Aug 2022 22:32:05 -0400 Subject: [PATCH 12/14] apply more code review changes --- client/allocrunner/alloc_runner.go | 4 +- client/allocrunner/alloc_runner_test.go | 96 +++++++++---------- .../taskrunner/task_runner_test.go | 2 +- 3 files changed, 51 insertions(+), 51 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index f39f6e1f583..0e8885ca33c 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -548,8 +548,8 @@ func (ar *allocRunner) handleTaskStateUpdates() { if len(liveRunners) > 0 { // if all live runners are sidecars - kill alloc - hasLiveSidecars := hasSidecars && !hasNonSidecarTasks(liveRunners) - if killEvent == nil && hasLiveSidecars { + onlySidecarsRemaining := hasSidecars && !hasNonSidecarTasks(liveRunners) + if killEvent == nil && onlySidecarsRemaining { killEvent = structs.NewTaskEvent(structs.TaskMainDead) } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index a1e2272b6c5..4913addd73b 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -489,12 +489,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { // test cases can use this default or override w/ taskDefs param alloc := mock.LifecycleAllocFromTasks([]mock.LifecycleTaskDef{ - {"main", "100s", 0, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "100s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }) alloc.Job.Type = structs.JobTypeService rp := &structs.RestartPolicy{ @@ -546,12 +546,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "batch job restart entire allocation", taskDefs: []mock.LifecycleTaskDef{ - {"main", "100s", 1, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "100s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, isBatch: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { @@ -570,12 +570,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "batch job restart only running tasks ", taskDefs: []mock.LifecycleTaskDef{ - {"main", "100s", 1, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "100s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, isBatch: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { @@ -657,12 +657,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "main task fails and restarts once", taskDefs: []mock.LifecycleTaskDef{ - {"main", "2s", 1, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, action: func(ar *allocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited @@ -680,12 +680,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "leader main task fails and restarts once", taskDefs: []mock.LifecycleTaskDef{ - {"main", "2s", 1, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, hasLeader: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { @@ -704,12 +704,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "main stopped unexpectedly and restarts once", taskDefs: []mock.LifecycleTaskDef{ - {"main", "2s", 0, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "2s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, action: func(ar *allocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited @@ -727,12 +727,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "leader main stopped unexpectedly and restarts once", taskDefs: []mock.LifecycleTaskDef{ - {"main", "2s", 0, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "2s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, action: func(ar *allocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited @@ -750,12 +750,12 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "failed main task cannot be restarted", taskDefs: []mock.LifecycleTaskDef{ - {"main", "2s", 1, "", false}, - {"prestart-oneshot", "1s", 0, "prestart", false}, - {"prestart-sidecar", "100s", 0, "prestart", true}, - {"poststart-oneshot", "1s", 0, "poststart", false}, - {"poststart-sidecar", "100s", 0, "poststart", true}, - {"poststop", "1s", 0, "poststop", false}, + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, action: func(ar *allocRunner, alloc *structs.Allocation) error { // make sure main task has had a chance to restart once on its diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index febd534f750..916d70d0bc9 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -2613,7 +2613,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { ts := tr.TaskState() - return ts.State == structs.TaskStateRunning, fmt.Errorf("expected task to be runnig, got %v", ts.State) + return ts.State == structs.TaskStateRunning, fmt.Errorf("expected task to be running, got %v", ts.State) }, func(err error) { require.NoError(t, err) }) From e620cd187452b45ab04bf65d56aa2f90fabfc9eb Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 24 Aug 2022 15:42:34 -0400 Subject: [PATCH 13/14] taskrunner: add different Restart modes Using the task event to differentiate between the allocrunner restart methods proved to be confusing for developers to understand how it all worked. So instead of relying on the event type, this commit separated the logic of restarting an taskRunner into two methods: - `Restart` will retain the current behaviour and only will only restart the task if it's currently running. - `ForceRestart` is the new method where a `dead` task is allowed to restart if its `Run()` method is still active. Callers will need to restart the allocRunner taskCoordinator to make sure it will allow the task to run again. --- client/allocrunner/alloc_runner.go | 43 +++++---------- client/allocrunner/alloc_runner_test.go | 16 ++---- client/allocrunner/taskrunner/lifecycle.go | 55 ++++++++++++++++--- .../taskrunner/task_runner_test.go | 8 +-- client/client.go | 4 +- command/agent/consul/check_watcher.go | 2 +- nomad/structs/structs.go | 20 ------- 7 files changed, 72 insertions(+), 76 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 0e8885ca33c..1d50c2595bb 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -1230,25 +1230,12 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH // Restart satisfies the WorkloadRestarter interface and restarts all tasks // that are currently running. -// -// The event type will be set to TaskRestartRunningSignal to comply with -// internal restart logic requirements. func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { - if event.Type != structs.TaskRestartRunningSignal { - event.Type = structs.TaskRestartRunningSignal - } - return ar.restartTasks(ctx, event, failure) + return ar.restartTasks(ctx, event, failure, false) } // RestartTask restarts the provided task. -// -// The event type will be set to TaskRestartSignal to comply with internal -// restart logic requirements. func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error { - if event.Type != structs.TaskRestartSignal { - event.Type = structs.TaskRestartSignal - } - tr, ok := ar.tasks[taskName] if !ok { return fmt.Errorf("Could not find task runner for task: %s", taskName) @@ -1258,31 +1245,20 @@ func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) er } // RestartRunning restarts all tasks that are currently running. -// -// The event type will be set to TaskRestartRunningSignal to comply with -// internal restart logic requirements. func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error { - if event.Type != structs.TaskRestartRunningSignal { - event.Type = structs.TaskRestartRunningSignal - } - return ar.restartTasks(context.TODO(), event, false) + return ar.restartTasks(context.TODO(), event, false, false) } // RestartAll restarts all tasks in the allocation, including dead ones. They -// will restart following their lifecycle order. Only the TaskRestartAllSignal -// event type may be used. +// will restart following their lifecycle order. func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error { - if event.Type != structs.TaskRestartAllSignal { - return fmt.Errorf("Invalid event %s for all tasks restart request", event.Type) - } - // Restart the taskCoordinator to allow dead tasks to run again. ar.taskCoordinator.Restart() - return ar.restartTasks(context.TODO(), event, false) + return ar.restartTasks(context.TODO(), event, false, true) } // restartTasks restarts all task runners concurrently. -func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool) error { +func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool, force bool) error { waitCh := make(chan struct{}) var err *multierror.Error var errMutex sync.Mutex @@ -1297,7 +1273,14 @@ func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEven wg.Add(1) go func(taskName string, taskRunner *taskrunner.TaskRunner) { defer wg.Done() - e := taskRunner.Restart(ctx, event.Copy(), failure) + + var e error + if force { + e = taskRunner.ForceRestart(ctx, event.Copy(), failure) + } else { + e = taskRunner.Restart(ctx, event.Copy(), failure) + } + // Ignore ErrTaskNotRunning errors since tasks that are not // running are expected to not be restarted. if e != nil && e != taskrunner.ErrTaskNotRunning { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 4913addd73b..90b94657b3d 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -504,6 +504,8 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { Mode: structs.RestartPolicyModeFail, } + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + testCases := []struct { name string taskDefs []mock.LifecycleTaskDef @@ -516,7 +518,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart entire allocation", action: func(ar *allocRunner, alloc *structs.Allocation) error { - ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} return ar.RestartAll(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -531,7 +532,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart only running tasks", action: func(ar *allocRunner, alloc *structs.Allocation) error { - ev := &structs.TaskEvent{Type: structs.TaskRestartRunningSignal} return ar.RestartRunning(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -555,7 +555,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, isBatch: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { - ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} return ar.RestartAll(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -579,7 +578,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, isBatch: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { - ev := &structs.TaskEvent{Type: structs.TaskRestartRunningSignal} return ar.RestartRunning(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -595,7 +593,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { name: "restart entire allocation with leader", hasLeader: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { - ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} return ar.RestartAll(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -627,7 +624,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart main task", action: func(ar *allocRunner, alloc *structs.Allocation) error { - ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} return ar.RestartTask("main", ev) }, expectedAfter: map[string]structs.TaskState{ @@ -643,7 +639,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { name: "restart leader main task", hasLeader: true, action: func(ar *allocRunner, alloc *structs.Allocation) error { - return ar.RestartTask("main", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + return ar.RestartTask("main", ev) }, expectedAfter: map[string]structs.TaskState{ "main": structs.TaskState{State: "running", Restarts: 1}, @@ -761,7 +757,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { // make sure main task has had a chance to restart once on its // own and fail again before we try to manually restart it time.Sleep(5 * time.Second) - return ar.RestartTask("main", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + return ar.RestartTask("main", ev) }, expectedErr: "Task not running", expectedAfter: map[string]structs.TaskState{ @@ -776,7 +772,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart prestart-sidecar task", action: func(ar *allocRunner, alloc *structs.Allocation) error { - return ar.RestartTask("prestart-sidecar", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + return ar.RestartTask("prestart-sidecar", ev) }, expectedAfter: map[string]structs.TaskState{ "main": structs.TaskState{State: "running", Restarts: 0}, @@ -790,7 +786,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart poststart-sidecar task", action: func(ar *allocRunner, alloc *structs.Allocation) error { - return ar.RestartTask("poststart-sidecar", &structs.TaskEvent{Type: structs.TaskRestartSignal}) + return ar.RestartTask("poststart-sidecar", ev) }, expectedAfter: map[string]structs.TaskState{ "main": structs.TaskState{State: "running", Restarts: 0}, diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index 6f3599dce81..90c3d37189a 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -6,13 +6,34 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// Restart a task. Returns immediately if no task is running. Blocks until -// existing task exits or passed-in context is canceled. +// Restart restarts a task that is already running. Returns an error if the +// task is not running. Blocks until existing task exits or passed-in context +// is canceled. func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { tr.logger.Trace("Restart requested", "failure", failure, "event", event.GoString()) - // Check if the task is able to restart based on its state and the type of - // restart event that was triggered. + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } + + switch taskState.State { + case structs.TaskStatePending, structs.TaskStateDead: + return ErrTaskNotRunning + } + + return tr.restartImpl(ctx, event, failure) +} + +// ForceRestart restarts a task that is already running or reruns it if dead. +// Returns an error if the task is not able to rerun. Blocks until existing +// task exits or passed-in context is canceled. +// +// Callers must restart the AllocRuner taskCoordinator beforehand to make sure +// the task will be able to run again. +func (tr *TaskRunner) ForceRestart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + tr.logger.Trace("Force restart requested", "failure", failure, "event", event.GoString()) + taskState := tr.TaskState() if taskState == nil { return ErrTaskNotRunning @@ -21,23 +42,39 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai tr.stateLock.Lock() localState := tr.localState.Copy() tr.stateLock.Unlock() + if localState == nil { return ErrTaskNotRunning } switch taskState.State { case structs.TaskStatePending: - // Tasks that are "pending" are never allowed to restart. return ErrTaskNotRunning + case structs.TaskStateDead: - // Tasks that are "dead" are only allowed to restart when restarting - // all tasks in the alloc, otherwise the taskCoordinator will prevent - // it from running again, and if their Run method is still running. - if event.Type != structs.TaskRestartAllSignal || localState.RunComplete { + // Tasks that are in the "dead" state are only allowed to restart if + // their Run() method is still active. + if localState.RunComplete { return ErrTaskNotRunning } } + return tr.restartImpl(ctx, event, failure) +} + +// restartImpl implements to task restart process. +// +// It should never be called directly as it doesn't verify if the task state +// allows for a restart. +func (tr *TaskRunner) restartImpl(ctx context.Context, event *structs.TaskEvent, failure bool) error { + + // Check if the task is able to restart based on its state and the type of + // restart event that was triggered. + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } + // Emit the event since it may take a long time to kill tr.EmitEvent(event) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 916d70d0bc9..53ffbf87e30 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -397,8 +397,8 @@ func TestTaskRunner_Restore_Dead(t *testing.T) { // Verify that we can restart task. // Retry a few times as the newTR.Run() may not have started yet. testutil.WaitForResult(func() (bool, error) { - ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} - err = newTR.Restart(context.Background(), ev, false) + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + err = newTR.Rerun(context.Background(), ev, false) return err == nil, err }, func(err error) { require.NoError(t, err) @@ -425,8 +425,8 @@ func TestTaskRunner_Restore_Dead(t *testing.T) { go newTR2.Run() defer newTR2.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal} - err = newTR2.Restart(context.Background(), ev, false) + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + err = newTR2.Rerun(context.Background(), ev, false) require.Equal(t, err, ErrTaskNotRunning) } diff --git a/client/client.go b/client/client.go index 4932c858ed3..9e8253132ee 100644 --- a/client/client.go +++ b/client/client.go @@ -943,12 +943,12 @@ func (c *Client) RestartAllocation(allocID, taskName string, allTasks bool) erro } if allTasks { - event := structs.NewTaskEvent(structs.TaskRestartAllSignal). + event := structs.NewTaskEvent(structs.TaskRestartSignal). SetRestartReason("User requested all tasks to restart") return ar.RestartAll(event) } - event := structs.NewTaskEvent(structs.TaskRestartRunningSignal). + event := structs.NewTaskEvent(structs.TaskRestartSignal). SetRestartReason("User requested running tasks to restart") return ar.RestartRunning(event) } diff --git a/command/agent/consul/check_watcher.go b/command/agent/consul/check_watcher.go index 926a735d207..ee2e18a287b 100644 --- a/command/agent/consul/check_watcher.go +++ b/command/agent/consul/check_watcher.go @@ -103,7 +103,7 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) // Tell TaskRunner to restart due to failure reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName) - event := structs.NewTaskEvent(structs.TaskRestartRunningSignal).SetRestartReason(reason) + event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason) go asyncRestart(ctx, c.logger, c.task, event) return true } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e61e0134073..afdbe7313bb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8022,14 +8022,6 @@ const ( // restarted TaskRestartSignal = "Restart Signaled" - // TaskRestartRunningSignal indicates that all tasks in the allocation that - // are currently running have been signaled to be restarted. - TaskRestartRunningSignal = "Restart Running Signaled" - - // TaskRestartAllSignal indicates that all tasks in the allocation have - // been signaled to be restarted, even the ones that have already run. - TaskRestartAllSignal = "Restart All Signaled" - // TaskSignaling indicates that the task is being signalled. TaskSignaling = "Signaling" @@ -8287,18 +8279,6 @@ func (e *TaskEvent) PopulateEventDisplayMessage() { } else { desc = "Task signaled to restart" } - case TaskRestartRunningSignal: - if e.RestartReason != "" { - desc = e.RestartReason - } else { - desc = "Running tasks signaled to restart" - } - case TaskRestartAllSignal: - if e.RestartReason != "" { - desc = e.RestartReason - } else { - desc = "All tasks signaled to restart" - } case TaskDriverMessage: desc = e.DriverMessage case TaskLeaderDead: From 8037a17e65fa504946f43669763b0a8feb0acdda Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 24 Aug 2022 16:45:01 -0400 Subject: [PATCH 14/14] minor fixes --- client/allocrunner/taskrunner/task_runner.go | 10 +++++----- client/allocrunner/taskrunner/task_runner_test.go | 10 +++++----- website/content/docs/commands/alloc/restart.mdx | 3 ++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index c6899d1f938..445cf044038 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -518,10 +518,10 @@ func (tr *TaskRunner) Run() { runComplete := tr.localState.RunComplete tr.stateLock.RUnlock() - // If restoring a dead task, ensure that task is cleared and all post hooks - // are called without additional state updates. - // If the alloc is not terminal we must proceed until the ALLOC_RESTART - // loop to allow the task to run again in case the alloc is restarted. + // If restoring a dead task, ensure the task is cleared and, if the local + // state indicates that the previous Run() call is complete, execute all + // post stop hooks and exit early, otherwise proceed until the + // ALLOC_RESTART loop skipping MAIN since the task is dead. if dead { // do cleanup functions without emitting any additional events/work // to handle cases where we restored a dead task where client terminated @@ -686,7 +686,7 @@ MAIN: tr.UpdateState(structs.TaskStateDead, nil) // Wait here in case the allocation is restarted. Poststop tasks will never - // run again so, skip them to avoid blocking forever. + // run again so skip them to avoid blocking forever. if !tr.Task().IsPoststop() { ALLOC_RESTART: // Run in a loop to handle cases where restartCh is triggered but the diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 53ffbf87e30..6c9eda48f70 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -398,7 +398,7 @@ func TestTaskRunner_Restore_Dead(t *testing.T) { // Retry a few times as the newTR.Run() may not have started yet. testutil.WaitForResult(func() (bool, error) { ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} - err = newTR.Rerun(context.Background(), ev, false) + err = newTR.ForceRestart(context.Background(), ev, false) return err == nil, err }, func(err error) { require.NoError(t, err) @@ -426,7 +426,7 @@ func TestTaskRunner_Restore_Dead(t *testing.T) { defer newTR2.Kill(context.Background(), structs.NewTaskEvent("cleanup")) ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} - err = newTR2.Rerun(context.Background(), ev, false) + err = newTR2.ForceRestart(context.Background(), ev, false) require.Equal(t, err, ErrTaskNotRunning) } @@ -1386,15 +1386,15 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { "Received", "Task Setup", "Started", - "Restart Running Signaled", + "Restart Signaled", "Terminated", "Restarting", "Started", - "Restart Running Signaled", + "Restart Signaled", "Terminated", "Restarting", "Started", - "Restart Running Signaled", + "Restart Signaled", "Terminated", "Not Restarting", } diff --git a/website/content/docs/commands/alloc/restart.mdx b/website/content/docs/commands/alloc/restart.mdx index ece6fde933f..32f14b6e834 100644 --- a/website/content/docs/commands/alloc/restart.mdx +++ b/website/content/docs/commands/alloc/restart.mdx @@ -39,7 +39,8 @@ allocation's namespace. ## Restart Options - `-all-tasks`: If set, all tasks in the allocation will be restarted, even the - ones that already ran. This option cannot be used if a task is defined. + ones that already ran. This option cannot be used with `-task` or the + `` argument. - `-task`: Specify the individual task to restart. This option cannot be used with `-all-tasks`.