Skip to content

Commit

Permalink
core: do not create evaluations within batch deregister endpoint.
Browse files Browse the repository at this point in the history
The batch deregister RPC endpoint is only used by the internal
garbage collection process; it is not exposed via the HTTP API or
used anywhere else.

The GC process only removes a job from state once all of its
related evaluations and allocations are themselves eligible for
removal. This means that we do not need to create evaluations
when jobs are being deregistered via this endpoint.
  • Loading branch information
jrasell committed May 2, 2024
1 parent 3f866a7 commit 8fd33cb
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 123 deletions.
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
// Call to the leader to issue the reap
for _, req := range c.partitionJobReap(jobs, leaderACL, structs.MaxUUIDsPerWriteRequest) {
var resp structs.JobBatchDeregisterResponse
if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
if err := c.srv.RPC(structs.JobBatchDeregisterRPCMethod, req, &resp); err != nil {
c.logger.Error("batch job reap failed", "error", err)
return err
}
Expand Down
127 changes: 127 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package nomad

import (
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -1736,6 +1737,132 @@ func TestCoreScheduler_JobGC_Periodic(t *testing.T) {
}
}

// TestCoreScheduler_jobGC runs the forced job GC against a service, a batch,
// and a system batch job in turn. For each job it checks that the job, its
// evaluation, and its allocations survive GC while the allocations are
// running (both before and after the job is marked as stopped), and that
// everything is removed once the allocations are terminal.
func TestCoreScheduler_jobGC(t *testing.T) {
	ci.Parallel(t)

	// A single test server is shared by every job type; wait until it has a
	// leader before touching state.
	srv, cleanup := TestServer(t, nil)
	defer cleanup()
	testutil.WaitForLeader(t, srv.RPC)

	// forceGC snapshots the current state and processes a forced GC core
	// evaluation against it. The max index value is used so that every object
	// which can be GC'd is.
	forceGC := func() {
		snap, err := srv.fsm.State().Snapshot()
		must.NoError(t, err)
		sched := NewCoreScheduler(srv, snap)

		must.NoError(t, sched.Process(srv.coreJobEval(structs.CoreJobForceGC, math.MaxUint64)))
	}

	// requireRetained asserts that the job, its single evaluation, and both
	// of its allocations are still held in state.
	requireRetained := func(job *structs.Job, eval *structs.Evaluation) {
		evals, err := srv.fsm.State().EvalsByJob(nil, job.Namespace, job.ID)
		must.NoError(t, err)
		must.Len(t, 1, evals)
		must.Eq(t, eval, evals[0])

		allocs, err := srv.fsm.State().AllocsByJob(nil, job.Namespace, job.ID, true)
		must.NoError(t, err)
		must.Len(t, 2, allocs)

		stored, err := srv.fsm.State().JobByID(nil, job.Namespace, job.ID)
		must.NoError(t, err)
		must.Eq(t, job, stored)
	}

	for _, job := range []*structs.Job{mock.Job(), mock.BatchJob(), mock.SystemBatchJob()} {

		// Phase 1: a running job with one completed eval and two running
		// allocations.
		job.Status = structs.JobStatusRunning

		eval := mock.Eval()
		eval.JobID = job.ID
		eval.Namespace = job.Namespace
		eval.Status = structs.EvalStatusComplete

		allocA := mock.Alloc()
		allocA.EvalID = eval.ID
		allocA.JobID = job.ID
		allocA.ClientStatus = structs.AllocClientStatusRunning

		allocB := mock.Alloc()
		allocB.EvalID = eval.ID
		allocB.JobID = job.ID
		allocB.ClientStatus = structs.AllocClientStatusRunning

		must.NoError(t,
			srv.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 10, nil, job))
		must.NoError(t,
			srv.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 10, []*structs.Evaluation{eval}))
		must.NoError(t,
			srv.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 10, []*structs.Allocation{
				allocA, allocB}))

		// Nothing may be removed while the job is running.
		forceGC()
		requireRetained(job, eval)

		// Phase 2: stop the job. Its allocations are still non-terminal, so
		// another forced GC must leave every object in place.
		job.Stop = true

		must.NoError(t,
			srv.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 20, nil, job))

		forceGC()
		requireRetained(job, eval)

		// Phase 3: move both allocations to a terminal state.
		allocA.DesiredStatus = structs.AllocDesiredStatusStop
		allocA.ClientStatus = structs.AllocClientStatusComplete
		allocB.DesiredStatus = structs.AllocDesiredStatusStop
		allocB.ClientStatus = structs.AllocClientStatusComplete

		must.NoError(t,
			srv.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 30, []*structs.Allocation{
				allocA, allocB}))

		// Everything is terminal now, so this GC pass should remove the eval,
		// the allocations, and the job itself.
		forceGC()

		evals, err := srv.fsm.State().EvalsByJob(nil, job.Namespace, job.ID)
		must.NoError(t, err)
		must.Len(t, 0, evals)

		allocs, err := srv.fsm.State().AllocsByJob(nil, job.Namespace, job.ID, true)
		must.NoError(t, err)
		must.Len(t, 0, allocs)

		stored, err := srv.fsm.State().JobByID(nil, job.Namespace, job.ID)
		must.NoError(t, err)
		must.Nil(t, stored)
	}
}

