Skip to content

Commit ec3b715

Browse files
alloc_runner: stop sidecar tasks last (#13055)
alloc_runner: stop sidecar tasks last
1 parent 78553f9 commit ec3b715

File tree

5 files changed

+175
-16
lines changed

5 files changed

+175
-16
lines changed

.changelog/13055.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
lifecycle: fixed a bug where sidecar tasks were not being stopped last
3+
```

client/allocrunner/alloc_runner.go

+35-11
Original file line numberDiff line numberDiff line change
@@ -525,21 +525,21 @@ func (ar *allocRunner) handleTaskStateUpdates() {
525525
states := make(map[string]*structs.TaskState, trNum)
526526

527527
for name, tr := range ar.tasks {
528-
state := tr.TaskState()
529-
states[name] = state
528+
taskState := tr.TaskState()
529+
states[name] = taskState
530530

531531
if tr.IsPoststopTask() {
532532
continue
533533
}
534534

535535
// Capture live task runners in case we need to kill them
536-
if state.State != structs.TaskStateDead {
536+
if taskState.State != structs.TaskStateDead {
537537
liveRunners = append(liveRunners, tr)
538538
continue
539539
}
540540

541541
// Task is dead, determine if other tasks should be killed
542-
if state.Failed {
542+
if taskState.Failed {
543543
// Only set failed event if no event has been
544544
// set yet to give dead leaders priority.
545545
if killEvent == nil {
@@ -626,16 +626,16 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
626626
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
627627
}
628628

629-
state := tr.TaskState()
630-
states[name] = state
629+
taskState := tr.TaskState()
630+
states[name] = taskState
631631
break
632632
}
633633

634-
// Kill the rest concurrently
634+
// Kill the rest non-sidecar or poststop tasks concurrently
635635
wg := sync.WaitGroup{}
636636
for name, tr := range ar.tasks {
637-
// Filter out poststop tasks so they run after all the other tasks are killed
638-
if tr.IsLeader() || tr.IsPoststopTask() {
637+
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
638+
if tr.IsLeader() || tr.IsPoststopTask() || tr.IsSidecarTask() {
639639
continue
640640
}
641641

@@ -649,9 +649,33 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
649649
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
650650
}
651651

652-
state := tr.TaskState()
652+
taskState := tr.TaskState()
653+
mu.Lock()
654+
states[name] = taskState
655+
mu.Unlock()
656+
}(name, tr)
657+
}
658+
wg.Wait()
659+
660+
// Kill the sidecar tasks last.
661+
for name, tr := range ar.tasks {
662+
if !tr.IsSidecarTask() || tr.IsLeader() || tr.IsPoststopTask() {
663+
continue
664+
}
665+
666+
wg.Add(1)
667+
go func(name string, tr *taskrunner.TaskRunner) {
668+
defer wg.Done()
669+
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
670+
taskEvent.SetKillTimeout(tr.Task().KillTimeout)
671+
err := tr.Kill(context.TODO(), taskEvent)
672+
if err != nil && err != taskrunner.ErrTaskNotRunning {
673+
ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name)
674+
}
675+
676+
taskState := tr.TaskState()
653677
mu.Lock()
654-
states[name] = state
678+
states[name] = taskState
655679
mu.Unlock()
656680
}(name, tr)
657681
}

client/allocrunner/alloc_runner_test.go

+129
Original file line numberDiff line numberDiff line change
@@ -1682,3 +1682,132 @@ func TestAllocRunner_Reconnect(t *testing.T) {
16821682
})
16831683
}
16841684
}
1685+
1686+
// TestAllocRunner_Lifecycle_Shutdown_Order asserts that a service job with 3
1687+
// lifecycle hooks (1 sidecar, 1 ephemeral, 1 poststop) starts all 4 tasks, and shuts down
1688+
// the sidecar after main, but before poststop.
1689+
func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) {
1690+
alloc := mock.LifecycleAllocWithPoststopDeploy()
1691+
1692+
alloc.Job.Type = structs.JobTypeService
1693+
1694+
mainTask := alloc.Job.TaskGroups[0].Tasks[0]
1695+
mainTask.Config["run_for"] = "100s"
1696+
1697+
sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
1698+
sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
1699+
sidecarTask.Config["run_for"] = "100s"
1700+
1701+
poststopTask := alloc.Job.TaskGroups[0].Tasks[2]
1702+
ephemeralTask := alloc.Job.TaskGroups[0].Tasks[3]
1703+
1704+
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask, sidecarTask, poststopTask}
1705+
1706+
conf, cleanup := testAllocRunnerConfig(t, alloc)
1707+
defer cleanup()
1708+
ar, err := NewAllocRunner(conf)
1709+
require.NoError(t, err)
1710+
defer destroy(ar)
1711+
go ar.Run()
1712+
1713+
upd := conf.StateUpdater.(*MockStateUpdater)
1714+
1715+
// Wait for main and sidecar tasks to be running, and that the
1716+
// ephemeral task ran and exited.
1717+
testutil.WaitForResult(func() (bool, error) {
1718+
last := upd.Last()
1719+
if last == nil {
1720+
return false, fmt.Errorf("No updates")
1721+
}
1722+
1723+
if last.ClientStatus != structs.AllocClientStatusRunning {
1724+
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
1725+
}
1726+
1727+
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
1728+
return false, fmt.Errorf("expected main task to be running not %s", s)
1729+
}
1730+
1731+
if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
1732+
return false, fmt.Errorf("expected sidecar task to be running not %s", s)
1733+
}
1734+
1735+
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
1736+
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
1737+
}
1738+
1739+
if last.TaskStates[ephemeralTask.Name].Failed {
1740+
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
1741+
}
1742+
1743+
return true, nil
1744+
}, func(err error) {
1745+
t.Fatalf("error waiting for initial state:\n%v", err)
1746+
})
1747+
1748+
// Tell the alloc to stop
1749+
stopAlloc := alloc.Copy()
1750+
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
1751+
ar.Update(stopAlloc)
1752+
1753+
// Wait for tasks to stop.
1754+
testutil.WaitForResult(func() (bool, error) {
1755+
last := upd.Last()
1756+
1757+
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
1758+
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
1759+
}
1760+
1761+
if last.TaskStates[ephemeralTask.Name].Failed {
1762+
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
1763+
}
1764+
1765+
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
1766+
return false, fmt.Errorf("expected main task to be dead not %s", s)
1767+
}
1768+
1769+
if last.TaskStates[mainTask.Name].Failed {
1770+
return false, fmt.Errorf("expected main task to be successful not failed")
1771+
}
1772+
1773+
if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
1774+
return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
1775+
}
1776+
1777+
if last.TaskStates[sidecarTask.Name].Failed {
1778+
return false, fmt.Errorf("expected sidecar task to be successful not failed")
1779+
}
1780+
1781+
if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateRunning {
1782+
return false, fmt.Errorf("expected poststop task to be running not %s", s)
1783+
}
1784+
1785+
return true, nil
1786+
}, func(err error) {
1787+
t.Fatalf("error waiting for kill state:\n%v", err)
1788+
})
1789+
1790+
last := upd.Last()
1791+
require.Less(t, last.TaskStates[ephemeralTask.Name].FinishedAt, last.TaskStates[mainTask.Name].FinishedAt)
1792+
require.Less(t, last.TaskStates[mainTask.Name].FinishedAt, last.TaskStates[sidecarTask.Name].FinishedAt)
1793+
1794+
// Wait for poststop task to stop.
1795+
testutil.WaitForResult(func() (bool, error) {
1796+
last := upd.Last()
1797+
1798+
if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateDead {
1799+
return false, fmt.Errorf("expected poststop task to be dead not %s", s)
1800+
}
1801+
1802+
if last.TaskStates[poststopTask.Name].Failed {
1803+
return false, fmt.Errorf("expected poststop task to be successful not failed")
1804+
}
1805+
1806+
return true, nil
1807+
}, func(err error) {
1808+
t.Fatalf("error waiting for poststop state:\n%v", err)
1809+
})
1810+
1811+
last = upd.Last()
1812+
require.Less(t, last.TaskStates[sidecarTask.Name].FinishedAt, last.TaskStates[poststopTask.Name].FinishedAt)
1813+
}

client/allocrunner/task_hook_coordinator.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -179,20 +179,18 @@ func (c *taskHookCoordinator) StartPoststopTasks() {
179179
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
180180
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
181181
for _, tr := range tasks {
182-
lc := tr.Task().Lifecycle
183-
if lc == nil || !lc.Sidecar {
182+
if !tr.IsSidecarTask() {
184183
return true
185184
}
186185
}
187186

188187
return false
189188
}
190189

191-
// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
190+
// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks
192191
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
193192
for _, tr := range tasks {
194-
lc := tr.Task().Lifecycle
195-
if lc != nil && lc.Sidecar {
193+
if tr.IsSidecarTask() {
196194
return true
197195
}
198196
}

client/allocrunner/taskrunner/task_runner_getters.go

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPoststopTask() bool {
3333
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
3434
}
3535

36+
// IsSidecarTask returns true if this task is a sidecar task in its task group.
37+
func (tr *TaskRunner) IsSidecarTask() bool {
38+
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Sidecar
39+
}
40+
3641
func (tr *TaskRunner) Task() *structs.Task {
3742
tr.taskLock.RLock()
3843
defer tr.taskLock.RUnlock()

0 commit comments

Comments
 (0)