Skip to content

Commit d9254ab

Browse files
Task lifecycle restart (#14127) (#14312)
* allocrunner: handle lifecycle when all tasks die When all tasks die the Coordinator must transition to its terminal state, coordinatorStatePoststop, to unblock poststop tasks. Since this could happen at any time (for example, a prestart task dies), all states must be able to transition to this terminal state. * allocrunner: implement different alloc restarts Add a new alloc restart mode where all tasks are restarted, even if they have already exited. Also unifies the alloc restart logic to use the implementation that restarts tasks concurrently and ignores ErrTaskNotRunning errors since those are expected when restarting the allocation. * allocrunner: allow tasks to run again Prevent the task runner Run() method from exiting to allow a dead task to run again. When the task runner is signaled to restart, the function will jump back to the MAIN loop and run it again. The task runner determines if a task needs to run again based on two new task events that were added to differentiate between a request to restart a specific task, the tasks that are currently running, or all tasks that have already run. * api/cli: add support for all tasks alloc restart Implement the new -all-tasks alloc restart CLI flag and its API counterpar, AllTasks. The client endpoint calls the appropriate restart method from the allocrunner depending on the restart parameters used. * test: fix tasklifecycle Coordinator test * allocrunner: kill taskrunners if all tasks are dead When all non-poststop tasks are dead we need to kill the taskrunners so we don't leak their goroutines, which are blocked in the alloc restart loop. This also ensures the allocrunner exits on its own. * taskrunner: fix tests that waited on WaitCh Now that "dead" tasks may run again, the taskrunner Run() method will not return when the task finishes running, so tests must wait for the task state to be "dead" instead of using the WaitCh, since it won't be closed until the taskrunner is killed. * tests: add tests for all tasks alloc restart * changelog: add entry for #14127 * taskrunner: fix restore logic. The first implementation of the task runner restore process relied on server data (`tr.Alloc().TerminalStatus()`) which may not be available to the client at the time of restore. It also had the incorrect code path. When restoring a dead task the driver handle always needs to be clear cleanly using `clearDriverHandle` otherwise, after exiting the MAIN loop, the task may be killed by `tr.handleKill`. The fix is to store the state of the Run() loop in the task runner local client state: if the task runner ever exits this loop cleanly (not with a shutdown) it will never be able to run again. So if the Run() loops starts with this local state flag set, it must exit early. This local state flag is also being checked on task restart requests. If the task is "dead" and its Run() loop is not active it will never be able to run again. * address code review requests * apply more code review changes * taskrunner: add different Restart modes Using the task event to differentiate between the allocrunner restart methods proved to be confusing for developers to understand how it all worked. So instead of relying on the event type, this commit separated the logic of restarting an taskRunner into two methods: - `Restart` will retain the current behaviour and only will only restart the task if it's currently running. - `ForceRestart` is the new method where a `dead` task is allowed to restart if its `Run()` method is still active. Callers will need to restart the allocRunner taskCoordinator to make sure it will allow the task to run again. * minor fixes Co-authored-by: Luiz Aoqui <[email protected]>
1 parent 75a7a7e commit d9254ab

22 files changed

+1094
-211
lines changed

.changelog/14127.txt

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
```release-note:improvement
2+
client: add option to restart all tasks of an allocation, regardless of lifecycle type or state.
3+
```
4+
5+
```release-note:improvement
6+
client: only start poststop tasks after poststart tasks are done.
7+
```

api/allocations.go

+16
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
116116
return err
117117
}
118118

119+
// Restart restarts the tasks that are currently running or a specific task if
120+
// taskName is provided. An error is returned if the task to be restarted is
121+
// not running.
119122
func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error {
120123
req := AllocationRestartRequest{
121124
TaskName: taskName,
@@ -126,6 +129,18 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption
126129
return err
127130
}
128131

132+
// RestartAllTasks restarts all tasks in the allocation, regardless of
133+
// lifecycle type or state. Tasks will restart following their lifecycle order.
134+
func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error {
135+
req := AllocationRestartRequest{
136+
AllTasks: true,
137+
}
138+
139+
var resp struct{}
140+
_, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q)
141+
return err
142+
}
143+
129144
func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) {
130145
var resp AllocStopResponse
131146
_, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q)
@@ -407,6 +422,7 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) {
407422

408423
type AllocationRestartRequest struct {
409424
TaskName string
425+
AllTasks bool
410426
}
411427

412428
type AllocSignalRequest struct {

client/alloc_endpoint.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstruct
103103
return nstructs.ErrPermissionDenied
104104
}
105105

106-
return a.c.RestartAllocation(args.AllocID, args.TaskName)
106+
return a.c.RestartAllocation(args.AllocID, args.TaskName, args.AllTasks)
107107
}
108108

