Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alloc_runner: stop sidecar tasks last #13055

Merged
merged 6 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/13055.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
lifecycle: fixed a bug where sidecar tasks were not being stopped last
```
46 changes: 35 additions & 11 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,21 +525,21 @@ func (ar *allocRunner) handleTaskStateUpdates() {
states := make(map[string]*structs.TaskState, trNum)

for name, tr := range ar.tasks {
state := tr.TaskState()
states[name] = state
taskState := tr.TaskState()
states[name] = taskState

if tr.IsPoststopTask() {
continue
}

// Capture live task runners in case we need to kill them
if state.State != structs.TaskStateDead {
if taskState.State != structs.TaskStateDead {
liveRunners = append(liveRunners, tr)
continue
}

// Task is dead, determine if other tasks should be killed
if state.Failed {
if taskState.Failed {
// Only set failed event if no event has been
// set yet to give dead leaders priority.
if killEvent == nil {
Expand Down Expand Up @@ -626,16 +626,16 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}

state := tr.TaskState()
states[name] = state
taskState := tr.TaskState()
states[name] = taskState
break
}

// Kill the rest concurrently
// Kill the rest non-sidecar or poststop tasks concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
// Filter out poststop tasks so they run after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() {
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() || tr.IsSidecarTask() {
continue
}

Expand All @@ -649,9 +649,33 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}

state := tr.TaskState()
taskState := tr.TaskState()
mu.Lock()
states[name] = taskState
mu.Unlock()
}(name, tr)
}
wg.Wait()

// Kill the sidecar tasks last.
for name, tr := range ar.tasks {
if !tr.IsSidecarTask() || tr.IsLeader() || tr.IsPoststopTask() {
continue
}

wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
taskEvent.SetKillTimeout(tr.Task().KillTimeout)
err := tr.Kill(context.TODO(), taskEvent)
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name)
}

taskState := tr.TaskState()
mu.Lock()
states[name] = state
states[name] = taskState
mu.Unlock()
}(name, tr)
}
Expand Down
130 changes: 129 additions & 1 deletion client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,6 @@ func TestAllocRunner_Reconnect(t *testing.T) {

require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)


// Make sure the runner's alloc indexes match the update.
require.Equal(t, update.AllocModifyIndex, ar.Alloc().AllocModifyIndex)
require.Equal(t, update.ModifyIndex, ar.Alloc().ModifyIndex)
Expand Down Expand Up @@ -1683,3 +1682,132 @@ func TestAllocRunner_Reconnect(t *testing.T) {
})
}
}

// TestAllocRunner_Lifecycle_Shutdown_Order asserts that a service job with 3
// lifecycle hooks (1 sidecar, 1 ephemeral, 1 poststop) starts all 4 tasks, and shuts down
// the sidecar after main, but before poststop.
func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) {
alloc := mock.LifecycleAllocWithPoststopDeploy()

alloc.Job.Type = structs.JobTypeService

mainTask := alloc.Job.TaskGroups[0].Tasks[0]
mainTask.Config["run_for"] = "100s"

sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
sidecarTask.Config["run_for"] = "100s"

poststopTask := alloc.Job.TaskGroups[0].Tasks[2]
ephemeralTask := alloc.Job.TaskGroups[0].Tasks[3]

alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask, sidecarTask, poststopTask}

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

upd := conf.StateUpdater.(*MockStateUpdater)

// Wait for main and sidecar tasks to be running, and that the
// ephemeral task ran and exited.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}

if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected main task to be running not %s", s)
}

if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected sidecar task to be running not %s", s)
}

if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
}

if last.TaskStates[ephemeralTask.Name].Failed {
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})

// Tell the alloc to stop
stopAlloc := alloc.Copy()
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(stopAlloc)

// Wait for tasks to stop.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()

if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
}

if last.TaskStates[ephemeralTask.Name].Failed {
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected main task to be dead not %s", s)
}

if last.TaskStates[mainTask.Name].Failed {
return false, fmt.Errorf("expected main task to be successful not failed")
}

if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
}

if last.TaskStates[sidecarTask.Name].Failed {
return false, fmt.Errorf("expected sidecar task to be successful not failed")
}

if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected poststop task to be running not %s", s)
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for kill state:\n%v", err)
})

last := upd.Last()
require.Less(t, last.TaskStates[ephemeralTask.Name].FinishedAt, last.TaskStates[mainTask.Name].FinishedAt)
require.Less(t, last.TaskStates[mainTask.Name].FinishedAt, last.TaskStates[sidecarTask.Name].FinishedAt)

// Wait for poststop task to stop.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()

if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected poststop task to be dead not %s", s)
}

if last.TaskStates[poststopTask.Name].Failed {
return false, fmt.Errorf("expected poststop task to be successful not failed")
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for poststop state:\n%v", err)
})

last = upd.Last()
require.Less(t, last.TaskStates[sidecarTask.Name].FinishedAt, last.TaskStates[poststopTask.Name].FinishedAt)
}
8 changes: 3 additions & 5 deletions client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,18 @@ func (c *taskHookCoordinator) StartPoststopTasks() {
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
if !tr.IsSidecarTask() {
return true
}
}

return false
}

// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
if tr.IsSidecarTask() {
return true
}
}
Expand Down
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/task_runner_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPoststopTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
}

// IsSidecarTask returns true if this task is a sidecar task in its task group.
func (tr *TaskRunner) IsSidecarTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Sidecar
}

func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()
Expand Down