From f12255e886802617228a9747705cb4f370bff9d2 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 18 Dec 2017 10:03:55 -0600 Subject: [PATCH 1/5] Update eval modify index as part of plan apply. --- nomad/eval_endpoint_test.go | 67 ++++++++++++++ nomad/fsm_test.go | 61 +++++++------ nomad/job_endpoint.go | 2 + nomad/plan_apply.go | 1 + nomad/plan_apply_test.go | 149 +++++++++++++------------------- nomad/state/state_store.go | 35 ++++++++ nomad/state/state_store_test.go | 146 ++++++++++++++++--------------- nomad/structs/structs.go | 6 ++ nomad/worker_test.go | 1 + scheduler/testing.go | 1 + 10 files changed, 283 insertions(+), 186 deletions(-) diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 6955182c579..238ee9fde53 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -286,6 +286,73 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { } } +func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) { + // test enqueing an eval, updating a plan result for the same eval and dequeing the eval + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + alloc := mock.Alloc() + job := alloc.Job + alloc.Job = nil + + state := s1.fsm.State() + + if err := state.UpsertJob(999, job); err != nil { + t.Fatalf("err: %v", err) + } + + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + s1.evalBroker.Enqueue(eval) + + // Create a plan result and apply it with a later index + res := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + Job: job, + }, + EvalID: eval.ID, + } + assert := assert.New(t) + err := state.UpsertPlanResults(1000, &res) + assert.Nil(err) + + // Dequeue the eval + get 
:= &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure outstanding + token, ok := s1.evalBroker.Outstanding(eval.ID) + if !ok { + t.Fatalf("should be outstanding") + } + if token != resp.Token { + t.Fatalf("bad token: %#v %#v", token, resp.Token) + } + + if resp.WaitIndex != 1000 { + t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1000) + } +} + func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) { t.Parallel() s1 := testServer(t, func(c *Config) { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 4f9051addaf..8414cb75cba 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1213,6 +1213,10 @@ func TestFSM_ApplyPlanResults(t *testing.T) { alloc.DeploymentID = d.ID + eval := mock.Eval() + eval.JobID = job.ID + fsm.State().UpsertEvals(1, []*structs.Evaluation{eval}) + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ @@ -1220,6 +1224,7 @@ func TestFSM_ApplyPlanResults(t *testing.T) { Alloc: []*structs.Allocation{alloc}, }, Deployment: d, + EvalID: eval.ID, } buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req) if err != nil { @@ -1233,32 +1238,32 @@ func TestFSM_ApplyPlanResults(t *testing.T) { // Verify the allocation is registered ws := memdb.NewWatchSet() + assert := assert.New(t) out, err := fsm.State().AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(err) alloc.CreateIndex = out.CreateIndex alloc.ModifyIndex = out.ModifyIndex alloc.AllocModifyIndex = out.AllocModifyIndex // Job should be re-attached alloc.Job = job - if !reflect.DeepEqual(alloc, out) { - t.Fatalf("bad: %#v %#v", alloc, out) - } + 
assert.Equal(alloc, out) dout, err := fsm.State().DeploymentByID(ws, d.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if tg, ok := dout.TaskGroups[alloc.TaskGroup]; !ok || tg.PlacedAllocs != 1 { - t.Fatalf("err: %v %v", tg, err) - } + assert.Nil(err) + tg, ok := dout.TaskGroups[alloc.TaskGroup] + assert.True(ok) + assert.NotNil(tg) + assert.Equal(1, tg.PlacedAllocs) // Ensure that the original job is used evictAlloc := alloc.Copy() job = mock.Job() job.Priority = 123 + eval = mock.Eval() + eval.JobID = job.ID + + fsm.State().UpsertEvals(2, []*structs.Evaluation{eval}) evictAlloc.Job = nil evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict @@ -1267,28 +1272,28 @@ func TestFSM_ApplyPlanResults(t *testing.T) { Job: job, Alloc: []*structs.Allocation{evictAlloc}, }, + EvalID: eval.ID, } buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2) - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(err) - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } + log := makeLog(buf) + //set the index to something other than 1 + log.Index = 25 + resp = fsm.Apply(log) + assert.Nil(resp) // Verify we are evicted out, err = fsm.State().AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out.DesiredStatus != structs.AllocDesiredStatusEvict { - t.Fatalf("alloc found!") - } - if out.Job == nil || out.Job.Priority == 123 { - t.Fatalf("bad job") - } + assert.Nil(err) + assert.Equal(structs.AllocDesiredStatusEvict, out.DesiredStatus) + assert.NotNil(out.Job) + assert.NotEqual(123, out.Job.Priority) + + evalOut, err := fsm.State().EvalByID(ws, eval.ID) + assert.Nil(err) + assert.Equal(log.Index, evalOut.ModifyIndex) + } func TestFSM_DeploymentStatusUpdate(t *testing.T) { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 572b96b81ee..c85abf7c780 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1088,6 +1088,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply 
*structs.JobPlanResponse) AnnotatePlan: true, } + snap.UpsertEvals(100, []*structs.Evaluation{eval}) + // Create an in-memory Planner that returns no errors and stores the // submitted plan and created evals. planner := &scheduler.Harness{ diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 5d2c29bcfc9..44f78e2c899 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -135,6 +135,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap }, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, + EvalID: plan.EvalID, } for _, updateList := range result.NodeUpdate { req.Alloc = append(req.Alloc, updateList...) diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 7d3195d9224..93e44e6175c 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" + "github.com/stretchr/testify/assert" ) const ( @@ -65,7 +66,7 @@ func TestPlanApply_applyPlan(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Register ndoe + // Register node node := mock.Node() testRegisterNode(t, s1, node) @@ -91,6 +92,13 @@ func TestPlanApply_applyPlan(t *testing.T) { // Register alloc, deployment and deployment update alloc := mock.Alloc() s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID)) + // Create an eval + eval := mock.Eval() + eval.JobID = alloc.JobID + if err := s1.State().UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + planRes := &structs.PlanResult{ NodeAllocation: map[string][]*structs.Allocation{ node.ID: {alloc}, @@ -110,73 +118,55 @@ func TestPlanApply_applyPlan(t *testing.T) { Job: alloc.Job, Deployment: dnew, DeploymentUpdates: updates, + EvalID: eval.ID, } // Apply the plan future, err := s1.applyPlan(plan, planRes, snap) - if err != nil { - t.Fatalf("err: %v", err) - } + assert := 
assert.New(t) + assert.Nil(err) // Verify our optimistic snapshot is updated ws := memdb.NewWatchSet() - if out, err := snap.AllocByID(ws, alloc.ID); err != nil || out == nil { - t.Fatalf("bad: %v %v", out, err) - } + allocOut, err := snap.AllocByID(ws, alloc.ID) + assert.Nil(err) + assert.NotNil(allocOut) - if out, err := snap.DeploymentByID(ws, plan.Deployment.ID); err != nil || out == nil { - t.Fatalf("bad: %v %v", out, err) - } + deploymentOut, err := snap.DeploymentByID(ws, plan.Deployment.ID) + assert.Nil(err) + assert.NotNil(deploymentOut) // Check plan does apply cleanly index, err := planWaitFuture(future) - if err != nil { - t.Fatalf("err: %v", err) - } - if index == 0 { - t.Fatalf("bad: %d", index) - } + assert.Nil(err) + assert.NotEqual(0, index) // Lookup the allocation fsmState := s1.fsm.State() - out, err := fsmState.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out == nil { - t.Fatalf("missing alloc") - } - - if out.CreateTime <= 0 { - t.Fatalf("invalid create time %v", out.CreateTime) - } - if out.ModifyTime <= 0 { - t.Fatalf("invalid modify time %v", out.CreateTime) - } - if out.CreateTime != out.ModifyTime { - t.Fatalf("create time %v modify time %v must be equal", out.CreateTime, out.ModifyTime) - } + allocOut, err = fsmState.AllocByID(ws, alloc.ID) + assert.Nil(err) + assert.NotNil(allocOut) + assert.True(allocOut.CreateTime > 0) + assert.True(allocOut.ModifyTime > 0) + assert.Equal(allocOut.CreateTime, allocOut.ModifyTime) // Lookup the new deployment dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if dout == nil { - t.Fatalf("missing deployment") - } + assert.Nil(err) + assert.NotNil(dout) // Lookup the updated deployment dout2, err := fsmState.DeploymentByID(ws, oldDeployment.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if dout2 == nil { - t.Fatalf("missing deployment") - } - if dout2.Status != desiredStatus || dout2.StatusDescription != 
desiredStatusDescription { - t.Fatalf("bad status: %#v", dout2) - } + assert.Nil(err) + assert.NotNil(dout2) + assert.Equal(desiredStatus, dout2.Status) + assert.Equal(desiredStatusDescription, dout2.StatusDescription) + + // Lookup updated eval + evalOut, err := fsmState.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(index, evalOut.ModifyIndex) // Evict alloc, Register alloc2 allocEvict := new(structs.Allocation) @@ -197,60 +187,43 @@ func TestPlanApply_applyPlan(t *testing.T) { // Snapshot the state snap, err = s1.State().Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(err) // Apply the plan plan = &structs.Plan{ - Job: job, + Job: job, + EvalID: eval.ID, } future, err = s1.applyPlan(plan, planRes, snap) - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(err) // Check that our optimistic view is updated - if out, _ := snap.AllocByID(ws, allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict { - t.Fatalf("bad: %#v", out) - } + out, _ := snap.AllocByID(ws, allocEvict.ID) + assert.Equal(structs.AllocDesiredStatusEvict, out.DesiredStatus) // Verify plan applies cleanly index, err = planWaitFuture(future) - if err != nil { - t.Fatalf("err: %v", err) - } - if index == 0 { - t.Fatalf("bad: %d", index) - } + assert.Nil(err) + assert.NotEqual(0, index) // Lookup the allocation - out, err = s1.fsm.State().AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out.DesiredStatus != structs.AllocDesiredStatusEvict { - t.Fatalf("should be evicted alloc: %#v", out) - } - if out.Job == nil { - t.Fatalf("missing job") - } - - if out.ModifyTime <= 0 { - t.Fatalf("must have valid modify time but was %v", out.ModifyTime) - } + allocOut, err = s1.fsm.State().AllocByID(ws, alloc.ID) + assert.Nil(err) + assert.Equal(structs.AllocDesiredStatusEvict, allocOut.DesiredStatus) + assert.NotNil(allocOut.Job) + assert.True(allocOut.ModifyTime > 0) // Lookup the allocation - out, err = 
s1.fsm.State().AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out == nil { - t.Fatalf("missing alloc") - } - if out.Job == nil { - t.Fatalf("missing job") - } + allocOut, err = s1.fsm.State().AllocByID(ws, alloc2.ID) + assert.Nil(err) + assert.NotNil(allocOut) + assert.NotNil(allocOut.Job) + + // Lookup updated eval + evalOut, err = fsmState.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(index, evalOut.ModifyIndex) } func TestPlanApply_EvalPlan_Simple(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0091055b322..92e9605a735 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -196,6 +196,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } + // Update the modify index of the eval id + if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil { + return err + } + txn.Commit() return nil } @@ -1486,6 +1491,36 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return nil } +// updateEvalModifyIndex is used to update the modify index of an evaluation that has been +// through a scheduler pass. This is done as part of plan apply. It ensures that when a subsequent +// scheduler workers process a re-queued evaluation it sees any partial updates from the plan apply. +func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID string) error { + // Lookup the evaluation + existing, err := txn.First("evals", "id", evalID) + if err != nil { + return fmt.Errorf("eval lookup failed: %v", err) + } + if existing == nil { + // return if there isn't an eval with this ID. + // In some cases (like snapshot restores), we process evals that are not already in the state store. 
+ s.logger.Printf("[WARN] state_store: unable to find eval ID %v, cannot update modify index ", evalID) + return nil + } + eval := existing.(*structs.Evaluation).Copy() + // Update the indexes + eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex + eval.ModifyIndex = index + + // Insert the eval + if err := txn.Insert("evals", eval); err != nil { + return fmt.Errorf("eval insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil +} + // DeleteEval is used to delete an evaluation func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 38081ccfced..6c0a5b567e0 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -100,40 +100,43 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing t.Fatalf("err: %v", err) } + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + // Create a plan result res := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{alloc}, Job: job, }, + EvalID: eval.ID, } - + assert := assert.New(t) err := state.UpsertPlanResults(1000, &res) - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(err) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if !reflect.DeepEqual(alloc, out) { - t.Fatalf("bad: %#v %#v", alloc, out) - } + assert.Nil(err) + assert.Equal(alloc, out) index, err := state.Index("allocs") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1000 { - t.Fatalf("bad: %d", index) - } + assert.Nil(err) + assert.Equal(uint64(1000), index) if watchFired(ws) { 
t.Fatalf("bad") } + + evalOut, err := state.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(uint64(1000), evalOut.ModifyIndex) } // This test checks that the deployment is created and allocations count towards @@ -154,6 +157,14 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { t.Fatalf("err: %v", err) } + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + // Create a plan result res := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ @@ -161,6 +172,7 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { Job: job, }, Deployment: d, + EvalID: eval.ID, } err := state.UpsertPlanResults(1000, &res) @@ -169,31 +181,24 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { } ws := memdb.NewWatchSet() + assert := assert.New(t) out, err := state.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if !reflect.DeepEqual(alloc, out) { - t.Fatalf("bad: %#v %#v", alloc, out) - } + assert.Nil(err) + assert.Equal(alloc, out) dout, err := state.DeploymentByID(ws, d.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if dout == nil { - t.Fatalf("bad: nil deployment") - } + assert.Nil(err) + assert.NotNil(dout) tg, ok := dout.TaskGroups[alloc.TaskGroup] - if !ok { - t.Fatalf("bad: nil deployment state") - } - if tg == nil || tg.PlacedAllocs != 2 { - t.Fatalf("bad: %v", dout) - } + assert.True(ok) + assert.NotNil(tg) + assert.Equal(2, tg.PlacedAllocs) + + evalOut, err := state.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(uint64(1000), evalOut.ModifyIndex) if watchFired(ws) { t.Fatalf("bad") @@ -215,6 +220,7 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { Job: job, }, Deployment: d2, + EvalID: eval.ID, } err = state.UpsertPlanResults(1001, &res) @@ -223,21 +229,18 @@ func 
TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { } dout, err = state.DeploymentByID(ws, d2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if dout == nil { - t.Fatalf("bad: nil deployment") - } + assert.Nil(err) + assert.NotNil(dout) tg, ok = dout.TaskGroups[alloc.TaskGroup] - if !ok { - t.Fatalf("bad: nil deployment state") - } - if tg == nil || tg.PlacedAllocs != 2 { - t.Fatalf("bad: %v", dout) - } + assert.True(ok) + assert.NotNil(tg) + assert.Equal(2, tg.PlacedAllocs) + + evalOut, err = state.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(uint64(1001), evalOut.ModifyIndex) } // This test checks that deployment updates are applied correctly @@ -258,6 +261,13 @@ func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) { t.Fatalf("err: %v", err) } + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } alloc := mock.Alloc() alloc.Job = nil @@ -280,41 +290,37 @@ func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) { }, Deployment: dnew, DeploymentUpdates: []*structs.DeploymentStatusUpdate{update}, + EvalID: eval.ID, } err := state.UpsertPlanResults(1000, &res) if err != nil { t.Fatalf("err: %v", err) } - + assert := assert.New(t) ws := memdb.NewWatchSet() // Check the deployments are correctly updated. 
dout, err := state.DeploymentByID(ws, dnew.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if dout == nil { - t.Fatalf("bad: nil deployment") - } + assert.Nil(err) + assert.NotNil(dout) tg, ok := dout.TaskGroups[alloc.TaskGroup] - if !ok { - t.Fatalf("bad: nil deployment state") - } - if tg == nil || tg.PlacedAllocs != 1 { - t.Fatalf("bad: %v", dout) - } + assert.True(ok) + assert.NotNil(tg) + assert.Equal(1, tg.PlacedAllocs) doutstandingout, err := state.DeploymentByID(ws, doutstanding.ID) - if err != nil || doutstandingout == nil { - t.Fatalf("bad: %v %v", err, doutstandingout) - } - if doutstandingout.Status != update.Status || doutstandingout.StatusDescription != update.StatusDescription || doutstandingout.ModifyIndex != 1000 { - t.Fatalf("bad: %v", doutstandingout) - } + assert.Nil(err) + assert.NotNil(doutstandingout) + assert.Equal(update.Status, doutstandingout.Status) + assert.Equal(update.StatusDescription, doutstandingout.StatusDescription) + assert.Equal(uint64(1000), doutstandingout.ModifyIndex) + evalOut, err := state.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(uint64(1000), evalOut.ModifyIndex) if watchFired(ws) { t.Fatalf("bad") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b15f3881110..ea5fe56902c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -514,6 +514,12 @@ type ApplyPlanResultsRequest struct { // deployments. This allows the scheduler to cancel any unneeded deployment // because the job is stopped or the update block is removed. DeploymentUpdates []*DeploymentStatusUpdate + + // EvalID is the eval ID of the plan being applied. We also update the modify + // index of the eval ID as part of applying plan results. This is to ensure that + // other workers that are dequeing evaluations don't miss updates that can affect + // scheduling decisions. 
+ EvalID string } // AllocUpdateRequest is used to submit changes to allocations, either diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 627887e3148..faa9cc104e6 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -341,6 +341,7 @@ func TestWorker_SubmitPlan(t *testing.T) { eval1 := mock.Eval() eval1.JobID = job.ID s1.fsm.State().UpsertJob(1000, job) + s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}) // Create the register request s1.evalBroker.Enqueue(eval1) diff --git a/scheduler/testing.go b/scheduler/testing.go index fb631d44417..a04b99ce860 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -122,6 +122,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er }, Deployment: plan.Deployment, DeploymentUpdates: plan.DeploymentUpdates, + EvalID: plan.EvalID, } // Apply the full plan From aa35b5b9f21f53b3a054d23831e1f33c3c062e61 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 18 Dec 2017 14:55:36 -0600 Subject: [PATCH 2/5] Return an error if evaluation doesn't exist in state store at plan apply time. 
--- nomad/fsm.go | 4 +- nomad/state/state_store.go | 5 +- scheduler/generic_sched_test.go | 106 +++++++++++++++++++++++++++++++- scheduler/system_sched_test.go | 55 +++++++++++++++-- 4 files changed, 158 insertions(+), 12 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 7d0ef4fc41a..0a004c83686 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1104,7 +1104,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return nil } -// reconcileSummaries re-calculates the queued allocations for every job that we +// reconcileQueuedAllocations re-calculates the queued allocations for every job that we // created a Job Summary during the snap shot restore func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { // Get all the jobs @@ -1142,7 +1142,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { Status: structs.EvalStatusPending, AnnotatePlan: true, } - + snap.UpsertEvals(100, []*structs.Evaluation{eval}) // Create the scheduler and run it sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner) if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 92e9605a735..98e1d5580db 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1501,10 +1501,7 @@ func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID return fmt.Errorf("eval lookup failed: %v", err) } if existing == nil { - // return if there isn't an eval with this ID. - // In some cases (like snapshot restores), we process evals that are not already in the state store. 
- s.logger.Printf("[WARN] state_store: unable to find eval ID %v, cannot update modify index ", evalID) - return nil + return fmt.Errorf("[ERR] state_store: unable to find eval id %q", evalID) } eval := existing.(*structs.Evaluation).Copy() // Update the indexes diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 02296ff76a8..0e88b4255c8 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -35,8 +35,11 @@ func TestServiceSched_JobRegister(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -118,7 +121,9 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation if err := h.Process(NewServiceScheduler, eval); err != nil { @@ -149,7 +154,9 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) h1 := NewHarnessWithState(t, h.State) if err := h1.Process(NewServiceScheduler, eval); err != nil { t.Fatalf("err: %v", err) @@ -206,8 +213,11 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -275,8 +285,11 @@ func 
TestServiceSched_JobRegister_DistinctHosts(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -364,8 +377,11 @@ func TestServiceSched_JobRegister_DistinctProperty(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -456,7 +472,9 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -548,7 +566,9 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T) Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation assert.Nil(h.Process(NewServiceScheduler, eval), "Process") @@ -602,7 +622,9 @@ func TestServiceSched_JobRegister_Annotate(t *testing.T) { TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, AnnotatePlan: true, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -679,7 +701,9 @@ func TestServiceSched_JobRegister_CountZero(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: 
structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -720,8 +744,11 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -802,8 +829,11 @@ func TestServiceSched_JobRegister_CreateBlockedEval(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -899,8 +929,9 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } - + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -1016,8 +1047,11 @@ func TestServiceSched_Plan_Partial_Progress(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -1245,7 +1279,9 @@ func TestServiceSched_JobModify(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) 
@@ -1329,7 +1365,9 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -1436,7 +1474,9 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -1530,7 +1570,9 @@ func TestServiceSched_JobModify_Rolling(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -1635,7 +1677,9 @@ func TestServiceSched_JobModify_Rolling_FullNode(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -1736,7 +1780,9 @@ func TestServiceSched_JobModify_Canaries(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -1844,7 +1890,9 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the 
evaluation err := h.Process(NewServiceScheduler, eval) @@ -1958,7 +2006,9 @@ func TestServiceSched_JobModify_DistinctProperty(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2044,7 +2094,9 @@ func TestServiceSched_JobDeregister_Purged(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobDeregister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2111,7 +2163,9 @@ func TestServiceSched_JobDeregister_Stopped(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobDeregister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2201,7 +2255,9 @@ func TestServiceSched_NodeDown(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2267,7 +2323,9 @@ func TestServiceSched_NodeUpdate(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2318,7 +2376,9 @@ func TestServiceSched_NodeDrain(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), 
[]*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2419,8 +2479,11 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -2493,7 +2556,9 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2550,7 +2615,9 @@ func TestServiceSched_NodeDrain_UpdateStrategy(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2608,7 +2675,9 @@ func TestServiceSched_RetryLimit(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -2664,7 +2733,9 @@ func TestBatchSched_Run_CompleteAlloc(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewBatchScheduler, eval) @@ -2719,7 +2790,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + 
Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewBatchScheduler, eval) @@ -2781,7 +2854,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewBatchScheduler, eval) @@ -2841,7 +2916,9 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewBatchScheduler, eval) @@ -2904,7 +2981,9 @@ func TestBatchSched_JobModify_InPlace_Terminal(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewBatchScheduler, eval) @@ -2985,7 +3064,9 @@ func TestBatchSched_JobModify_Destructive_Terminal(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewBatchScheduler, eval) @@ -3039,8 +3120,11 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewBatchScheduler, eval) if err != nil { @@ -3102,8 +3186,11 @@ func 
TestBatchSched_NodeDrain_Complete(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewBatchScheduler, eval) if err != nil { @@ -3154,8 +3241,11 @@ func TestBatchSched_ScaleDown_SameName(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewBatchScheduler, eval) if err != nil { @@ -3197,7 +3287,9 @@ func TestGenericSched_ChainedAlloc(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation if err := h.Process(NewServiceScheduler, eval); err != nil { t.Fatalf("err: %v", err) @@ -3226,7 +3318,10 @@ func TestGenericSched_ChainedAlloc(t *testing.T) { Priority: job1.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job1.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) + // Process the evaluation if err := h1.Process(NewServiceScheduler, eval1); err != nil { t.Fatalf("err: %v", err) @@ -3287,8 +3382,11 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: alloc.Job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -3344,8 +3442,11 @@ func TestServiceSched_CancelDeployment_Stopped(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobDeregister, JobID: job.ID, 
+ Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -3413,8 +3514,11 @@ func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index be43026a1a5..2e05355003e 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -32,7 +32,9 @@ func TestSystemSched_JobRegister(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -105,7 +107,9 @@ func TestSystemeSched_JobRegister_StickyAllocs(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation if err := h.Process(NewSystemScheduler, eval); err != nil { @@ -134,7 +138,9 @@ func TestSystemeSched_JobRegister_StickyAllocs(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) h1 := NewHarnessWithState(t, h.State) if err := h1.Process(NewSystemScheduler, eval); err != nil { t.Fatalf("err: %v", err) @@ -181,7 +187,9 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) { Priority: job.Priority, TriggeredBy: 
structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation if err := h.Process(NewSystemScheduler, eval); err != nil { @@ -207,7 +215,9 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) { Priority: job1.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job1.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h1.Process(NewSystemScheduler, eval1); err != nil { @@ -241,8 +251,9 @@ func TestSystemSched_ExhaustResources(t *testing.T) { Priority: svcJob.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: svcJob.ID, + Status: structs.EvalStatusPending, } - + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) if err != nil { @@ -260,8 +271,9 @@ func TestSystemSched_ExhaustResources(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } - + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h.Process(NewSystemScheduler, eval1); err != nil { t.Fatalf("err: %v", err) @@ -307,7 +319,9 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) { TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, AnnotatePlan: true, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -405,8 +419,9 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, + Status: structs.EvalStatusPending, } - + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // 
Process the evaluation err := h.Process(NewSystemScheduler, eval) if err != nil { @@ -472,8 +487,9 @@ func TestSystemSched_JobRegister_AllocFail(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } - + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) if err != nil { @@ -542,7 +558,9 @@ func TestSystemSched_JobModify(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -633,8 +651,9 @@ func TestSystemSched_JobModify_Rolling(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } - + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) if err != nil { @@ -728,7 +747,9 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -822,7 +843,9 @@ func TestSystemSched_JobDeregister_Purged(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobDeregister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -894,7 +917,9 @@ func TestSystemSched_JobDeregister_Stopped(t *testing.T) { Priority: 50, TriggeredBy: structs.EvalTriggerJobDeregister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, 
h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -956,7 +981,9 @@ func TestSystemSched_NodeDown(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -1021,7 +1048,9 @@ func TestSystemSched_NodeDrain_Down(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewServiceScheduler, eval) @@ -1080,7 +1109,9 @@ func TestSystemSched_NodeDrain(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -1143,7 +1174,9 @@ func TestSystemSched_NodeUpdate(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -1180,7 +1213,9 @@ func TestSystemSched_RetryLimit(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -1230,7 +1265,9 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + 
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -1264,7 +1301,9 @@ func TestSystemSched_ChainedAlloc(t *testing.T) { Priority: job.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation if err := h.Process(NewSystemScheduler, eval); err != nil { t.Fatalf("err: %v", err) @@ -1299,7 +1338,9 @@ func TestSystemSched_ChainedAlloc(t *testing.T) { Priority: job1.Priority, TriggeredBy: structs.EvalTriggerJobRegister, JobID: job1.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h1.Process(NewSystemScheduler, eval1); err != nil { t.Fatalf("err: %v", err) @@ -1389,7 +1430,9 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) @@ -1460,7 +1503,9 @@ func TestSystemSched_QueuedAllocsMultTG(t *testing.T) { TriggeredBy: structs.EvalTriggerNodeUpdate, JobID: job.ID, NodeID: node.ID, + Status: structs.EvalStatusPending, } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) // Process the evaluation err := h.Process(NewSystemScheduler, eval) From a49db955f4f273a7adc2f2a28a7fe57c21c3570f Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 18 Dec 2017 15:13:16 -0600 Subject: [PATCH 3/5] Address some code review comments --- nomad/eval_endpoint_test.go | 2 +- nomad/state/state_store.go | 1 - nomad/state/state_store_test.go | 12 ++++++------ nomad/structs/structs.go | 10 ++++++---- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git 
a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 238ee9fde53..ea0c42a019c 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -287,7 +287,7 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { } func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) { - // test enqueing an eval, updating a plan result for the same eval and dequeing the eval + // test enqueueing an eval, updating a plan result for the same eval and de-queueing the eval t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 98e1d5580db..508d1e236a8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1505,7 +1505,6 @@ func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID } eval := existing.(*structs.Evaluation).Copy() // Update the indexes - eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex eval.ModifyIndex = index // Insert the eval diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 6c0a5b567e0..08179bdfd2b 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -127,7 +127,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing index, err := state.Index("allocs") assert.Nil(err) - assert.Equal(uint64(1000), index) + assert.EqualValues(1000, index) if watchFired(ws) { t.Fatalf("bad") @@ -136,7 +136,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing evalOut, err := state.EvalByID(ws, eval.ID) assert.Nil(err) assert.NotNil(evalOut) - assert.Equal(uint64(1000), evalOut.ModifyIndex) + assert.EqualValues(1000, evalOut.ModifyIndex) } // This test checks that the deployment is created and allocations count towards @@ -198,7 +198,7 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { evalOut, err := state.EvalByID(ws, eval.ID) 
assert.Nil(err) assert.NotNil(evalOut) - assert.Equal(uint64(1000), evalOut.ModifyIndex) + assert.EqualValues(1000, evalOut.ModifyIndex) if watchFired(ws) { t.Fatalf("bad") @@ -240,7 +240,7 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { evalOut, err = state.EvalByID(ws, eval.ID) assert.Nil(err) assert.NotNil(evalOut) - assert.Equal(uint64(1001), evalOut.ModifyIndex) + assert.EqualValues(1001, evalOut.ModifyIndex) } // This test checks that deployment updates are applied correctly @@ -315,12 +315,12 @@ func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) { assert.NotNil(doutstandingout) assert.Equal(update.Status, doutstandingout.Status) assert.Equal(update.StatusDescription, doutstandingout.StatusDescription) - assert.Equal(uint64(1000), doutstandingout.ModifyIndex) + assert.EqualValues(1000, doutstandingout.ModifyIndex) evalOut, err := state.EvalByID(ws, eval.ID) assert.Nil(err) assert.NotNil(evalOut) - assert.Equal(uint64(1000), evalOut.ModifyIndex) + assert.EqualValues(1000, evalOut.ModifyIndex) if watchFired(ws) { t.Fatalf("bad") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ea5fe56902c..439e6a85889 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -515,10 +515,12 @@ type ApplyPlanResultsRequest struct { // because the job is stopped or the update block is removed. DeploymentUpdates []*DeploymentStatusUpdate - // EvalID is the eval ID of the plan being applied. We also update the modify - // index of the eval ID as part of applying plan results. This is to ensure that - // other workers that are dequeing evaluations don't miss updates that can affect - // scheduling decisions. + // EvalID is the eval ID of the plan being applied. The modify index of the + // evaluation is updated as part of applying the plan to ensure that subsequent + // scheduling events for the same job will wait for the index that last produced + // state changes. 
This is necessary for blocked evaluations since they can be + // processed many times, potentially making state updates, without the state of + // the evaluation itself being updated. EvalID string } From 0401c2247b66e503e8b6242728ac1d7af75c8f21 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 18 Dec 2017 15:51:35 -0800 Subject: [PATCH 4/5] Handle upgrade path --- nomad/state/state_store.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 508d1e236a8..6dee12fee8d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -196,9 +196,14 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } - // Update the modify index of the eval id - if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil { - return err + // COMPAT: Nomad versions before 0.7.1 did not include the eval ID when + // applying the plan. Thus while we are upgrading, we ignore updating the + // modify index of evaluations from older plans. 
+ if results.EvalID != "" { + // Update the modify index of the eval id + if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil { + return err + } } txn.Commit() From 039942f24b1ee49f1360c0f5310c68399b9f00bd Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 18 Dec 2017 17:56:12 -0600 Subject: [PATCH 5/5] Clean up error logging --- nomad/state/state_store.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6dee12fee8d..f837a6106e5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1506,7 +1506,9 @@ func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID return fmt.Errorf("eval lookup failed: %v", err) } if existing == nil { - return fmt.Errorf("[ERR] state_store: unable to find eval id %q", evalID) + err := fmt.Errorf("unable to find eval id %q", evalID) + s.logger.Printf("[ERR] state_store: %v", err) + return err } eval := existing.(*structs.Evaluation).Copy() // Update the indexes