Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alloc_runner: stop sidecar tasks last #13055

Merged
merged 6 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,21 +525,21 @@ func (ar *allocRunner) handleTaskStateUpdates() {
states := make(map[string]*structs.TaskState, trNum)

for name, tr := range ar.tasks {
state := tr.TaskState()
states[name] = state
taskState := tr.TaskState()
states[name] = taskState

if tr.IsPoststopTask() {
continue
}

// Capture live task runners in case we need to kill them
if state.State != structs.TaskStateDead {
if taskState.State != structs.TaskStateDead {
liveRunners = append(liveRunners, tr)
continue
}

// Task is dead, determine if other tasks should be killed
if state.Failed {
if taskState.Failed {
// Only set failed event if no event has been
// set yet to give dead leaders priority.
if killEvent == nil {
Expand Down Expand Up @@ -626,16 +626,40 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}

state := tr.TaskState()
states[name] = state
taskState := tr.TaskState()
states[name] = taskState
break
}

// Kill the rest concurrently
// Kill the rest non-sidecar or poststop tasks concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
// Filter out poststop tasks so they run after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() {
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() || tr.IsSidecarTask() {
continue
}

wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
taskEvent.SetKillTimeout(tr.Task().KillTimeout)
err := tr.Kill(context.TODO(), taskEvent)
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}

taskState := tr.TaskState()
mu.Lock()
states[name] = taskState
mu.Unlock()
}(name, tr)
}
wg.Wait()

// Kill the sidecar tasks last.
for name, tr := range ar.tasks {
if !tr.IsSidecarTask() || tr.IsLeader() || tr.IsPoststopTask() {
continue
}

Expand All @@ -649,9 +673,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}

state := tr.TaskState()
taskState := tr.TaskState()
mu.Lock()
states[name] = state
states[name] = taskState
mu.Unlock()
}(name, tr)
}
Expand Down
8 changes: 3 additions & 5 deletions client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,18 @@ func (c *taskHookCoordinator) StartPoststopTasks() {
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
if !tr.IsSidecarTask() {
return true
}
}

return false
}

// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
if tr.IsSidecarTask() {
return true
}
}
Expand Down
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/task_runner_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPoststopTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
}

// IsSidecarTask returns true if this task is a sidecar task in its task group.
func (tr *TaskRunner) IsSidecarTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Sidecar
}

func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()
Expand Down