Skip to content

Commit 82c25e4

Browse files
author
Mahmood Ali
authored
Merge pull request #8311 from hashicorp/b-terminate-sidecars-after-main
allocrunner: terminate sidecars in the end
2 parents 42c2ee4 + 73f19eb commit 82c25e4

File tree

5 files changed

+253
-9
lines changed

5 files changed

+253
-9
lines changed

client/allocrunner/alloc_runner.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,8 @@ func (ar *allocRunner) TaskStateUpdated() {
443443
func (ar *allocRunner) handleTaskStateUpdates() {
444444
defer close(ar.taskStateUpdateHandlerCh)
445445

446+
hasSidecars := hasSidecarTasks(ar.tasks)
447+
446448
for done := false; !done; {
447449
select {
448450
case <-ar.taskStateUpdatedCh:
@@ -462,10 +464,6 @@ func (ar *allocRunner) handleTaskStateUpdates() {
462464
// name whose fault it is.
463465
killTask := ""
464466

465-
// True if task runners should be killed because a leader
466-
// failed (informational).
467-
leaderFailed := false
468-
469467
// Task state has been updated; gather the state of the other tasks
470468
trNum := len(ar.tasks)
471469
liveRunners := make([]*taskrunner.TaskRunner, 0, trNum)
@@ -492,18 +490,24 @@ func (ar *allocRunner) handleTaskStateUpdates() {
492490
}
493491
} else if tr.IsLeader() {
494492
killEvent = structs.NewTaskEvent(structs.TaskLeaderDead)
495-
leaderFailed = true
496-
killTask = name
497493
}
498494
}
499495

496+
// if all live runners are sidecars - kill alloc
497+
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
498+
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
499+
}
500+
500501
// If there's a kill event set and live runners, kill them
501502
if killEvent != nil && len(liveRunners) > 0 {
502503

503504
// Log kill reason
504-
if leaderFailed {
505+
switch killEvent.Type {
506+
case structs.TaskLeaderDead:
505507
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
506-
} else {
508+
case structs.TaskMainDead:
509+
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
510+
default:
507511
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
508512
}
509513

client/allocrunner/alloc_runner_test.go

+122-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
110110
found := false
111111
killingMsg := ""
112112
for _, e := range state1.Events {
113-
if e.Type != structs.TaskLeaderDead {
113+
if e.Type == structs.TaskLeaderDead {
114114
found = true
115115
}
116116
if e.Type == structs.TaskKilling {
@@ -142,6 +142,127 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
142142
})
143143
}
144144

145+
// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
146+
// entire task group is killed.
147+
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
148+
t.Parallel()
149+
150+
alloc := mock.BatchAlloc()
151+
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
152+
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
153+
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
154+
155+
// Create three tasks in the task group
156+
sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy()
157+
sidecar.Name = "sidecar"
158+
sidecar.Driver = "mock_driver"
159+
sidecar.KillTimeout = 10 * time.Millisecond
160+
sidecar.Lifecycle = &structs.TaskLifecycleConfig{
161+
Hook: structs.TaskLifecycleHookPrestart,
162+
Sidecar: true,
163+
}
164+
165+
sidecar.Config = map[string]interface{}{
166+
"run_for": "100s",
167+
}
168+
169+
main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
170+
main1.Name = "task2"
171+
main1.Driver = "mock_driver"
172+
main1.Config = map[string]interface{}{
173+
"run_for": "1s",
174+
}
175+
176+
main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
177+
main2.Name = "task2"
178+
main2.Driver = "mock_driver"
179+
main2.Config = map[string]interface{}{
180+
"run_for": "2s",
181+
}
182+
183+
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2}
184+
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
185+
sidecar.Name: tr,
186+
main1.Name: tr,
187+
main2.Name: tr,
188+
}
189+
190+
conf, cleanup := testAllocRunnerConfig(t, alloc)
191+
defer cleanup()
192+
ar, err := NewAllocRunner(conf)
193+
require.NoError(t, err)
194+
defer destroy(ar)
195+
go ar.Run()
196+
197+
hasTaskMainEvent := func(state *structs.TaskState) bool {
198+
for _, e := range state.Events {
199+
if e.Type == structs.TaskMainDead {
200+
return true
201+
}
202+
}
203+
204+
return false
205+
}
206+
207+
// Wait for all tasks to be killed
208+
upd := conf.StateUpdater.(*MockStateUpdater)
209+
testutil.WaitForResult(func() (bool, error) {
210+
last := upd.Last()
211+
if last == nil {
212+
return false, fmt.Errorf("No updates")
213+
}
214+
if last.ClientStatus != structs.AllocClientStatusComplete {
215+
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
216+
}
217+
218+
var state *structs.TaskState
219+
220+
// Task1 should be killed because Task2 exited
221+
state = last.TaskStates[sidecar.Name]
222+
if state.State != structs.TaskStateDead {
223+
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
224+
}
225+
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
226+
return false, fmt.Errorf("expected to have a start and finish time")
227+
}
228+
if len(state.Events) < 2 {
229+
// At least have a received and destroyed
230+
return false, fmt.Errorf("Unexpected number of events")
231+
}
232+
233+
if !hasTaskMainEvent(state) {
234+
return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
235+
}
236+
237+
// main tasks should die naturely
238+
state = last.TaskStates[main1.Name]
239+
if state.State != structs.TaskStateDead {
240+
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
241+
}
242+
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
243+
return false, fmt.Errorf("expected to have a start and finish time")
244+
}
245+
if hasTaskMainEvent(state) {
246+
return false, fmt.Errorf("unexpected event %#+v in %v", structs.TaskMainDead, state.Events)
247+
}
248+
249+
state = last.TaskStates[main2.Name]
250+
if state.State != structs.TaskStateDead {
251+
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
252+
}
253+
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
254+
return false, fmt.Errorf("expected to have a start and finish time")
255+
}
256+
if hasTaskMainEvent(state) {
257+
return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
258+
}
259+
260+
return true, nil
261+
}, func(err error) {
262+
t.Fatalf("err: %v", err)
263+
})
264+
}
265+
145266
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
146267
t.Parallel()
147268

