Skip to content

Commit f5ffed0

Browse files
resolve merge conflict
1 parent aeab919 commit f5ffed0

File tree

5 files changed

+281
-16
lines changed

5 files changed

+281
-16
lines changed

.changelog/13055.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
lifecycle: fixed a bug where sidecar tasks were not being stopped last
3+
```

client/allocrunner/alloc_runner.go

+35-11
Original file line numberDiff line numberDiff line change
@@ -518,21 +518,21 @@ func (ar *allocRunner) handleTaskStateUpdates() {
518518
states := make(map[string]*structs.TaskState, trNum)
519519

520520
for name, tr := range ar.tasks {
521-
state := tr.TaskState()
522-
states[name] = state
521+
taskState := tr.TaskState()
522+
states[name] = taskState
523523

524524
if tr.IsPoststopTask() {
525525
continue
526526
}
527527

528528
// Capture live task runners in case we need to kill them
529-
if state.State != structs.TaskStateDead {
529+
if taskState.State != structs.TaskStateDead {
530530
liveRunners = append(liveRunners, tr)
531531
continue
532532
}
533533

534534
// Task is dead, determine if other tasks should be killed
535-
if state.Failed {
535+
if taskState.Failed {
536536
// Only set failed event if no event has been
537537
// set yet to give dead leaders priority.
538538
if killEvent == nil {
@@ -619,16 +619,16 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
619619
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
620620
}
621621

622-
state := tr.TaskState()
623-
states[name] = state
622+
taskState := tr.TaskState()
623+
states[name] = taskState
624624
break
625625
}
626626

627-
// Kill the rest concurrently
627+
// Kill the rest non-sidecar or poststop tasks concurrently
628628
wg := sync.WaitGroup{}
629629
for name, tr := range ar.tasks {
630-
// Filter out poststop tasks so they run after all the other tasks are killed
631-
if tr.IsLeader() || tr.IsPoststopTask() {
630+
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
631+
if tr.IsLeader() || tr.IsPoststopTask() || tr.IsSidecarTask() {
632632
continue
633633
}
634634

@@ -642,9 +642,33 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
642642
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
643643
}
644644

645-
state := tr.TaskState()
645+
taskState := tr.TaskState()
646+
mu.Lock()
647+
states[name] = taskState
648+
mu.Unlock()
649+
}(name, tr)
650+
}
651+
wg.Wait()
652+
653+
// Kill the sidecar tasks last.
654+
for name, tr := range ar.tasks {
655+
if !tr.IsSidecarTask() || tr.IsLeader() || tr.IsPoststopTask() {
656+
continue
657+
}
658+
659+
wg.Add(1)
660+
go func(name string, tr *taskrunner.TaskRunner) {
661+
defer wg.Done()
662+
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
663+
taskEvent.SetKillTimeout(tr.Task().KillTimeout)
664+
err := tr.Kill(context.TODO(), taskEvent)
665+
if err != nil && err != taskrunner.ErrTaskNotRunning {
666+
ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name)
667+
}
668+
669+
taskState := tr.TaskState()
646670
mu.Lock()
647-
states[name] = state
671+
states[name] = taskState
648672
mu.Unlock()
649673
}(name, tr)
650674
}

client/allocrunner/alloc_runner_test.go

+235
Original file line numberDiff line numberDiff line change
@@ -1573,3 +1573,238 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
15731573
require.NoError(t, err)
15741574
require.Nil(t, ts)
15751575
}
1576+
1577+
func TestAllocRunner_Reconnect(t *testing.T) {
1578+
t.Parallel()
1579+
1580+
type tcase struct {
1581+
clientStatus string
1582+
taskState string
1583+
taskEvent *structs.TaskEvent
1584+
}
1585+
tcases := []tcase{
1586+
{
1587+
structs.AllocClientStatusRunning,
1588+
structs.TaskStateRunning,
1589+
structs.NewTaskEvent(structs.TaskStarted),
1590+
},
1591+
{
1592+
structs.AllocClientStatusComplete,
1593+
structs.TaskStateDead,
1594+
structs.NewTaskEvent(structs.TaskTerminated),
1595+
},
1596+
{
1597+
structs.AllocClientStatusFailed,
1598+
structs.TaskStateDead,
1599+
structs.NewTaskEvent(structs.TaskDriverFailure).SetFailsTask(),
1600+
},
1601+
{
1602+
structs.AllocClientStatusPending,
1603+
structs.TaskStatePending,
1604+
structs.NewTaskEvent(structs.TaskReceived),
1605+
},
1606+
}
1607+
1608+
for _, tc := range tcases {
1609+
t.Run(tc.clientStatus, func(t *testing.T) {
1610+
// create a running alloc
1611+
alloc := mock.BatchAlloc()
1612+
alloc.AllocModifyIndex = 10
1613+
alloc.ModifyIndex = 10
1614+
alloc.ModifyTime = time.Now().UnixNano()
1615+
1616+
// Ensure task takes some time
1617+
task := alloc.Job.TaskGroups[0].Tasks[0]
1618+
task.Driver = "mock_driver"
1619+
task.Config["run_for"] = "30s"
1620+
1621+
original := alloc.Copy()
1622+
1623+
conf, cleanup := testAllocRunnerConfig(t, alloc)
1624+
defer cleanup()
1625+
1626+
ar, err := NewAllocRunner(conf)
1627+
require.NoError(t, err)
1628+
defer destroy(ar)
1629+
1630+
go ar.Run()
1631+
1632+
for _, taskRunner := range ar.tasks {
1633+
taskRunner.UpdateState(tc.taskState, tc.taskEvent)
1634+
}
1635+
1636+
update := ar.Alloc().Copy()
1637+
1638+
update.ClientStatus = structs.AllocClientStatusUnknown
1639+
update.AllocModifyIndex = original.AllocModifyIndex + 10
1640+
update.ModifyIndex = original.ModifyIndex + 10
1641+
update.ModifyTime = original.ModifyTime + 10
1642+
1643+
err = ar.Reconnect(update)
1644+
require.NoError(t, err)
1645+
1646+
require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)
1647+
1648+
// Make sure the runner's alloc indexes match the update.
1649+
require.Equal(t, update.AllocModifyIndex, ar.Alloc().AllocModifyIndex)
1650+
require.Equal(t, update.ModifyIndex, ar.Alloc().ModifyIndex)
1651+
require.Equal(t, update.ModifyTime, ar.Alloc().ModifyTime)
1652+
1653+
found := false
1654+
1655+
updater := conf.StateUpdater.(*MockStateUpdater)
1656+
var last *structs.Allocation
1657+
testutil.WaitForResult(func() (bool, error) {
1658+
last = updater.Last()
1659+
if last == nil {
1660+
return false, errors.New("last update nil")
1661+
}
1662+
1663+
states := last.TaskStates
1664+
for _, s := range states {
1665+
for _, e := range s.Events {
1666+
if e.Type == structs.TaskClientReconnected {
1667+
found = true
1668+
return true, nil
1669+
}
1670+
}
1671+
}
1672+
1673+
return false, errors.New("no reconnect event found")
1674+
}, func(err error) {
1675+
require.NoError(t, err)
1676+
})
1677+
1678+
require.True(t, found, "no reconnect event found")
1679+
})
1680+
}
1681+
}
1682+
1683+
// TestAllocRunner_Lifecycle_Shutdown_Order asserts that a service job with 3
1684+
// lifecycle hooks (1 sidecar, 1 ephemeral, 1 poststop) starts all 4 tasks, and shuts down
1685+
// the sidecar after main, but before poststop.
1686+
func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) {
1687+
alloc := mock.LifecycleAllocWithPoststopDeploy()
1688+
1689+
alloc.Job.Type = structs.JobTypeService
1690+
1691+
mainTask := alloc.Job.TaskGroups[0].Tasks[0]
1692+
mainTask.Config["run_for"] = "100s"
1693+
1694+
sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
1695+
sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
1696+
sidecarTask.Config["run_for"] = "100s"
1697+
1698+
poststopTask := alloc.Job.TaskGroups[0].Tasks[2]
1699+
ephemeralTask := alloc.Job.TaskGroups[0].Tasks[3]
1700+
1701+
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask, sidecarTask, poststopTask}
1702+
1703+
conf, cleanup := testAllocRunnerConfig(t, alloc)
1704+
defer cleanup()
1705+
ar, err := NewAllocRunner(conf)
1706+
require.NoError(t, err)
1707+
defer destroy(ar)
1708+
go ar.Run()
1709+
1710+
upd := conf.StateUpdater.(*MockStateUpdater)
1711+
1712+
// Wait for main and sidecar tasks to be running, and that the
1713+
// ephemeral task ran and exited.
1714+
testutil.WaitForResult(func() (bool, error) {
1715+
last := upd.Last()
1716+
if last == nil {
1717+
return false, fmt.Errorf("No updates")
1718+
}
1719+
1720+
if last.ClientStatus != structs.AllocClientStatusRunning {
1721+
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
1722+
}
1723+
1724+
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
1725+
return false, fmt.Errorf("expected main task to be running not %s", s)
1726+
}
1727+
1728+
if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
1729+
return false, fmt.Errorf("expected sidecar task to be running not %s", s)
1730+
}
1731+
1732+
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
1733+
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
1734+
}
1735+
1736+
if last.TaskStates[ephemeralTask.Name].Failed {
1737+
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
1738+
}
1739+
1740+
return true, nil
1741+
}, func(err error) {
1742+
t.Fatalf("error waiting for initial state:\n%v", err)
1743+
})
1744+
1745+
// Tell the alloc to stop
1746+
stopAlloc := alloc.Copy()
1747+
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
1748+
ar.Update(stopAlloc)
1749+
1750+
// Wait for tasks to stop.
1751+
testutil.WaitForResult(func() (bool, error) {
1752+
last := upd.Last()
1753+
1754+
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
1755+
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
1756+
}
1757+
1758+
if last.TaskStates[ephemeralTask.Name].Failed {
1759+
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
1760+
}
1761+
1762+
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
1763+
return false, fmt.Errorf("expected main task to be dead not %s", s)
1764+
}
1765+
1766+
if last.TaskStates[mainTask.Name].Failed {
1767+
return false, fmt.Errorf("expected main task to be successful not failed")
1768+
}
1769+
1770+
if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
1771+
return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
1772+
}
1773+
1774+
if last.TaskStates[sidecarTask.Name].Failed {
1775+
return false, fmt.Errorf("expected sidecar task to be successful not failed")
1776+
}
1777+
1778+
if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateRunning {
1779+
return false, fmt.Errorf("expected poststop task to be running not %s", s)
1780+
}
1781+
1782+
return true, nil
1783+
}, func(err error) {
1784+
t.Fatalf("error waiting for kill state:\n%v", err)
1785+
})
1786+
1787+
last := upd.Last()
1788+
require.Less(t, last.TaskStates[ephemeralTask.Name].FinishedAt, last.TaskStates[mainTask.Name].FinishedAt)
1789+
require.Less(t, last.TaskStates[mainTask.Name].FinishedAt, last.TaskStates[sidecarTask.Name].FinishedAt)
1790+
1791+
// Wait for poststop task to stop.
1792+
testutil.WaitForResult(func() (bool, error) {
1793+
last := upd.Last()
1794+
1795+
if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateDead {
1796+
return false, fmt.Errorf("expected poststop task to be dead not %s", s)
1797+
}
1798+
1799+
if last.TaskStates[poststopTask.Name].Failed {
1800+
return false, fmt.Errorf("expected poststop task to be successful not failed")
1801+
}
1802+
1803+
return true, nil
1804+
}, func(err error) {
1805+
t.Fatalf("error waiting for poststop state:\n%v", err)
1806+
})
1807+
1808+
last = upd.Last()
1809+
require.Less(t, last.TaskStates[sidecarTask.Name].FinishedAt, last.TaskStates[poststopTask.Name].FinishedAt)
1810+
}

client/allocrunner/task_hook_coordinator.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -179,20 +179,18 @@ func (c *taskHookCoordinator) StartPoststopTasks() {
179179
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
180180
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
181181
for _, tr := range tasks {
182-
lc := tr.Task().Lifecycle
183-
if lc == nil || !lc.Sidecar {
182+
if !tr.IsSidecarTask() {
184183
return true
185184
}
186185
}
187186

188187
return false
189188
}
190189

191-
// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
190+
// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks
192191
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
193192
for _, tr := range tasks {
194-
lc := tr.Task().Lifecycle
195-
if lc != nil && lc.Sidecar {
193+
if tr.IsSidecarTask() {
196194
return true
197195
}
198196
}

client/allocrunner/taskrunner/task_runner_getters.go

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPoststopTask() bool {
3333
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
3434
}
3535

36+
// IsSidecarTask returns true if this task is a sidecar task in its task group.
37+
func (tr *TaskRunner) IsSidecarTask() bool {
38+
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Sidecar
39+
}
40+
3641
func (tr *TaskRunner) Task() *structs.Task {
3742
tr.taskLock.RLock()
3843
defer tr.taskLock.RUnlock()

0 commit comments

Comments
 (0)