From cd3101d624f24a6c35bf7c2363b34b79e71ac7eb Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jun 2024 16:54:06 -0400 Subject: [PATCH] scale: add `-check-index` to `job scale` command (#23457) The RPC handler for scaling a job passes flags to enforce the job modify index is unchanged when it makes the write to Raft. But its only checking against the existing job modify index at the time the RPC handler snapshots the state store, so it can only enforce consistency for its own validation. In clusters with automated scaling, it would be useful to expose the enforce index options to the API, so that cluster admins can enforce that scaling only happens when the job state is consistent with a state they've previously seen in other API calls. Add this option to the CLI and API and have the RPC handler check them if asked. Fixes: https://github.com/hashicorp/nomad/issues/23444 --- .changelog/23457.txt | 3 ++ api/jobs.go | 14 ++++++- api/scaling.go | 5 +++ command/agent/job_endpoint.go | 1 + command/job_scale.go | 26 ++++++++++-- nomad/job_endpoint.go | 8 ++++ nomad/job_endpoint_test.go | 46 +++++++++++++++++++++ nomad/structs/structs.go | 8 ++++ website/content/api-docs/jobs.mdx | 35 ++++++++++------ website/content/docs/commands/job/scale.mdx | 5 +++ 10 files changed, 134 insertions(+), 17 deletions(-) create mode 100644 .changelog/23457.txt diff --git a/.changelog/23457.txt b/.changelog/23457.txt new file mode 100644 index 00000000000..a5e658d2c62 --- /dev/null +++ b/.changelog/23457.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scaling: Added `-check-index` support to `job scale` command +``` diff --git a/api/jobs.go b/api/jobs.go index 8ac8555f56f..02c6b226da3 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -215,8 +215,7 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { return &resp, qm, nil } -// Scale is used to retrieve information about a particular -// job given its unique ID. +// Scale is used to scale a job. func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { @@ -242,6 +241,17 @@ func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool return &resp, qm, nil } +// ScaleWithRequest is used to scale a job, giving the caller complete control +// over the ScalingRequest +func (j *Jobs) ScaleWithRequest(jobID string, req *ScalingRequest, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { + var resp JobRegisterResponse + qm, err := j.client.put(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + // ScaleStatus is used to retrieve information about a particular // job given its unique ID. func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) { diff --git a/api/scaling.go b/api/scaling.go index e3f49d0363b..cad20bd3fb6 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -57,8 +57,13 @@ type ScalingRequest struct { Error bool Meta map[string]interface{} WriteRequest + // this is effectively a job update, so we need the ability to override policy. PolicyOverride bool + + // If JobModifyIndex is set then the job will only be scaled if it matches + // the current Jobs index. The JobModifyIndex is ignored if 0. + JobModifyIndex uint64 } // ScalingPolicy is the user-specified API object for an autoscaling policy diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 53d1c0f370e..b12427d9795 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -668,6 +668,7 @@ func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request, Message: args.Message, Error: args.Error, Meta: args.Meta, + JobModifyIndex: args.JobModifyIndex, } // parseWriteRequest overrides Namespace, Region and AuthToken // based on values from the original http request diff --git a/command/job_scale.go b/command/job_scale.go index 5daf117c619..626f27c3394 100644 --- a/command/job_scale.go +++ b/command/job_scale.go @@ -11,6 +11,7 @@ import ( "time" "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/helper/pointer" "github.com/mitchellh/cli" "github.com/posener/complete" ) @@ -49,6 +50,11 @@ General Options: Scale Options: + -check-index + If set, the job is only scaled if the passed job modify index matches the + server side version. Ignored if value of zero is passed. If a non-zero value + is passed, it ensures that the job is being updated from a known state. + -detach Return immediately instead of entering monitor mode. After job scaling, the evaluation ID will be printed to the screen, which can be used to @@ -68,8 +74,9 @@ func (j *JobScaleCommand) Synopsis() string { func (j *JobScaleCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(j.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-detach": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-check-index": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-verbose": complete.PredictNothing, }) } @@ -79,9 +86,11 @@ func (j *JobScaleCommand) Name() string { return "job scale" } // Run satisfies the cli.Command Run function. func (j *JobScaleCommand) Run(args []string) int { var detach, verbose bool + var checkIndex uint64 flags := j.Meta.FlagSet(j.Name(), FlagSetClient) flags.Usage = func() { j.Ui.Output(j.Help()) } + flags.Uint64Var(&checkIndex, "check-index", 0, "") flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") if err := flags.Parse(args); err != nil { @@ -144,7 +153,18 @@ func (j *JobScaleCommand) Run(args []string) int { // Perform the scaling action. w := &api.WriteOptions{Namespace: namespace} - resp, _, err := client.Jobs().Scale(jobID, groupString, &count, msg, false, nil, w) + req := &api.ScalingRequest{ + Count: pointer.Of(int64(count)), + Target: map[string]string{ + "Job": jobID, + "Group": groupString, + }, + Message: msg, + PolicyOverride: false, + JobModifyIndex: checkIndex, + } + + resp, _, err := client.Jobs().ScaleWithRequest(jobID, req, w) if err != nil { j.Ui.Error(fmt.Sprintf("Error submitting scaling request: %s", err)) return 1 diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index ce75e35d491..d30fa2b6bf3 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1094,6 +1094,14 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes return structs.NewErrRPCCoded(400, "job scaling blocked due to active deployment") } + // If JobModifyIndex set, check it before trying to apply + if args.JobModifyIndex > 0 { + if args.JobModifyIndex != job.JobModifyIndex { + return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d", + RegisterEnforceIndexErrPrefix, args.JobModifyIndex, job.JobModifyIndex) + } + } + // Commit the job update _, jobModifyIndex, err := j.srv.raftApply( structs.JobRegisterRequestType, diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e975fa9b5fd..0595a3327aa 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -7749,6 +7749,52 @@ func TestJobEndpoint_Scale_DeploymentBlocking(t *testing.T) { } } +func TestJobEndpoint_ScaleEnforceIndex(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + store := s1.fsm.State() + + job := mock.Job() + originalCount := job.TaskGroups[0].Count + err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job) + must.NoError(t, err) + + groupName := job.TaskGroups[0].Name + scale := &structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: groupName, + }, + Count: pointer.Of(int64(originalCount + 1)), + Message: "because of the load", + Meta: map[string]interface{}{ + "metrics": map[string]string{ + "1": "a", + "2": "b", + }, + "other": "value", + }, + PolicyOverride: false, + EnforceIndex: true, + JobModifyIndex: 1000000, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + must.EqError(t, err, + "Enforcing job modify index 1000000: job exists with conflicting job modify index: 1000") + + events, _, _ := store.ScalingEventsByJob(nil, job.Namespace, job.ID) + must.Len(t, 0, events[groupName]) +} + func TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked(t *testing.T) { ci.Parallel(t) require := require.New(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b5b0a1ec1f8..06b21756143 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -859,8 +859,16 @@ type JobScaleRequest struct { Message string Error bool Meta map[string]interface{} + // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool + + // If EnforceIndex is set then the job will only be scaled if the passed + // JobModifyIndex matches the current Jobs index. If the index is zero, + // EnforceIndex is ignored. + EnforceIndex bool + JobModifyIndex uint64 + WriteRequest } diff --git a/website/content/api-docs/jobs.mdx b/website/content/api-docs/jobs.mdx index 8e7695f9097..5222ceedc8a 100644 --- a/website/content/api-docs/jobs.mdx +++ b/website/content/api-docs/jobs.mdx @@ -2389,24 +2389,35 @@ The table below shows this endpoint's support for - `Count` `(int: )` - Specifies the new task group count. -- `Target` `(json: required)` - JSON map containing the target of the scaling operation. - Must contain a field `Group` with the name of the task group that is the target of this scaling action. +- `EnforceIndex` `(bool: false)` - If set, the job will only be registered if + the passed `JobModifyIndex` matches the current job's index. If the index is + zero, the register only occurs if the job is new. This paradigm allows + check-and-set style job updating. -- `Message` `(string: )` - Description of the scale action, persisted as part of the scaling event. - Indicates information or reason for scaling; one of `Message` or `Error` must be provided. +- `Error` `(string: )` - Description of the scale action, persisted as + part of the scaling event. Indicates an error state preventing scaling; one + of `Message` or `Error` must be provided. -- `Error` `(string: )` - Description of the scale action, persisted as part of the scaling event. - Indicates an error state preventing scaling; one of `Message` or `Error` must be provided. +- `JobModifyIndex` `(int: 0)` - Specifies the `JobModifyIndex` to enforce the + current job is at. -- `Meta` `(json: )` - JSON block that is persisted as part of the scaling event. +- `Message` `(string: )` - Description of the scale action, persisted + as part of the scaling event. Indicates information or reason for scaling; + one of `Message` or `Error` must be provided. -- `PolicyOverride` `(bool: false)` - If set, any soft mandatory Sentinel policies - will be overridden. This allows a job to be scaled when it would be denied - by policy. +- `Meta` `(json: )` - JSON block that is persisted as part of the scaling event. - `namespace` `(string: "default")` - Specifies the target namespace. If ACL is -enabled, this value must match a namespace that the token is allowed to -access. This is specified as a query string parameter. + enabled, this value must match a namespace that the token is allowed to + access. This is specified as a query string parameter. + +- `PolicyOverride` `(bool: false)` - If set, any soft mandatory Sentinel + policies will be overridden. This allows a job to be scaled when it would be + denied by policy. + +- `Target` `(json: required)` - JSON map containing the target of the scaling + operation. Must contain a field `Group` with the name of the task group that + is the target of this scaling action. ### Sample Payload diff --git a/website/content/docs/commands/job/scale.mdx b/website/content/docs/commands/job/scale.mdx index f3171dc5f38..0647d7d2090 100644 --- a/website/content/docs/commands/job/scale.mdx +++ b/website/content/docs/commands/job/scale.mdx @@ -41,6 +41,11 @@ not used. ## Scale Options +- `-check-index`: If set, the job is only scaled if the passed job modify index + matches the server side version. Ignored if value of zero is passed. If a + non-zero value is passed, it ensures that the job is being updated from a + known state. + - `-detach`: Return immediately instead of entering monitor mode. After the scale command is submitted, a new evaluation ID is printed to the screen, which can be used to examine the evaluation using the [eval status] command.