
Commit 38b27ea

Handle leader task being dead in RestoreState
Fixes the panic mentioned in #3420 (comment). While a leader task dying serially stops all follower tasks, the synchronizing of state is asynchronous. Nomad can shut down before all follower tasks have updated their state to dead, thus saving exactly the state needed to hit this panic: *have a non-terminal alloc with a dead leader.* The actual fix is a simple nil check so we do not assume a non-terminal alloc's leader has a TaskRunner.

3 files changed: +104 −5 lines

client/alloc_runner.go

+12 −4

@@ -932,6 +932,7 @@ OUTER:
 				r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
 					r.allocID, err)
 			}
+
 		case <-r.ctx.Done():
 			taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
 			break OUTER
@@ -967,10 +968,17 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
 		tr := r.tasks[leader]
 		r.taskLock.RUnlock()

-		r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
-			r.allocID, leader, r.alloc.TaskGroup)
-		tr.Destroy(destroyEvent)
-		<-tr.WaitCh()
+		// Dead tasks don't have a task runner created so guard against
+		// the leader being dead when this AR was saved.
+		if tr == nil {
+			r.logger.Printf("[DEBUG] client: alloc %q leader task %q of task group %q already stopped",
+				r.allocID, leader, r.alloc.TaskGroup)
+		} else {
+			r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
+				r.allocID, leader, r.alloc.TaskGroup)
+			tr.Destroy(destroyEvent)
+			<-tr.WaitCh()
+		}
 	}

 	// Then destroy non-leader tasks concurrently
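For background on why tr can be nil at all: as the new comment notes, restored allocs do not get a TaskRunner for tasks that were already dead when client state was saved, and indexing a Go map with a missing key returns the zero value, which for a pointer type is nil. A minimal standalone sketch of that behavior (illustrative only, not Nomad code):

	package main

	import "fmt"

	func main() {
		type TaskRunner struct{}

		// No runner was created for the leader because it was already dead
		// when state was saved, so its key is simply absent from the map.
		tasks := map[string]*TaskRunner{}

		// A missing key yields the zero value; for *TaskRunner that is nil,
		// so dereferencing it without a guard would panic.
		tr := tasks["leader"]
		fmt.Println(tr == nil) // prints: true
	}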

client/alloc_runner_test.go

+91

@@ -1365,6 +1365,97 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
 	})
 }

+// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a
+// restored task group with a leader that failed before restoring the leader is
+// not stopped as it does not exist.
+// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
+func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
+	t.Parallel()
+	_, ar := testAllocRunner(false)
+	defer ar.Destroy()
+
+	// Create a leader and follower task in the task group
+	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
+	task.Name = "follower1"
+	task.Driver = "mock_driver"
+	task.KillTimeout = 10 * time.Second
+	task.Config = map[string]interface{}{
+		"run_for": "10s",
+	}
+
+	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
+	task2.Name = "leader"
+	task2.Driver = "mock_driver"
+	task2.Leader = true
+	task2.KillTimeout = 10 * time.Millisecond
+	task2.Config = map[string]interface{}{
+		"run_for": "0s",
+	}
+
+	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
+	ar.alloc.TaskResources[task2.Name] = task2.Resources
+
+	// Mimic Nomad exiting before the leader stopping is able to stop other tasks.
+	ar.tasks = map[string]*TaskRunner{
+		"leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
+			ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(),
+			ar.vaultClient, ar.consulClient),
+		"follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
+			ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(),
+			ar.vaultClient, ar.consulClient),
+	}
+	ar.taskStates = map[string]*structs.TaskState{
+		"leader":    {State: structs.TaskStateDead},
+		"follower1": {State: structs.TaskStateRunning},
+	}
+	if err := ar.SaveState(); err != nil {
+		t.Fatalf("error saving state: %v", err)
+	}
+
+	// Create a new AllocRunner to test RestoreState and Run
+	upd2 := &MockAllocStateUpdater{}
+	ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd2.Update, ar.alloc,
+		ar.vaultClient, ar.consulClient, ar.prevAlloc)
+	defer ar2.Destroy()
+
+	if err := ar2.RestoreState(); err != nil {
+		t.Fatalf("error restoring state: %v", err)
+	}
+	go ar2.Run()
+
+	// Wait for tasks to be stopped because leader is dead
+	testutil.WaitForResult(func() (bool, error) {
+		_, last := upd2.Last()
+		if last == nil {
+			return false, fmt.Errorf("no updates yet")
+		}
+		if actual := last.TaskStates["leader"].State; actual != structs.TaskStateDead {
+			return false, fmt.Errorf("Task leader is not dead yet (it's %q)", actual)
+		}
+		if actual := last.TaskStates["follower1"].State; actual != structs.TaskStateDead {
+			return false, fmt.Errorf("Task follower1 is not dead yet (it's %q)", actual)
+		}
+		return true, nil
+	}, func(err error) {
+		count, last := upd2.Last()
+		t.Logf("Updates: %d", count)
+		for name, state := range last.TaskStates {
+			t.Logf("%s: %s", name, state.State)
+		}
+		t.Fatalf("err: %v", err)
+	})
+
+	// Make sure it GCs properly
+	ar2.Destroy()
+
+	select {
+	case <-ar2.WaitCh():
+		// exited as expected
+	case <-time.After(10 * time.Second):
+		t.Fatalf("timed out waiting for AR to GC")
+	}
+}
+
 // TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's
 // local/ dir will be moved to a replacement alloc's local/ dir if sticky
 // volumes is on.
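To exercise just this new test from a Nomad checkout, a standard Go test invocation along these lines should work (the exact flags are a suggestion, not part of the commit):

	go test ./client -run TestAllocRunner_TaskLeader_StopRestoredTG -v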

client/task_runner.go

+1 −1

@@ -237,7 +237,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 	// Build the restart tracker.
 	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
 	if tg == nil {
-		logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
+		logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup)
 		return nil
 	}
 	restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
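The only change in this file is a log-format cleanup: %q emits a double-quoted, Go-escaped string rather than interpolating a bare %s inside hand-written single quotes, matching the other client log lines touched by this commit. A tiny standalone illustration (not Nomad code):

	package main

	import "fmt"

	func main() {
		id := "example-alloc-id" // placeholder value for illustration

		// Old style: raw %s wrapped in literal single quotes.
		fmt.Printf("alloc '%s'\n", id) // alloc 'example-alloc-id'

		// New style: %q double-quotes and escapes the value.
		fmt.Printf("alloc %q\n", id) // alloc "example-alloc-id"
	}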
