Skip to content

Commit 2cbfc83

Browse files
authored
Merge pull request #8168 from hashicorp/f-8158-add-preserve-counts
add `PreserveCounts` to `Job.Register`
2 parents 5a942f0 + 0284698 commit 2cbfc83

File tree

13 files changed

+272
-44
lines changed

13 files changed

+272
-44
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ FEATURES:
66

77
IMPROVEMENTS:
88

9+
* core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)]
910
* api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)]
1011
* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/9172)]
1112

api/jobs.go

+8-15
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type RegisterOptions struct {
8787
EnforceIndex bool
8888
ModifyIndex uint64
8989
PolicyOverride bool
90+
PreserveCounts bool
9091
}
9192

9293
// Register is used to register a new job. It returns the ID
@@ -105,17 +106,16 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
105106
// of the evaluation, along with any errors encountered.
106107
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
107108
// Format the request
108-
req := &RegisterJobRequest{
109+
req := &JobRegisterRequest{
109110
Job: job,
110111
}
111112
if opts != nil {
112113
if opts.EnforceIndex {
113114
req.EnforceIndex = true
114115
req.JobModifyIndex = opts.ModifyIndex
115116
}
116-
if opts.PolicyOverride {
117-
req.PolicyOverride = true
118-
}
117+
req.PolicyOverride = opts.PolicyOverride
118+
req.PreserveCounts = opts.PreserveCounts
119119
}
120120

121121
var resp JobRegisterResponse
@@ -1035,25 +1035,18 @@ type JobRevertRequest struct {
10351035
WriteRequest
10361036
}
10371037

1038-
// JobUpdateRequest is used to update a job
1038+
// JobRegisterRequest is used to update a job
10391039
type JobRegisterRequest struct {
10401040
Job *Job
10411041
// If EnforceIndex is set then the job will only be registered if the passed
10421042
// JobModifyIndex matches the current Jobs index. If the index is zero, the
10431043
// register only occurs if the job is new.
1044-
EnforceIndex bool
1045-
JobModifyIndex uint64
1046-
PolicyOverride bool
1047-
1048-
WriteRequest
1049-
}
1050-
1051-
// RegisterJobRequest is used to serialize a job registration
1052-
type RegisterJobRequest struct {
1053-
Job *Job
10541044
EnforceIndex bool `json:",omitempty"`
10551045
JobModifyIndex uint64 `json:",omitempty"`
10561046
PolicyOverride bool `json:",omitempty"`
1047+
PreserveCounts bool `json:",omitempty"`
1048+
1049+
WriteRequest
10571050
}
10581051

10591052
// JobRegisterResponse is used to respond to a job registration

api/jobs_test.go

+142
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,148 @@ func TestJobs_Register(t *testing.T) {
4444
}
4545
}
4646

