Commit 176b48c

Author: Preetha
Merge pull request #5545 from hashicorp/f-preemption-scheduler-refactor
Refactor scheduler package to enable preemption for batch/service jobs

2 parents: 68470e9 + a134c16
7 files changed: +125 / -79 lines
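In summary, this refactor moves the OSS-specific pieces of the generic scheduler into build-tagged files (generic_sched_oss.go and stack_oss.go, guarded by "+build !pro,!ent"), replaces BinPackIterator.SetPriority with SetJob so the bin packer also knows the job's namespaced ID, threads that ID into the Preemptor so a job never preempts its own allocations, and adds a Preempt flag to SelectOptions that GenericStack.Select forwards to the bin packer's evict behavior.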

scheduler/generic_sched.go (+3, -1)

@@ -468,7 +468,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult
 
 		// Compute penalty nodes for rescheduled allocs
 		selectOptions := getSelectOptions(prevAllocation, preferredNode)
-		option := s.stack.Select(tg, selectOptions)
+		option := s.selectNextOption(tg, selectOptions)
 
 		// Store the available nodes by datacenter
 		s.ctx.Metrics().NodesAvailable = byDC
@@ -527,6 +527,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult
 			}
 		}
 
+		s.handlePreemptions(option, alloc, missing)
+
 		// Track the placement
 		s.plan.AppendAlloc(alloc)
 

scheduler/generic_sched_oss.go (new file, +15)

@@ -0,0 +1,15 @@
+// +build !pro,!ent
+
+package scheduler
+
+import "github.com/hashicorp/nomad/nomad/structs"
+
+// selectNextOption calls the stack to get a node for placement
+func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
+	return s.stack.Select(tg, selectOptions)
+}
+
+// handlePreemptions sets relevant preemption related fields. In OSS this is a no-op.
+func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
+
+}
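These two functions are the OSS side of a build-tag extension point: builds tagged pro or ent can supply their own implementations. As a rough illustration only (the enterprise code is not part of this commit, and the retry logic below is an assumption, not the actual implementation), an override of selectNextOption could retry selection with the new SelectOptions.Preempt flag when the first pass finds no feasible node:

// +build pro ent

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// Hypothetical sketch: retry node selection with preemption enabled when
// normal bin packing finds nothing for this task group.
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
	option := s.stack.Select(tg, selectOptions)
	if option == nil {
		selectOptions.Preempt = true
		option = s.stack.Select(tg, selectOptions)
	}
	return option
}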

scheduler/preemption.go (+14, -2)

@@ -107,6 +107,9 @@ type Preemptor struct {
 	// jobPriority is the priority of the job being preempted
 	jobPriority int
 
+	// jobID is the ID of the job being preempted
+	jobID *structs.NamespacedID
+
 	// nodeRemainingResources tracks available resources on the node after
 	// accounting for running allocations
 	nodeRemainingResources *structs.ComparableResources
@@ -118,10 +121,11 @@ type Preemptor struct {
 	ctx Context
 }
 
-func NewPreemptor(jobPriority int, ctx Context) *Preemptor {
+func NewPreemptor(jobPriority int, ctx Context, jobID *structs.NamespacedID) *Preemptor {
 	return &Preemptor{
 		currentPreemptions: make(map[structs.NamespacedID]map[string]int),
 		jobPriority:        jobPriority,
+		jobID:              jobID,
 		allocDetails:       make(map[string]*allocInfo),
 		ctx:                ctx,
 	}
@@ -140,14 +144,22 @@ func (p *Preemptor) SetNode(node *structs.Node) {
 
 // SetCandidates initializes the candidate set from which preemptions are chosen
 func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) {
-	p.currentAllocs = allocs
+	// Reset candidate set
+	p.currentAllocs = []*structs.Allocation{}
 	for _, alloc := range allocs {
+		// Ignore any allocations of the job being placed
+		// This filters out any previous allocs of the job, and any new allocs in the plan
+		if alloc.JobID == p.jobID.ID && alloc.Namespace == p.jobID.Namespace {
+			continue
+		}
+
 		maxParallel := 0
 		tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
 		if tg != nil && tg.Migrate != nil {
 			maxParallel = tg.Migrate.MaxParallel
 		}
 		p.allocDetails[alloc.ID] = &allocInfo{maxParallel: maxParallel, resources: alloc.ComparableResources()}
+		p.currentAllocs = append(p.currentAllocs, alloc)
 	}
 }
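The net effect of the jobID plumbing is that SetCandidates now skips any allocation belonging to the job being placed. A test-style sketch of the behavior (hypothetical, written against helpers this package's tests already use, such as testContext and the mock package) might look like:

package scheduler

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

// Hypothetical sketch: a job's own allocations are excluded from the
// preemption candidate set, while allocations of other jobs are kept.
func TestPreemptor_FiltersOwnAllocs(t *testing.T) {
	_, ctx := testContext(t)

	job := mock.Job()
	p := NewPreemptor(job.Priority, ctx, job.NamespacedID())

	// An allocation of the job being placed...
	own := mock.Alloc()
	own.Job = job
	own.JobID = job.ID
	own.Namespace = job.Namespace

	// ...and an allocation of an unrelated job.
	other := mock.Alloc()

	p.SetCandidates([]*structs.Allocation{own, other})
	// After SetCandidates, only "other" remains a candidate for preemption.
}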

scheduler/preemption_test.go (+4)

@@ -1249,6 +1249,7 @@ func TestPreemption(t *testing.T) {
 			node.ReservedResources = tc.nodeReservedCapacity
 
 			state, ctx := testContext(t)
+
 			nodes := []*RankedNode{
 				{
 					Node: node,
@@ -1267,6 +1268,9 @@
 			}
 			static := NewStaticRankIterator(ctx, nodes)
 			binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority)
+			job := mock.Job()
+			job.Priority = tc.jobPriority
+			binPackIter.SetJob(job)
 
 			taskGroup := &structs.TaskGroup{
 				EphemeralDisk: &structs.EphemeralDisk{},

scheduler/rank.go (+5, -3)

@@ -147,6 +147,7 @@ type BinPackIterator struct {
 	source    RankIterator
 	evict     bool
 	priority  int
+	jobId     *structs.NamespacedID
 	taskGroup *structs.TaskGroup
 }
 
@@ -162,8 +163,9 @@ func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int
 	return iter
 }
 
-func (iter *BinPackIterator) SetPriority(p int) {
-	iter.priority = p
+func (iter *BinPackIterator) SetJob(job *structs.Job) {
+	iter.priority = job.Priority
+	iter.jobId = job.NamespacedID()
 }
 
 func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) {
@@ -211,7 +213,7 @@ OUTER:
 		var allocsToPreempt []*structs.Allocation
 
 		// Initialize preemptor with node
-		preemptor := NewPreemptor(iter.priority, iter.ctx)
+		preemptor := NewPreemptor(iter.priority, iter.ctx, iter.jobId)
 		preemptor.SetNode(option.Node)
 
 		// Count the number of existing preemptions
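For callers, this is a one-line API swap: the bin packer no longer takes a bare priority but derives both the priority and the namespaced job ID from the job itself. A minimal before/after sketch, using the constructor as it appears in this diff (ctx, rankSource, and job are assumed to be in scope):

// Before this commit: only the priority reached the bin packer.
binPack := NewBinPackIterator(ctx, rankSource, false, 0)
binPack.SetPriority(job.Priority)

// After this commit: SetJob records the priority and the job's NamespacedID,
// which NewPreemptor later uses to avoid preempting the job's own allocs.
binPack = NewBinPackIterator(ctx, rankSource, false, 0)
binPack.SetJob(job)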

scheduler/stack.go (+6, -73)

@@ -34,6 +34,7 @@ type Stack interface {
 type SelectOptions struct {
 	PenaltyNodeIDs map[string]struct{}
 	PreferredNodes []*structs.Node
+	Preempt        bool
 }
 
 // GenericStack is the Stack used for the Generic scheduler. It is
@@ -62,77 +63,6 @@ type GenericStack struct {
 	scoreNorm *ScoreNormalizationIterator
 }
 
-// NewGenericStack constructs a stack used for selecting service placements
-func NewGenericStack(batch bool, ctx Context) *GenericStack {
-	// Create a new stack
-	s := &GenericStack{
-		batch: batch,
-		ctx:   ctx,
-	}
-
-	// Create the source iterator. We randomize the order we visit nodes
-	// to reduce collisions between schedulers and to do a basic load
-	// balancing across eligible nodes.
-	s.source = NewRandomIterator(ctx, nil)
-
-	// Create the quota iterator to determine if placements would result in the
-	// quota attached to the namespace of the job to go over.
-	s.quota = NewQuotaIterator(ctx, s.source)
-
-	// Attach the job constraints. The job is filled in later.
-	s.jobConstraint = NewConstraintChecker(ctx, nil)
-
-	// Filter on task group drivers first as they are faster
-	s.taskGroupDrivers = NewDriverChecker(ctx, nil)
-
-	// Filter on task group constraints second
-	s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
-
-	// Filter on task group devices
-	s.taskGroupDevices = NewDeviceChecker(ctx)
-
-	// Create the feasibility wrapper which wraps all feasibility checks in
-	// which feasibility checking can be skipped if the computed node class has
-	// previously been marked as eligible or ineligible. Generally this will be
-	// checks that only needs to examine the single node to determine feasibility.
-	jobs := []FeasibilityChecker{s.jobConstraint}
-	tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
-	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)
-
-	// Filter on distinct host constraints.
-	s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
-
-	// Filter on distinct property constraints.
-	s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
-
-	// Upgrade from feasible to rank iterator
-	rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
-
-	// Apply the bin packing, this depends on the resources needed
-	// by a particular task group.
-
-	s.binPack = NewBinPackIterator(ctx, rankSource, false, 0)
-
-	// Apply the job anti-affinity iterator. This is to avoid placing
-	// multiple allocations on the same node for this job.
-	s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
-
-	s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
-
-	s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
-
-	s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
-
-	s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
-
-	// Apply a limit function. This is to avoid scanning *every* possible node.
-	s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
-
-	// Select the node with the maximum score for placement
-	s.maxScore = NewMaxScoreIterator(ctx, s.limit)
-	return s
-}
-
 func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
 	// Shuffle base nodes
 	shuffleNodes(baseNodes)
@@ -159,7 +89,7 @@ func (s *GenericStack) SetJob(job *structs.Job) {
 	s.jobConstraint.SetConstraints(job.Constraints)
 	s.distinctHostsConstraint.SetJob(job)
 	s.distinctPropertyConstraint.SetJob(job)
-	s.binPack.SetPriority(job.Priority)
+	s.binPack.SetJob(job)
 	s.jobAntiAff.SetJob(job)
 	s.nodeAffinity.SetJob(job)
 	s.spread.SetJob(job)
@@ -203,6 +133,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode
 	s.distinctPropertyConstraint.SetTaskGroup(tg)
 	s.wrappedChecks.SetTaskGroup(tg.Name)
 	s.binPack.SetTaskGroup(tg)
+	if options != nil {
+		s.binPack.evict = options.Preempt
+	}
 	s.jobAntiAff.SetTaskGroup(tg)
 	if options != nil {
 		s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
@@ -306,7 +239,7 @@ func (s *SystemStack) SetNodes(baseNodes []*structs.Node) {
 func (s *SystemStack) SetJob(job *structs.Job) {
 	s.jobConstraint.SetConstraints(job.Constraints)
 	s.distinctPropertyConstraint.SetJob(job)
-	s.binPack.SetPriority(job.Priority)
+	s.binPack.SetJob(job)
 	s.ctx.Eligibility().SetJob(job)
 
 	if contextual, ok := s.quota.(ContextualIterator); ok {
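The new Preempt flag is how callers opt a single Select call into eviction: GenericStack.Select copies it onto the bin packer's evict field before ranking nodes. A minimal caller-side sketch (the surrounding scheduler code, including the stack and tg variables, is assumed; only the SelectOptions field and Select call come from this diff):

// Ask the stack to rank nodes with preemption (eviction) enabled for this
// placement only; the flag is re-applied on every Select call.
selectOptions := &SelectOptions{Preempt: true}
option := stack.Select(tg, selectOptions)
if option == nil {
	// Even with preemption enabled, no node could fit the task group.
}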

scheduler/stack_oss.go (new file, +78)

@@ -0,0 +1,78 @@
+// +build !pro,!ent
+
+package scheduler
+
+// NewGenericStack constructs a stack used for selecting service placements
+func NewGenericStack(batch bool, ctx Context) *GenericStack {
+	// Create a new stack
+	s := &GenericStack{
+		batch: batch,
+		ctx:   ctx,
+	}
+
+	// Create the source iterator. We randomize the order we visit nodes
+	// to reduce collisions between schedulers and to do a basic load
+	// balancing across eligible nodes.
+	s.source = NewRandomIterator(ctx, nil)
+
+	// Create the quota iterator to determine if placements would result in the
+	// quota attached to the namespace of the job to go over.
+	s.quota = NewQuotaIterator(ctx, s.source)
+
+	// Attach the job constraints. The job is filled in later.
+	s.jobConstraint = NewConstraintChecker(ctx, nil)
+
+	// Filter on task group drivers first as they are faster
+	s.taskGroupDrivers = NewDriverChecker(ctx, nil)
+
+	// Filter on task group constraints second
+	s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
+
+	// Filter on task group devices
+	s.taskGroupDevices = NewDeviceChecker(ctx)
+
+	// Create the feasibility wrapper which wraps all feasibility checks in
+	// which feasibility checking can be skipped if the computed node class has
+	// previously been marked as eligible or ineligible. Generally this will be
+	// checks that only needs to examine the single node to determine feasibility.
+	jobs := []FeasibilityChecker{s.jobConstraint}
+	tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
+	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)
+
+	// Filter on distinct host constraints.
+	s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
+
+	// Filter on distinct property constraints.
+	s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
+
+	// Upgrade from feasible to rank iterator
+	rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
+
+	// Apply the bin packing, this depends on the resources needed
+	// by a particular task group.
+	s.binPack = NewBinPackIterator(ctx, rankSource, false, 0)
+
+	// Apply the job anti-affinity iterator. This is to avoid placing
+	// multiple allocations on the same node for this job.
+	s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
+
+	// Apply node rescheduling penalty. This tries to avoid placing on a
+	// node where the allocation failed previously
+	s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
+
+	// Apply scores based on affinity stanza
+	s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
+
+	// Apply scores based on spread stanza
+	s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
+
+	// Normalizes scores by averaging them across various scorers
+	s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
+
+	// Apply a limit function. This is to avoid scanning *every* possible node.
+	s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
+
+	// Select the node with the maximum score for placement
+	s.maxScore = NewMaxScoreIterator(ctx, s.limit)
+	return s
+}
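The body of NewGenericStack here is the same pipeline that was removed from stack.go, with a few explanatory comments added (rescheduling penalty, affinity, spread, score normalization). Moving it behind the !pro,!ent build tags lets non-OSS builds supply their own stack constructor, mirroring the selectNextOption and handlePreemptions hooks added in generic_sched_oss.go.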