func TestCoreScheduler_DeploymentGC(t *testing.T) {
ci.Parallel(t)

Expand Down
70 changes: 13 additions & 57 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,10 +922,13 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return nil
}

// BatchDeregister is used to remove a set of jobs from the cluster.
// BatchDeregister is used to remove a set of jobs from the cluster. This
// endpoint is used for garbage collection purposes only and is not exposed via
// the HTTP API. It is the responsibility of the caller to ensure the jobs are
// eligible for GC and that they have no running or pending allocs or evals.
func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *structs.JobBatchDeregisterResponse) error {
authErr := j.srv.Authenticate(j.ctx, args)
if done, err := j.srv.forward("Job.BatchDeregister", args, args, reply); done {
if done, err := j.srv.forward(structs.JobBatchDeregisterRPCMethod, args, args, reply); done {
return err
}
j.srv.MeasureRPCRate("job", structs.RateMetricWrite, args)
Expand All @@ -939,6 +942,14 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
return err
}

// This RPC endpoint is only called from the Nomad server leader when
// performing garbage collection, therefore we can get away with a simple
// management check to ensure the caller can trigger this functionality as
// the leader token is always a management token.
if !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}

// Validate the arguments
if len(args.Jobs) == 0 {
return fmt.Errorf("given no jobs to deregister")
Expand All @@ -947,61 +958,6 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
return fmt.Errorf("evaluations should not be populated")
}

// Loop through checking for permissions
for jobNS := range args.Jobs {
// Check for submit-job permissions
if !aclObj.AllowNsOp(jobNS.Namespace, acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
}

// Grab a snapshot
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

// Loop through to create evals
for jobNS, options := range args.Jobs {
if options == nil {
return fmt.Errorf("no deregister options provided for %v", jobNS)
}

job, err := snap.JobByID(nil, jobNS.Namespace, jobNS.ID)
if err != nil {
return err
}

// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
continue
}

priority := j.srv.config.JobDefaultPriority
jtype := structs.JobTypeService
if job != nil {
priority = job.Priority
jtype = job.Type
}

// Create a new evaluation
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Priority: priority,
Type: jtype,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: jobNS.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
args.Evals = append(args.Evals, eval)
}

args.SubmitTime = time.Now().UnixNano()

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
if err != nil {
Expand Down
39 changes: 5 additions & 34 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4074,8 +4074,9 @@ func TestJobEndpoint_BatchDeregister(t *testing.T) {
},
}
var resp2 structs.JobBatchDeregisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", dereg, &resp2))
require.Nil(msgpackrpc.CallWithCodec(codec, structs.JobBatchDeregisterRPCMethod, dereg, &resp2))
require.NotZero(resp2.Index)
require.Nil(resp2.JobEvals)

// Check for the job in the FSM
state := s1.fsm.State()
Expand All @@ -4087,26 +4088,6 @@ func TestJobEndpoint_BatchDeregister(t *testing.T) {
out, err = state.JobByID(nil, job2.Namespace, job2.ID)
require.Nil(err)
require.Nil(out)

// Lookup the evaluation
for jobNS, eval := range resp2.JobEvals {
expectedJob := job
if jobNS.ID != job.ID {
expectedJob = job2
}

eval, err := state.EvalByID(nil, eval)
require.Nil(err)
require.NotNil(eval)
require.EqualValues(resp2.Index, eval.CreateIndex)
require.Equal(expectedJob.Priority, eval.Priority)
require.Equal(expectedJob.Type, eval.Type)
require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy)
require.Equal(expectedJob.ID, eval.JobID)
require.Equal(structs.EvalStatusPending, eval.Status)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)
}
}

func TestJobEndpoint_BatchDeregister_ACL(t *testing.T) {
Expand Down Expand Up @@ -4145,7 +4126,7 @@ func TestJobEndpoint_BatchDeregister_ACL(t *testing.T) {

// Expect failure for request without a token
var resp structs.JobBatchDeregisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &resp)
err := msgpackrpc.CallWithCodec(codec, structs.JobBatchDeregisterRPCMethod, req, &resp)
require.NotNil(err)
require.True(structs.IsErrPermissionDenied(err))

Expand All @@ -4155,27 +4136,17 @@ func TestJobEndpoint_BatchDeregister_ACL(t *testing.T) {
req.AuthToken = invalidToken.SecretID

var invalidResp structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &invalidResp)
err = msgpackrpc.CallWithCodec(codec, structs.JobBatchDeregisterRPCMethod, req, &invalidResp)
require.NotNil(err)
require.True(structs.IsErrPermissionDenied(err))

// Expect success with a valid management token
req.AuthToken = root.SecretID

var validResp structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &validResp)
err = msgpackrpc.CallWithCodec(codec, structs.JobBatchDeregisterRPCMethod, req, &validResp)
require.Nil(err)
require.NotEqual(validResp.Index, 0)

// Expect success with a valid token
validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob}))
req.AuthToken = validToken.SecretID

var validResp2 structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &validResp2)
require.Nil(err)
require.NotEqual(validResp2.Index, 0)
}

func TestJobEndpoint_Deregister_Priority(t *testing.T) {
Expand Down
Loading

0 comments on commit 8fd33cb

Please sign in to comment.