client/allocrunner/task_hook_coordinator.go

+25
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/hashicorp/go-hclog"
7+
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
78
"github.com/hashicorp/nomad/nomad/structs"
89
)
910

@@ -108,3 +109,27 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
108109
c.mainTaskCtxCancel()
109110
}
110111
}
112+
113+
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
114+
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
115+
for _, tr := range tasks {
116+
lc := tr.Task().Lifecycle
117+
if lc == nil || !lc.Sidecar {
118+
return true
119+
}
120+
}
121+
122+
return false
123+
}
124+
125+
// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
126+
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
127+
for _, tr := range tasks {
128+
lc := tr.Task().Lifecycle
129+
if lc != nil && lc.Sidecar {
130+
return true
131+
}
132+
}
133+
134+
return false
135+
}

client/allocrunner/task_hook_coordinator_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package allocrunner
22

33
import (
4+
"fmt"
45
"testing"
56
"time"
67

78
"github.com/stretchr/testify/require"
89

10+
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
911
"github.com/hashicorp/nomad/nomad/structs"
1012

1113
"github.com/hashicorp/nomad/helper/testlog"
@@ -230,3 +232,90 @@ func isChannelClosed(ch <-chan struct{}) bool {
230232
return false
231233
}
232234
}
235+
236+
func TestHasSidecarTasks(t *testing.T) {
237+
238+
falseV, trueV := false, true
239+
240+
cases := []struct {
241+
name string
242+
// nil if main task, false if non-sidecar hook, true if sidecar hook
243+
indicators []*bool
244+
245+
hasSidecars bool
246+
hasNonsidecars bool
247+
}{
248+
{
249+
name: "all sidecar - one",
250+
indicators: []*bool{&trueV},
251+
hasSidecars: true,
252+
hasNonsidecars: false,
253+
},
254+
{
255+
name: "all sidecar - multiple",
256+
indicators: []*bool{&trueV, &trueV, &trueV},
257+
hasSidecars: true,
258+
hasNonsidecars: false,
259+
},
260+
{
261+
name: "some sidecars, some others",
262+
indicators: []*bool{nil, &falseV, &trueV},
263+
hasSidecars: true,
264+
hasNonsidecars: true,
265+
},
266+
{
267+
name: "no sidecars",
268+
indicators: []*bool{nil, &falseV, nil},
269+
hasSidecars: false,
270+
hasNonsidecars: true,
271+
},
272+
}
273+
274+
for _, c := range cases {
275+
t.Run(c.name, func(t *testing.T) {
276+
alloc := allocWithSidecarIndicators(c.indicators)
277+
arConf, cleanup := testAllocRunnerConfig(t, alloc)
278+
defer cleanup()
279+
280+
ar, err := NewAllocRunner(arConf)
281+
require.NoError(t, err)
282+
283+
require.Equal(t, c.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars")
284+
285+
runners := []*taskrunner.TaskRunner{}
286+
for _, r := range ar.tasks {
287+
runners = append(runners, r)
288+
}
289+
require.Equal(t, c.hasNonsidecars, hasNonSidecarTasks(runners), "non-sidecars")
290+
291+
})
292+
}
293+
}
294+
295+
func allocWithSidecarIndicators(indicators []*bool) *structs.Allocation {
296+
alloc := mock.BatchAlloc()
297+
298+
tasks := []*structs.Task{}
299+
resources := map[string]*structs.AllocatedTaskResources{}
300+
301+
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
302+
303+
for i, indicator := range indicators {
304+
task := alloc.Job.TaskGroups[0].Tasks[0].Copy()
305+
task.Name = fmt.Sprintf("task%d", i)
306+
if indicator != nil {
307+
task.Lifecycle = &structs.TaskLifecycleConfig{
308+
Hook: structs.TaskLifecycleHookPrestart,
309+
Sidecar: *indicator,
310+
}
311+
}
312+
tasks = append(tasks, task)
313+
resources[task.Name] = tr
314+
}
315+
316+
alloc.Job.TaskGroups[0].Tasks = tasks
317+
318+
alloc.AllocatedResources.Tasks = resources
319+
return alloc
320+
321+
}

nomad/structs/structs.go

+5
Original file line numberDiff line numberDiff line change
@@ -6996,6 +6996,9 @@ const (
69966996
// TaskLeaderDead indicates that the leader task within the has finished.
69976997
TaskLeaderDead = "Leader Task Dead"
69986998

6999+
// TaskMainDead indicates that the main tasks have dead
7000+
TaskMainDead = "Main Tasks Dead"
7001+
69997002
// TaskHookFailed indicates that one of the hooks for a task failed.
70007003
TaskHookFailed = "Task hook failed"
70017004

@@ -7217,6 +7220,8 @@ func (event *TaskEvent) PopulateEventDisplayMessage() {
72177220
desc = event.DriverMessage
72187221
case TaskLeaderDead:
72197222
desc = "Leader Task in Group dead"
7223+
case TaskMainDead:
7224+
desc = "Main tasks in the group died"
72207225
default:
72217226
desc = event.Message
72227227
}

0 commit comments

Comments
 (0)