109109
// Stats is used to collect allocation statistics

client/alloc_endpoint_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,45 @@ func TestAllocations_Restart(t *testing.T) {
6767
})
6868
}
6969

70+
func TestAllocations_RestartAllTasks(t *testing.T) {
71+
ci.Parallel(t)
72+
73+
require := require.New(t)
74+
client, cleanup := TestClient(t, nil)
75+
defer cleanup()
76+
77+
alloc := mock.LifecycleAlloc()
78+
require.Nil(client.addAlloc(alloc, ""))
79+
80+
// Can't restart all tasks while specifying a task name.
81+
req := &nstructs.AllocRestartRequest{
82+
AllocID: alloc.ID,
83+
AllTasks: true,
84+
TaskName: "web",
85+
}
86+
var resp nstructs.GenericResponse
87+
err := client.ClientRPC("Allocations.Restart", &req, &resp)
88+
require.Error(err)
89+
90+
// Good request.
91+
req = &nstructs.AllocRestartRequest{
92+
AllocID: alloc.ID,
93+
AllTasks: true,
94+
}
95+
96+
testutil.WaitForResult(func() (bool, error) {
97+
var resp2 nstructs.GenericResponse
98+
err := client.ClientRPC("Allocations.Restart", &req, &resp2)
99+
if err != nil && strings.Contains(err.Error(), "not running") {
100+
return false, err
101+
}
102+
103+
return true, nil
104+
}, func(err error) {
105+
t.Fatalf("err: %v", err)
106+
})
107+
}
108+
70109
func TestAllocations_Restart_ACL(t *testing.T) {
71110
ci.Parallel(t)
72111
require := require.New(t)

client/allocrunner/alloc_runner.go

+88-57
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
cstate "github.com/hashicorp/nomad/client/state"
2828
cstructs "github.com/hashicorp/nomad/client/structs"
2929
"github.com/hashicorp/nomad/client/vaultclient"
30-
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
3130
"github.com/hashicorp/nomad/helper/pointer"
3231
"github.com/hashicorp/nomad/nomad/structs"
3332
"github.com/hashicorp/nomad/plugins/device"
@@ -541,40 +540,64 @@ func (ar *allocRunner) handleTaskStateUpdates() {
541540
}
542541
}
543542