47+
func TestJobs_Register_PreserveCounts(t *testing.T) {
48+
t.Parallel()
49+
require := require.New(t)
50+
51+
c, s := makeClient(t, nil, nil)
52+
defer s.Stop()
53+
jobs := c.Jobs()
54+
55+
// Listing jobs before registering returns nothing
56+
resp, _, err := jobs.List(nil)
57+
require.Nil(err)
58+
require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp))
59+
60+
// Create a job
61+
task := NewTask("task", "exec").
62+
SetConfig("command", "/bin/sleep").
63+
Require(&Resources{
64+
CPU: intToPtr(100),
65+
MemoryMB: intToPtr(256),
66+
}).
67+
SetLogConfig(&LogConfig{
68+
MaxFiles: intToPtr(1),
69+
MaxFileSizeMB: intToPtr(2),
70+
})
71+
72+
group1 := NewTaskGroup("group1", 1).
73+
AddTask(task).
74+
RequireDisk(&EphemeralDisk{
75+
SizeMB: intToPtr(25),
76+
})
77+
group2 := NewTaskGroup("group2", 2).
78+
AddTask(task).
79+
RequireDisk(&EphemeralDisk{
80+
SizeMB: intToPtr(25),
81+
})
82+
83+
job := NewBatchJob("job", "redis", "global", 1).
84+
AddDatacenter("dc1").
85+
AddTaskGroup(group1).
86+
AddTaskGroup(group2)
87+
88+
// Create a job and register it
89+
resp2, wm, err := jobs.Register(job, nil)
90+
require.Nil(err)
91+
require.NotNil(resp2)
92+
require.NotEmpty(resp2.EvalID)
93+
assertWriteMeta(t, wm)
94+
95+
// Update the job, new groups to test PreserveCounts
96+
group1.Count = nil
97+
group2.Count = intToPtr(0)
98+
group3 := NewTaskGroup("group3", 3).
99+
AddTask(task).
100+
RequireDisk(&EphemeralDisk{
101+
SizeMB: intToPtr(25),
102+
})
103+
job.AddTaskGroup(group3)
104+
105+
// Update the job, with PreserveCounts = true
106+
_, _, err = jobs.RegisterOpts(job, &RegisterOptions{
107+
PreserveCounts: true,
108+
}, nil)
109+
require.NoError(err)
110+
111+
// Query the job scale status
112+
status, _, err := jobs.ScaleStatus(*job.ID, nil)
113+
require.NoError(err)
114+
require.Equal(1, status.TaskGroups["group1"].Desired) // present and nil => preserved
115+
require.Equal(2, status.TaskGroups["group2"].Desired) // present and specified => preserved
116+
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specific in job spec
117+
}
118+
119+
func TestJobs_Register_NoPreserveCounts(t *testing.T) {
120+
t.Parallel()
121+
require := require.New(t)
122+
123+
c, s := makeClient(t, nil, nil)
124+
defer s.Stop()
125+
jobs := c.Jobs()
126+
127+
// Listing jobs before registering returns nothing
128+
resp, _, err := jobs.List(nil)
129+
require.Nil(err)
130+
require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp))
131+
132+
// Create a job
133+
task := NewTask("task", "exec").
134+
SetConfig("command", "/bin/sleep").
135+
Require(&Resources{
136+
CPU: intToPtr(100),
137+
MemoryMB: intToPtr(256),
138+
}).
139+
SetLogConfig(&LogConfig{
140+
MaxFiles: intToPtr(1),
141+
MaxFileSizeMB: intToPtr(2),
142+
})
143+
144+
group1 := NewTaskGroup("group1", 1).
145+
AddTask(task).
146+
RequireDisk(&EphemeralDisk{
147+
SizeMB: intToPtr(25),
148+
})
149+
group2 := NewTaskGroup("group2", 2).
150+
AddTask(task).
151+
RequireDisk(&EphemeralDisk{
152+
SizeMB: intToPtr(25),
153+
})
154+
155+
job := NewBatchJob("job", "redis", "global", 1).
156+
AddDatacenter("dc1").
157+
AddTaskGroup(group1).
158+
AddTaskGroup(group2)
159+
160+
// Create a job and register it
161+
resp2, wm, err := jobs.Register(job, nil)
162+
require.Nil(err)
163+
require.NotNil(resp2)
164+
require.NotEmpty(resp2.EvalID)
165+
assertWriteMeta(t, wm)
166+
167+
// Update the job, new groups to test PreserveCounts
168+
group1.Count = intToPtr(0)
169+
group2.Count = nil
170+
group3 := NewTaskGroup("group3", 3).
171+
AddTask(task).
172+
RequireDisk(&EphemeralDisk{
173+
SizeMB: intToPtr(25),
174+
})
175+
job.AddTaskGroup(group3)
176+
177+
// Update the job, with PreserveCounts = default [false]
178+
_, _, err = jobs.Register(job, nil)
179+
require.NoError(err)
180+
181+
// Query the job scale status
182+
status, _, err := jobs.ScaleStatus(*job.ID, nil)
183+
require.NoError(err)
184+
require.Equal(0, status.TaskGroups["group1"].Desired) // present => as specified
185+
require.Equal(1, status.TaskGroups["group2"].Desired) // nil => default (1)
186+
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified
187+
}
188+
47189
func TestJobs_Validate(t *testing.T) {
48190
t.Parallel()
49191
c, s := makeClient(t, nil, nil)

command/agent/job_endpoint.go

+1
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
413413
EnforceIndex: args.EnforceIndex,
414414
JobModifyIndex: args.JobModifyIndex,
415415
PolicyOverride: args.PolicyOverride,
416+
PreserveCounts: args.PreserveCounts,
416417
WriteRequest: structs.WriteRequest{
417418
Region: sJob.Region,
418419
AuthToken: args.WriteRequest.SecretID,

command/job_inspect.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,11 @@ func (c *JobInspectCommand) Run(args []string) int {
162162
}
163163

164164
// Print the contents of the job
165-
req := api.RegisterJobRequest{Job: job}
165+
req := struct {
166+
Job *api.Job
167+
}{
168+
Job: job,
169+
}
166170
f, err := DataFormat("json", "")
167171
if err != nil {
168172
c.Ui.Error(fmt.Sprintf("Error getting formatter: %s", err))

command/job_run.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ Run Options:
8686
-policy-override
8787
Sets the flag to force override any soft mandatory Sentinel policies.
8888
89+
-preserve-counts
90+
If set, the existing task group counts will be preserved when updating a job.
91+
8992
-consul-token
9093
If set, the passed Consul token is stored in the job before sending to the
9194
Nomad servers. This allows passing the Consul token without storing it in
@@ -118,6 +121,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags {
118121
"-vault-token": complete.PredictAnything,
119122
"-output": complete.PredictNothing,
120123
"-policy-override": complete.PredictNothing,
124+
"-preserve-counts": complete.PredictNothing,
121125
})
122126
}
123127

@@ -128,7 +132,7 @@ func (c *JobRunCommand) AutocompleteArgs() complete.Predictor {
128132
func (c *JobRunCommand) Name() string { return "job run" }
129133

130134
func (c *JobRunCommand) Run(args []string) int {
131-
var detach, verbose, output, override bool
135+
var detach, verbose, output, override, preserveCounts bool
132136
var checkIndexStr, consulToken, vaultToken string
133137

134138
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
@@ -137,6 +141,7 @@ func (c *JobRunCommand) Run(args []string) int {
137141
flags.BoolVar(&verbose, "verbose", false, "")
138142
flags.BoolVar(&output, "output", false, "")
139143
flags.BoolVar(&override, "policy-override", false, "")
144+
flags.BoolVar(&preserveCounts, "preserve-counts", false, "")
140145
flags.StringVar(&checkIndexStr, "check-index", "", "")
141146
flags.StringVar(&consulToken, "consul-token", "", "")
142147
flags.StringVar(&vaultToken, "vault-token", "", "")
@@ -208,7 +213,11 @@ func (c *JobRunCommand) Run(args []string) int {
208213
}
209214

210215
if output {
211-
req := api.RegisterJobRequest{Job: job}
216+
req := struct {
217+
Job *api.Job
218+
}{
219+
Job: job,
220+
}
212221
buf, err := json.MarshalIndent(req, "", " ")
213222
if err != nil {
214223
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
@@ -232,9 +241,8 @@ func (c *JobRunCommand) Run(args []string) int {
232241
opts.EnforceIndex = true
233242
opts.ModifyIndex = checkIndex
234243
}
235-
if override {
236-
opts.PolicyOverride = true
237-
}
244+
opts.PolicyOverride = override
245+
opts.PreserveCounts = preserveCounts
238246

239247
// Submit the job
240248
resp, _, err := client.Jobs().RegisterOpts(job, opts, nil)

nomad/job_endpoint.go

+13
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
278278
// Clear the Consul token
279279
args.Job.ConsulToken = ""
280280

281+
// Preserve the existing task group counts, if so requested
282+
if existingJob != nil && args.PreserveCounts {
283+
prevCounts := make(map[string]int)
284+
for _, tg := range existingJob.TaskGroups {
285+
prevCounts[tg.Name] = tg.Count
286+
}
287+
for _, tg := range args.Job.TaskGroups {
288+
if count, ok := prevCounts[tg.Name]; ok {
289+
tg.Count = count
290+
}
291+
}
292+
}
293+
281294
// Check if the job has changed at all
282295
if existingJob == nil || existingJob.SpecChanged(args.Job) {
283296
// Set the submit time

nomad/job_endpoint_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,67 @@ func TestJobEndpoint_Register(t *testing.T) {
108108
}
109109
}
110110

111+
func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
112+
t.Parallel()
113+
require := require.New(t)
114+
115+
s1, cleanupS1 := TestServer(t, func(c *Config) {
116+
c.NumSchedulers = 0 // Prevent automatic dequeue
117+
})
118+
defer cleanupS1()
119+
codec := rpcClient(t, s1)
120+
testutil.WaitForLeader(t, s1.RPC)
121+
122+
// Create the register request
123+
job := mock.Job()
124+
job.TaskGroups[0].Name = "group1"
125+
job.TaskGroups[0].Count = 10
126+
job.Canonicalize()
127+
128+
// Register the job
129+
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
130+
Job: job,
131+
WriteRequest: structs.WriteRequest{
132+
Region: "global",
133+
Namespace: job.Namespace,
134+
},
135+
}, &structs.JobRegisterResponse{}))
136+
137+
// Check the job in the FSM state
138+
state := s1.fsm.State()
139+
out, err := state.JobByID(nil, job.Namespace, job.ID)
140+
require.NoError(err)
141+
require.NotNil(out)
142+
require.Equal(10, out.TaskGroups[0].Count)
143+
144+
// New version:
145+
// new "group2" with 2 instances
146+
// "group1" goes from 10 -> 0 in the spec
147+
job = job.Copy()
148+
job.TaskGroups[0].Count = 0 // 10 -> 0 in the job spec
149+
job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy())
150+
job.TaskGroups[1].Name = "group2"
151+
job.TaskGroups[1].Count = 2
152+
153+
// Perform the update
154+
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
155+
Job: job,
156+
PreserveCounts: true,
157+
WriteRequest: structs.WriteRequest{
158+
Region: "global",
159+
Namespace: job.Namespace,
160+
},
161+
}, &structs.JobRegisterResponse{}))
162+
163+
// Check the job in the FSM state
164+
out, err = state.JobByID(nil, job.Namespace, job.ID)
165+
require.NoError(err)
166+
require.NotNil(out)
167+
require.Equal(10, out.TaskGroups[0].Count) // should not change
168+
require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec
169+
}
170+
171+
111172
func TestJobEndpoint_Register_Connect(t *testing.T) {
112173
t.Parallel()
113174
require := require.New(t)

nomad/structs/structs.go

+5
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,11 @@ type JobRegisterRequest struct {
549549
EnforceIndex bool
550550
JobModifyIndex uint64
551551

552+
// PreserveCounts indicates that during job update, existing task group
553+
// counts should be preserved, over those specified in the new job spec
554+
// PreserveCounts is ignored for newly created jobs.
555+
PreserveCounts bool
556+
552557
// PolicyOverride is set when the user is attempting to override any policies
553558
PolicyOverride bool
554559

0 commit comments

Comments
 (0)