Skip to content

Commit

Permalink
Updates based on review.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed May 3, 2024
1 parent 8fd33cb commit 039bf6a
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 40 deletions.
1 change: 0 additions & 1 deletion helper/raftutil/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func jsonifyJobBatchDeregisterRequest(v *structs.JobBatchDeregisterRequest) inte
Evals []*structs.Evaluation
structs.WriteRequest
}
data.Evals = v.Evals
data.WriteRequest = v.WriteRequest

data.Jobs = make(map[string]*structs.JobDeregisterOptions, len(v.Jobs))
Expand Down
21 changes: 3 additions & 18 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,32 +739,17 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
panic(fmt.Errorf("failed to decode request: %v", err))
}

// Perform all store updates atomically to ensure a consistent view for store readers.
// A partial update may increment the snapshot index, allowing eval brokers to process
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
// Perform all store updates atomically to ensure a consistent view for
// store readers.
return n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, req.SubmitTime, false, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err)
return err
}
}

if err := n.state.UpsertEvalsTxn(index, req.Evals, tx); err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}

return nil
})

if err != nil {
return err
}

// perform the side effects outside the transactions
n.handleUpsertedEvals(req.Evals)
return nil
}

// handleJobDeregister is used to deregister a job. Leaves error logging up to
Expand Down
11 changes: 4 additions & 7 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,10 +942,10 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
return err
}

// This RPC endpoint is only called from the Nomad server leader when
// performing garbage collection, therefore we can get away with a simple
// management check to ensure the caller can trigger this functionality as
// the leader token is always a management token.
// This RPC endpoint is only called from a Nomad server using the leader
// ACL when performing garbage collection, therefore we can get away with a
// simple management check to ensure the caller can trigger this
// functionality as the leader token is always a management token.
if !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}
Expand All @@ -954,9 +954,6 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
if len(args.Jobs) == 0 {
return fmt.Errorf("given no jobs to deregister")
}
if len(args.Evals) != 0 {
return fmt.Errorf("evaluations should not be populated")
}

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
Expand Down
1 change: 0 additions & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4076,7 +4076,6 @@ func TestJobEndpoint_BatchDeregister(t *testing.T) {
var resp2 structs.JobBatchDeregisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, structs.JobBatchDeregisterRPCMethod, dereg, &resp2))
require.NotZero(resp2.Index)
require.Nil(resp2.JobEvals)

// Check for the job in the FSM
state := s1.fsm.State()
Expand Down
13 changes: 0 additions & 13 deletions nomad/structs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ type JobBatchDeregisterRequest struct {
// Jobs is the set of jobs to deregister.
Jobs map[NamespacedID]*JobDeregisterOptions

// Evals is the set of evaluations to create.
//
// Deprecated: the job batch deregister endpoint no longer generates an
// eval per job.
Evals []*Evaluation

// SubmitTime is the time at which the job was requested to be stopped.
//
// Deprecated: The job batch deregister endpoint is only used by internal
Expand All @@ -61,13 +55,6 @@ type JobDeregisterOptions struct {

// JobBatchDeregisterResponse is used to respond to a batch job deregistration.
type JobBatchDeregisterResponse struct {

// JobEvals maps the job to its created evaluation.
//
// Deprecated: The job batch deregister endpoint is only used by internal
// garbage collection which no longer creates and evaluation per job GC.
JobEvals map[NamespacedID]string

QueryMeta
}

Expand Down

0 comments on commit 039bf6a

Please sign in to comment.