-
Notifications
You must be signed in to change notification settings - Fork 2k
/
Copy pathpropertyset.go
276 lines (227 loc) · 8.21 KB
/
propertyset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package scheduler
import (
"fmt"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
// propertySet is used to track the values used for a particular property.
type propertySet struct {
// ctx is used to lookup the plan and state
ctx Context
// jobID is the job we are operating on
jobID string
// taskGroup is optionally set if the constraint is for a task group
taskGroup string
// constraint is the constraint this property set is checking
constraint *structs.Constraint
// errorBuilding marks whether there was an error when building the property
// set
errorBuilding error
// existingValues is the set of values for the given property that have been
// used by pre-existing allocations.
existingValues map[string]struct{}
// proposedValues is the set of values for the given property that are used
// from proposed allocations.
proposedValues map[string]struct{}
// clearedValues is the set of values that are no longer being used by
// existingValues because of proposed stops.
clearedValues map[string]struct{}
}
// NewPropertySet returns a new property set used to guarantee unique property
// values for new allocation placements.
func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
p := &propertySet{
ctx: ctx,
jobID: job.ID,
existingValues: make(map[string]struct{}),
}
return p
}
// SetJobConstraint is used to parameterize the property set for a
// distinct_property constraint set at the job level.
func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
// Store the constraint
p.constraint = constraint
p.populateExisting(constraint)
// Populate the proposed when setting the constraint. We do this because
// when detecting if we can inplace update an allocation we stage an
// eviction and then select. This means the plan has an eviction before a
// single select has finished.
p.PopulateProposed()
}
// SetTGConstraint is used to parameterize the property set for a
// distinct_property constraint set at the task group level. The inputs are the
// constraint and the task group name.
func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) {
// Store that this is for a task group
p.taskGroup = taskGroup
// Store the constraint
p.constraint = constraint
p.populateExisting(constraint)
// Populate the proposed when setting the constraint. We do this because
// when detecting if we can inplace update an allocation we stage an
// eviction and then select. This means the plan has an eviction before a
// single select has finished.
p.PopulateProposed()
}
// populateExisting is a helper shared when setting the constraint to populate
// the existing values.
func (p *propertySet) populateExisting(constraint *structs.Constraint) {
// Retrieve all previously placed allocations
ws := memdb.NewWatchSet()
allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false)
if err != nil {
p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err)
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding)
return
}
// Filter to the correct set of allocs
allocs = p.filterAllocs(allocs, true)
// Get all the nodes that have been used by the allocs
nodes, err := p.buildNodeMap(allocs)
if err != nil {
p.errorBuilding = err
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
return
}
// Build existing properties map
p.populateProperties(allocs, nodes, p.existingValues)
}
// PopulateProposed populates the proposed values and recomputes any cleared
// value. It should be called whenever the plan is updated to ensure correct
// results when checking an option.
func (p *propertySet) PopulateProposed() {
// Reset the proposed properties
p.proposedValues = make(map[string]struct{})
p.clearedValues = make(map[string]struct{})
// Gather the set of proposed stops.
var stopping []*structs.Allocation
for _, updates := range p.ctx.Plan().NodeUpdate {
stopping = append(stopping, updates...)
}
stopping = p.filterAllocs(stopping, false)
// Gather the proposed allocations
var proposed []*structs.Allocation
for _, pallocs := range p.ctx.Plan().NodeAllocation {
proposed = append(proposed, pallocs...)
}
proposed = p.filterAllocs(proposed, true)
// Get the used nodes
both := make([]*structs.Allocation, 0, len(stopping)+len(proposed))
both = append(both, stopping...)
both = append(both, proposed...)
nodes, err := p.buildNodeMap(both)
if err != nil {
p.errorBuilding = err
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
return
}
// Populate the cleared values
p.populateProperties(stopping, nodes, p.clearedValues)
// Populate the proposed values
p.populateProperties(proposed, nodes, p.proposedValues)
// Remove any cleared value that is now being used by the proposed allocs
for value := range p.proposedValues {
delete(p.clearedValues, value)
}
}
// SatisfiesDistinctProperties checks if the option satisfies the
// distinct_property constraints given the existing placements and proposed
// placements. If the option does not satisfy the constraints an explanation is
// given.
func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) {
// Check if there was an error building
if p.errorBuilding != nil {
return false, p.errorBuilding.Error()
}
// Get the nodes property value
nValue, ok := getProperty(option, p.constraint.LTarget)
if !ok {
return false, fmt.Sprintf("missing property %q", p.constraint.LTarget)
}
// both is used to iterate over both the proposed and existing used
// properties
bothAll := []map[string]struct{}{p.existingValues, p.proposedValues}
// Check if the nodes value has already been used.
for _, usedProperties := range bothAll {
// Check if the nodes value has been used
_, used := usedProperties[nValue]
if !used {
continue
}
// Check if the value has been cleared from a proposed stop
if _, cleared := p.clearedValues[nValue]; cleared {
continue
}
return false, fmt.Sprintf("distinct_property: %s=%s already used", p.constraint.LTarget, nValue)
}
return true, ""
}
// filterAllocs filters a set of allocations to just be those that are running
// and if the property set is operation at a task group level, for allocations
// for that task group
func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation {
n := len(allocs)
for i := 0; i < n; i++ {
remove := false
if filterTerminal {
remove = allocs[i].TerminalStatus()
}
// If the constraint is on the task group filter the allocations to just
// those on the task group
if p.taskGroup != "" {
remove = remove || allocs[i].TaskGroup != p.taskGroup
}
if remove {
allocs[i], allocs[n-1] = allocs[n-1], nil
i--
n--
}
}
return allocs[:n]
}
// buildNodeMap takes a list of allocations and returns a map of the nodes used
// by those allocations
func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {
// Get all the nodes that have been used by the allocs
nodes := make(map[string]*structs.Node)
ws := memdb.NewWatchSet()
for _, alloc := range allocs {
if _, ok := nodes[alloc.NodeID]; ok {
continue
}
node, err := p.ctx.State().NodeByID(ws, alloc.NodeID)
if err != nil {
return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err)
}
nodes[alloc.NodeID] = node
}
return nodes, nil
}
// populateProperties goes through all allocations and builds up the used
// properties from the nodes storing the results in the passed properties map.
func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node,
properties map[string]struct{}) {
for _, alloc := range allocs {
nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget)
if !ok {
continue
}
properties[nProperty] = struct{}{}
}
}
// getProperty is used to lookup the property value on the node
func getProperty(n *structs.Node, property string) (string, bool) {
if n == nil || property == "" {
return "", false
}
val, ok := resolveConstraintTarget(property, n)
if !ok {
return "", false
}
nodeValue, ok := val.(string)
if !ok {
return "", false
}
return nodeValue, true
}