544-
// if all live runners are sidecars - kill alloc
545-
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
546-
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
547-
}
548-
549-
// If there's a kill event set and live runners, kill them
550-
if killEvent != nil && len(liveRunners) > 0 {
551-
552-
// Log kill reason
553-
switch killEvent.Type {
554-
case structs.TaskLeaderDead:
555-
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
556-
case structs.TaskMainDead:
557-
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
558-
default:
559-
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
543+
if len(liveRunners) > 0 {
544+
// if all live runners are sidecars - kill alloc
545+
onlySidecarsRemaining := hasSidecars && !hasNonSidecarTasks(liveRunners)
546+
if killEvent == nil && onlySidecarsRemaining {
547+
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
560548
}
561549

562-
// Emit kill event for live runners
563-
for _, tr := range liveRunners {
564-
tr.EmitEvent(killEvent)
565-
}
550+
// If there's a kill event set and live runners, kill them
551+
if killEvent != nil {
552+
553+
// Log kill reason
554+
switch killEvent.Type {
555+
case structs.TaskLeaderDead:
556+
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
557+
case structs.TaskMainDead:
558+
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
559+
default:
560+
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
561+
}
566562

567-
// Kill 'em all
568-
states = ar.killTasks()
563+
// Emit kill event for live runners
564+
for _, tr := range liveRunners {
565+
tr.EmitEvent(killEvent)
566+
}
567+
568+
// Kill 'em all
569+
states = ar.killTasks()
570+
571+
// Wait for TaskRunners to exit before continuing. This will
572+
// prevent looping before TaskRunners have transitioned to
573+
// Dead.
574+
for _, tr := range liveRunners {
575+
ar.logger.Info("waiting for task to exit", "task", tr.Task().Name)
576+
select {
577+
case <-tr.WaitCh():
578+
case <-ar.waitCh:
579+
}
580+
}
581+
}
582+
} else {
583+
// If there are no live runners left kill all non-poststop task
584+
// runners to unblock them from the alloc restart loop.
585+
for _, tr := range ar.tasks {
586+
if tr.IsPoststopTask() {
587+
continue
588+
}
569589

570-
// Wait for TaskRunners to exit before continuing to
571-
// prevent looping before TaskRunners have transitioned
572-
// to Dead.
573-
for _, tr := range liveRunners {
574-
ar.logger.Info("killing task", "task", tr.Task().Name)
575590
select {
576591
case <-tr.WaitCh():
577592
case <-ar.waitCh:
593+
default:
594+
// Kill task runner without setting an event because the
595+
// task is already dead, it's just waiting in the alloc
596+
// restart loop.
597+
err := tr.Kill(context.TODO(), nil)
598+
if err != nil {
599+
ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err)
600+
}
578601
}
579602
}
580603
}
@@ -642,7 +665,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
642665
break
643666
}
644667

645-
// Kill the rest non-sidecar or poststop tasks concurrently
668+
// Kill the rest non-sidecar and non-poststop tasks concurrently
646669
wg := sync.WaitGroup{}
647670
for name, tr := range ar.tasks {
648671
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
@@ -1199,19 +1222,37 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH
11991222
return nil
12001223
}
12011224

1202-
// RestartTask signalls the task runner for the provided task to restart.
1203-
func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error {
1225+
// Restart satisfies the WorkloadRestarter interface and restarts all tasks
1226+
// that are currently running.
1227+
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
1228+
return ar.restartTasks(ctx, event, failure, false)
1229+
}
1230+
1231+
// RestartTask restarts the provided task.
1232+
func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error {
12041233
tr, ok := ar.tasks[taskName]
12051234
if !ok {
12061235
return fmt.Errorf("Could not find task runner for task: %s", taskName)
12071236
}
12081237

1209-
return tr.Restart(context.TODO(), taskEvent, false)
1238+
return tr.Restart(context.TODO(), event, false)
12101239
}
12111240

1212-
// Restart satisfies the WorkloadRestarter interface restarts all task runners
1213-
// concurrently
1214-
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
1241+
// RestartRunning restarts all tasks that are currently running.
1242+
func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error {
1243+
return ar.restartTasks(context.TODO(), event, false, false)
1244+
}
1245+
1246+
// RestartAll restarts all tasks in the allocation, including dead ones. They
1247+
// will restart following their lifecycle order.
1248+
func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error {
1249+
// Restart the taskCoordinator to allow dead tasks to run again.
1250+
ar.taskCoordinator.Restart()
1251+
return ar.restartTasks(context.TODO(), event, false, true)
1252+
}
1253+
1254+
// restartTasks restarts all task runners concurrently.
1255+
func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool, force bool) error {
12151256
waitCh := make(chan struct{})
12161257
var err *multierror.Error
12171258
var errMutex sync.Mutex
@@ -1224,10 +1265,19 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
12241265
defer close(waitCh)
12251266
for tn, tr := range ar.tasks {
12261267
wg.Add(1)
1227-
go func(taskName string, r agentconsul.WorkloadRestarter) {
1268+
go func(taskName string, taskRunner *taskrunner.TaskRunner) {
12281269
defer wg.Done()
1229-
e := r.Restart(ctx, event, failure)
1230-
if e != nil {
1270+
1271+
var e error
1272+
if force {
1273+
e = taskRunner.ForceRestart(ctx, event.Copy(), failure)
1274+
} else {
1275+
e = taskRunner.Restart(ctx, event.Copy(), failure)
1276+
}
1277+
1278+
// Ignore ErrTaskNotRunning errors since tasks that are not
1279+
// running are expected to not be restarted.
1280+
if e != nil && e != taskrunner.ErrTaskNotRunning {
12311281
errMutex.Lock()
12321282
defer errMutex.Unlock()
12331283
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
@@ -1245,25 +1295,6 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
12451295
return err.ErrorOrNil()
12461296
}
12471297

1248-
// RestartAll signalls all task runners in the allocation to restart and passes
1249-
// a copy of the task event to each restart event.
1250-
// Returns any errors in a concatenated form.
1251-
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
1252-
var err *multierror.Error
1253-
1254-
// run alloc task restart hooks
1255-
ar.taskRestartHooks()
1256-
1257-
for tn := range ar.tasks {
1258-
rerr := ar.RestartTask(tn, taskEvent.Copy())
1259-
if rerr != nil {
1260-
err = multierror.Append(err, rerr)
1261-
}
1262-
}
1263-
1264-
return err.ErrorOrNil()
1265-
}
1266-
12671298
// Signal sends a signal request to task runners inside an allocation. If the
12681299
// taskName is empty, then it is sent to all tasks.
12691300
func (ar *allocRunner) Signal(taskName, signal string) error {

0 commit comments

Comments
 (0)