Commit a5857e4

Merge pull request #1256 from hashicorp/b-node-gc
Improve partial garbage collection of allocations
2 parents e8781fc + 7bca84d commit a5857e4
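
In short: gcEval now reports partial results (whether the evaluation itself can be reaped, plus whichever allocations are individually terminal and old enough), the running-batch-job guard moves from evalGC into gcEval, a job is only reaped once all of its evaluations are GC-eligible, and a down node is reaped once all of its allocations are terminal rather than only once it has none. The sketch below paraphrases the new gcEval contract from the diff; the alloc type and gcEvalSketch function are simplified stand-ins for illustration, not the real nomad/structs.Allocation or CoreScheduler.gcEval.

package main

import "fmt"

// alloc is a simplified stand-in for nomad/structs.Allocation.
type alloc struct {
	ID          string
	Terminal    bool
	ModifyIndex uint64
}

// gcEvalSketch mirrors the new partial-GC contract: report whether the
// evaluation itself can be reaped, plus the IDs of allocations that are
// individually terminal and older than the threshold.
func gcEvalSketch(allocs []alloc, thresholdIndex uint64) (bool, []string) {
	gcEval := true
	var gcAllocIDs []string
	for _, a := range allocs {
		if !a.Terminal || a.ModifyIndex > thresholdIndex {
			// A live or too-recent allocation blocks GC of the evaluation...
			gcEval = false
		} else {
			// ...but old, terminal allocations can still be collected.
			gcAllocIDs = append(gcAllocIDs, a.ID)
		}
	}
	return gcEval, gcAllocIDs
}

func main() {
	allocs := []alloc{
		{ID: "failed-alloc", Terminal: true, ModifyIndex: 1001},
		{ID: "running-alloc", Terminal: false, ModifyIndex: 1002},
	}
	gc, ids := gcEvalSketch(allocs, 2000)
	fmt.Println(gc, ids) // false [failed-alloc]
}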

File tree: 2 files changed (+227, -35 lines)

nomad/core_sched.go

+48 -35
@@ -105,20 +105,26 @@ OUTER:
 			continue
 		}
 
+		allEvalsGC := true
 		for _, eval := range evals {
 			gc, allocs, err := c.gcEval(eval, oldThreshold)
-			if err != nil || !gc {
-				// We skip the job because it is not finished if it has
-				// non-terminal allocations.
+			if err != nil {
 				continue OUTER
 			}
 
-			gcEval = append(gcEval, eval.ID)
+			// Update whether all evals GC'd so we know whether to GC the job.
+			allEvalsGC = allEvalsGC && gc
+
+			if gc {
+				gcEval = append(gcEval, eval.ID)
+			}
 			gcAlloc = append(gcAlloc, allocs...)
 		}
 
 		// Job is eligible for garbage collection
-		gcJob = append(gcJob, job.ID)
+		if allEvalsGC {
+			gcJob = append(gcJob, job.ID)
+		}
 	}
 
 	// Fast-path the nothing case
@@ -186,28 +192,10 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 			return err
 		}
 
-		// If the eval is from a running "batch" job we don't want to garbage
-		// collect its allocations. If there is a long running batch job and its
-		// terminal allocations get GC'd the scheduler would re-run the
-		// allocations.
-		if eval.Type == structs.JobTypeBatch {
-			// Check if the job is running
-			job, err := c.snap.JobByID(eval.JobID)
-			if err != nil {
-				return err
-			}
-
-			// If the job has been deregistered, we want to garbage collect the
-			// allocations and evaluations.
-			if job != nil && len(allocs) != 0 {
-				continue
-			}
-		}
-
 		if gc {
 			gcEval = append(gcEval, eval.ID)
-			gcAlloc = append(gcAlloc, allocs...)
 		}
+		gcAlloc = append(gcAlloc, allocs...)
 	}
 
 	// Fast-path the nothing case
@@ -232,6 +220,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
 		return false, nil, nil
 	}
 
+	// If the eval is from a running "batch" job we don't want to garbage
+	// collect its allocations. If there is a long running batch job and its
+	// terminal allocations get GC'd the scheduler would re-run the
+	// allocations.
+	if eval.Type == structs.JobTypeBatch {
+		// Check if the job is running
+		job, err := c.snap.JobByID(eval.JobID)
+		if err != nil {
+			return false, nil, err
+		}
+
+		// If the job has been deregistered, we want to garbage collect the
+		// allocations and evaluations.
+		if job != nil {
+			return false, nil, nil
+		}
+	}
+
 	// Get the allocations by eval
 	allocs, err := c.snap.AllocsByEval(eval.ID)
 	if err != nil {
@@ -241,19 +247,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
 	}
 
 	// Scan the allocations to ensure they are terminal and old
+	gcEval := true
+	var gcAllocIDs []string
 	for _, alloc := range allocs {
 		if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
-			return false, nil, nil
+			// Can't GC the evaluation since not all of the allocations are
+			// terminal
+			gcEval = false
+		} else {
+			// The allocation is eligible to be GC'd
+			gcAllocIDs = append(gcAllocIDs, alloc.ID)
 		}
 	}
 
-	allocIds := make([]string, len(allocs))
-	for i, alloc := range allocs {
-		allocIds[i] = alloc.ID
-	}
-
-	// Evaluation is eligible for garbage collection
-	return true, allocIds, nil
+	return gcEval, gcAllocIDs, nil
 }
 
 // evalReap contacts the leader and issues a reap on the passed evals and
@@ -343,6 +350,7 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 
 	// Collect the nodes to GC
 	var gcNode []string
+OUTER:
 	for {
 		raw := iter.Next()
 		if raw == nil {
@@ -363,9 +371,14 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 			continue
 		}
 
-		// If there are any allocations, skip the node
-		if len(allocs) > 0 {
-			continue
+		// If there are any non-terminal allocations, skip the node. If the node
+		// is terminal and the allocations are not, the scheduler may not have
+		// run yet to transistion the allocs on the node to terminal. We delay
+		// GC'ing until this happens.
+		for _, alloc := range allocs {
+			if !alloc.TerminalStatus() {
+				continue OUTER
+			}
 		}
 
 		// Node is eligible for garbage collection

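A note on the nodeGC hunk above: the old check was a single len(allocs) > 0 test, but the new check is a loop over the node's allocations, and a plain continue inside that loop would only advance to the next allocation. The new OUTER label lets the inner loop skip the whole node whenever any allocation is still non-terminal. A minimal, self-contained illustration of the pattern (not Nomad code; the node and allocation data here are made up):

package main

import "fmt"

func main() {
	// nodeID -> whether each of its allocations is terminal
	nodes := map[string][]bool{
		"node-a": {true, true},
		"node-b": {true, false}, // one allocation still running
	}

	var gcNode []string
OUTER:
	for id, allocsTerminal := range nodes {
		for _, terminal := range allocsTerminal {
			if !terminal {
				// Skip the whole node, not just this allocation.
				continue OUTER
			}
		}
		gcNode = append(gcNode, id)
	}
	fmt.Println(gcNode) // only node-a is eligible for GC
}
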
nomad/core_sched_test.go

+179 -0
@@ -69,6 +69,83 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" eval
+	state := s1.fsm.State()
+	eval := mock.Eval()
+	eval.Status = structs.EvalStatusComplete
+	err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "dead" alloc
+	alloc := mock.Alloc()
+	alloc.EvalID = eval.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusFailed
+	err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "running" alloc
+	alloc2 := mock.Alloc()
+	alloc2.EvalID = eval.ID
+	err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobEvalGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should not be gone
+	out, err := state.EvalByID(eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	outA, err := state.AllocByID(alloc2.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outA == nil {
+		t.Fatalf("bad: %v", outA)
+	}
+
+	// Should be gone
+	outB, err := state.AllocByID(alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outB != nil {
+		t.Fatalf("bad: %v", outB)
+	}
+}
+
 func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
@@ -334,6 +411,108 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" node
+	state := s1.fsm.State()
+	node := mock.Node()
+	node.Status = structs.NodeStatusDown
+	err := state.UpsertNode(1000, node)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a terminal alloc on that node
+	alloc := mock.Alloc()
+	alloc.DesiredStatus = structs.AllocDesiredStatusStop
+	if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobNodeGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should be gone
+	out, err := state.NodeByID(node.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
+func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" node
+	state := s1.fsm.State()
+	node := mock.Node()
+	node.Status = structs.NodeStatusDown
+	err := state.UpsertNode(1000, node)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a running alloc on that node
+	alloc := mock.Alloc()
+	alloc.NodeID = node.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusRun
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobNodeGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should still be here
+	out, err := state.NodeByID(node.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
 func TestCoreScheduler_NodeGC_Force(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()

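For reference, the three tests added above exercise both halves of the change: partial eval GC, and node GC with terminal versus running allocations. Assuming a standard checkout of the repository, something like the following should run just these tests:

	go test ./nomad -run 'TestCoreScheduler_(EvalGC_Partial|NodeGC_TerminalAllocs|NodeGC_RunningAllocs)'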