diff --git a/.changelog/15808.txt b/.changelog/15808.txt new file mode 100644 index 00000000000..36e2026b5f6 --- /dev/null +++ b/.changelog/15808.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: enforce strict ordering that node status updates are recorded after allocation updates for reconnecting clients +``` diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 9fcdb9f4d23..6ea142c0198 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1219,6 +1219,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { clientAlloc.ClientStatus = structs.AllocClientStatusComplete update2 := &structs.Allocation{ ID: alloc2.ID, + NodeID: alloc2.NodeID, ClientStatus: structs.AllocClientStatusRunning, } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 0151e62719b..b95f17ab993 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -157,11 +157,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp return err } - // Check if the SecretID has been tampered with if originalNode != nil { + // Check if the SecretID has been tampered with if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" { return fmt.Errorf("node secret ID does not match. Not registering node.") } + + // Don't allow the Register method to update the node status. Only the + // UpdateStatus method should be able to do this. + if originalNode.Status != "" { + args.Node.Status = originalNode.Status + } } // We have a valid node connection, so add the mapping to cache the @@ -433,7 +439,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, return nil } -// UpdateStatus is used to update the status of a client node +// UpdateStatus is used to update the status of a client node. +// +// Clients with non-terminal allocations must first call UpdateAlloc to be able +// to transition from the initializing status to ready. +// +// ┌────────────────────────────────────── No ───┐ +// │ │ +// ┌──▼───┐ ┌─────────────┐ ┌────────┴────────┐ +// ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │ +// └──▲───┘ └─────┬───────┘ └────────┬────────┘ +// │ │ │ +// ready └─ No ─┐ ┌─────── Yes ──┘ +// │ │ │ +// ┌──────┴───────┐ ┌──▼──▼─┐ ┌──────┐ +// │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │ +// └──────────────┘ └───▲───┘ └──┬───┘ +// │ │ +// └──── ready ─────┘ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { isForwarded := args.IsForwarded() if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done { @@ -486,6 +509,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Update the timestamp of when the node status was updated args.UpdatedAt = time.Now().Unix() + // Compute next status. 
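+	// A node reconnecting from the disconnected state is first moved back to
+	// initializing, and a node in the initializing state that still has
+	// non-terminal allocations is only allowed to become ready once it has
+	// sent an alloc update more recent than its last missed heartbeat
+	// (LastAllocUpdateIndex > LastMissedHeartbeatIndex).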
+ switch node.Status { + case structs.NodeStatusInit: + if args.Status == structs.NodeStatusReady { + allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false) + if err != nil { + return fmt.Errorf("failed to query node allocs: %v", err) + } + + allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex + if len(allocs) > 0 && !allocsUpdated { + args.Status = structs.NodeStatusInit + } + } + case structs.NodeStatusDisconnected: + if args.Status == structs.NodeStatusReady { + args.Status = structs.NodeStatusInit + } + } + // Commit this update via Raft var index uint64 if node.Status != args.Status { @@ -1146,8 +1189,11 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, // UpdateAlloc is used to update the client status of an allocation. It should // only be called by clients. // -// Clients must first register and heartbeat successfully before they are able -// to call this method. +// Calling this method returns an error when: +// - The node is not registered in the server yet. Clients must first call the +// Register method. +// - The node status is down or disconnected. Clients must call the +// UpdateStatus method to update its status in the server. func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error { // Ensure the connection was initiated by another client if TLS is used. err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient) @@ -1179,8 +1225,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene if node == nil { return fmt.Errorf("node %s not found", nodeID) } - if node.Status != structs.NodeStatusReady { - return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady) + if node.UnresponsiveStatus() { + return fmt.Errorf("node %s is not allowed to update allocs while in status %s", nodeID, node.Status) } // Ensure that evals aren't set from client RPCs diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 32aa966b5cd..2793f83c9d7 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "errors" "fmt" "net" @@ -23,6 +24,7 @@ import ( "github.com/hashicorp/nomad/testutil" vapi "github.com/hashicorp/vault/api" "github.com/kr/pretty" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -524,6 +526,193 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) { } } +func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { + ci.Parallel(t) + + // Setup server with tighter heartbeat so we don't have to wait so long + // for nodes to go down. + heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond + s, cleanupS := TestServer(t, func(c *Config) { + c.MinHeartbeatTTL = heartbeatTTL + c.HeartbeatGrace = 2 * heartbeatTTL + }) + codec := rpcClient(t, s) + defer cleanupS() + testutil.WaitForLeader(t, s.RPC) + + // Register node. + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeUpdateResp structs.NodeUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp) + must.NoError(t, err) + + // Start heartbeat. 
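+	// The goroutine below stands in for the client's heartbeat loop: it calls
+	// Node.UpdateStatus at half the heartbeat TTL so the server keeps the node
+	// alive, and cancelling its context simulates the client disconnecting.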
+ heartbeat := func(ctx context.Context) { + ticker := time.NewTicker(heartbeatTTL / 2) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if t.Failed() { + return + } + + req := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusReady, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.NodeUpdateResponse + // Ignore errors since an unexpected failed heartbeat will cause + // the test conditions to fail. + msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp) + } + } + } + heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background()) + defer cancelHeartbeat() + go heartbeat(heartbeatCtx) + + // Wait for node to be ready. + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) + + // Register job with max_client_disconnect. + job := mock.Job() + job.Constraints = []*structs.Constraint{} + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour) + job.TaskGroups[0].Constraints = []*structs.Constraint{} + job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10m", + } + + jobReq := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var jobResp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp) + must.NoError(t, err) + + // Wait for alloc to be pending in the server. + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusPending: 1, + }) + + // Get allocs that node should run. + allocsReq := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + var allocsResp structs.NodeAllocsResponse + err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp) + must.NoError(t, err) + must.Len(t, 1, allocsResp.Allocs) + + // Tell server the alloc is running. + // Save the alloc so we can reuse the request later. + alloc := allocsResp.Allocs[0].Copy() + alloc.ClientStatus = structs.AllocClientStatusRunning + + allocUpdateReq := &structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + var resp structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp) + must.NoError(t, err) + + // Wait for alloc to be running in the server. + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusRunning: 1, + }) + + // Stop heartbeat and wait for the client to be disconnected and the alloc + // to be unknown. + cancelHeartbeat() + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected) + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusUnknown: 1, + }) + + // Restart heartbeat to reconnect node. + heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background()) + defer cancelHeartbeat() + go heartbeat(heartbeatCtx) + + // Wait a few heartbeats and check that the node is still initializing. + // + // The heartbeat should not update the node to ready until it updates its + // allocs status with the server so the scheduler have the necessary + // information to avoid unnecessary placements. 
+ time.Sleep(3 * heartbeatTTL) + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit) + + // Get allocs that node should run. + // The node should only have one alloc assigned until it updates its allocs + // status with the server. + allocsReq = &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp) + must.NoError(t, err) + must.Len(t, 1, allocsResp.Allocs) + + // Tell server the alloc is still running. + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp) + must.NoError(t, err) + + // The client must end in the same state as before it disconnected: + // - client status is ready. + // - only 1 alloc and the alloc is running. + // - all evals are terminal, so cluster is in a stable state. + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusRunning: 1, + }) + testutil.WaitForResult(func() (bool, error) { + state := s.fsm.State() + ws := memdb.NewWatchSet() + evals, err := state.EvalsByJob(ws, job.Namespace, job.ID) + if err != nil { + return false, fmt.Errorf("failed to read evals: %v", err) + } + for _, eval := range evals { + // TODO: remove this check once the disconnect process stops + // leaking a max-disconnect-timeout eval. + // https://github.com/hashicorp/nomad/issues/12809 + if eval.TriggeredBy == structs.EvalTriggerMaxDisconnectTimeout { + continue + } + + if !eval.TerminalStatus() { + return false, fmt.Errorf("found %s eval", eval.Status) + } + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) +} + func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { ci.Parallel(t) require := require.New(t) @@ -639,14 +828,12 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { } // Transition it to down and then ready - node.Status = structs.NodeStatusDown - reg = &structs.NodeRegisterRequest{ - Node: node, + req := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusDown, WriteRequest: structs.WriteRequest{Region: "global"}, } - - // Fetch the response - if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -654,14 +841,12 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { t.Fatalf("expected one eval; got %#v", resp.EvalIDs) } - node.Status = structs.NodeStatusReady - reg = &structs.NodeRegisterRequest{ - Node: node, + req = &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusReady, WriteRequest: structs.WriteRequest{Region: "global"}, } - - // Fetch the response - if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -1369,12 +1554,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) { require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2)) // Mark the node as down - node.Status = structs.NodeStatusDown - reg = &structs.NodeRegisterRequest{ - Node: node, + req := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusDown, WriteRequest: structs.WriteRequest{Region: "global"}, } - 
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))
 
 	// Ensure that the allocation has transitioned to lost
 	testutil.WaitForResult(func() (bool, error) {
@@ -2581,7 +2766,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
 	}
 	var allocUpdateResp structs.NodeAllocsResponse
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "not ready")
+	require.ErrorContains(t, err, "not allowed to update allocs")
 
 	// Send request without an explicit node ID.
 	updatedAlloc.NodeID = ""
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 4e2f48c937b..45030d12e67 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -13,6 +13,7 @@ import (
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-set"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/nomad/stream"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -909,6 +910,11 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
 		node.CreateIndex = exist.CreateIndex
 		node.ModifyIndex = index
 
+		// Update last missed heartbeat if the node became unresponsive.
+		if !exist.UnresponsiveStatus() && node.UnresponsiveStatus() {
+			node.LastMissedHeartbeatIndex = index
+		}
+
 		// Retain node events that have already been set on the node
 		node.Events = exist.Events
 
@@ -923,6 +929,16 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
 		node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
 		node.DrainStrategy = exist.DrainStrategy                 // Retain the drain strategy
 		node.LastDrain = exist.LastDrain                         // Retain the drain metadata
+
+		// Retain the last index at which the node missed a heartbeat.
+		if node.LastMissedHeartbeatIndex < exist.LastMissedHeartbeatIndex {
+			node.LastMissedHeartbeatIndex = exist.LastMissedHeartbeatIndex
+		}
+
+		// Retain the last index at which the node updated its allocs.
+		if node.LastAllocUpdateIndex < exist.LastAllocUpdateIndex {
+			node.LastAllocUpdateIndex = exist.LastAllocUpdateIndex
+		}
 	} else {
 		// Because this is the first time the node is being registered, we should
 		// also create a node registration event
@@ -1029,6 +1045,15 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
 	copyNode.Status = status
 	copyNode.ModifyIndex = txn.Index
 
+	// Update last missed heartbeat if the node became unresponsive, or reset
+	// it to zero if the node became ready.
+	if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() {
+		copyNode.LastMissedHeartbeatIndex = txn.Index
+	} else if existingNode.Status != structs.NodeStatusReady &&
+		copyNode.Status == structs.NodeStatusReady {
+		copyNode.LastMissedHeartbeatIndex = 0
+	}
+
 	// Insert the node
 	if err := txn.Insert("nodes", copyNode); err != nil {
 		return fmt.Errorf("node update failed: %v", err)
 	}
@@ -3582,8 +3607,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u
 	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
 
+	// Capture all nodes being affected. Alloc updates from clients are batched
+	// so this request may include allocs from several nodes.
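+	// Use a set so each node is captured only once and receives a single
+	// LastAllocUpdateIndex update below.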
+ nodeIDs := set.New[string](1) + // Handle each of the updated allocations for _, alloc := range allocs { + nodeIDs.Insert(alloc.NodeID) if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil { return err } @@ -3594,6 +3624,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u return fmt.Errorf("index update failed: %v", err) } + // Update the index of when nodes last updated their allocs. + for _, nodeID := range nodeIDs.List() { + if err := s.updateClientAllocUpdateIndex(txn, index, nodeID); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + } + return txn.Commit() } @@ -3685,6 +3722,28 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc * return nil } +func (s *StateStore) updateClientAllocUpdateIndex(txn *txn, index uint64, nodeID string) error { + existing, err := txn.First("nodes", "id", nodeID) + if err != nil { + return fmt.Errorf("node lookup failed: %v", err) + } + if existing == nil { + return nil + } + + node := existing.(*structs.Node) + copyNode := node.Copy() + copyNode.LastAllocUpdateIndex = index + + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil +} + // UpsertAllocs is used to evict a set of allocations and allocate new ones at // the same time. func (s *StateStore) UpsertAllocs(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1f1030ab1d0..6be73d1647a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1355,6 +1355,102 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { require.False(watchFired(ws)) } +func TestStatStore_UpdateNodeStatus_LastMissedHeartbeatIndex(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + transitions []string + expectedIndexes []uint64 + }{ + { + name: "disconnect", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + }, + expectedIndexes: []uint64{0, 1001}, + }, + { + name: "reconnect", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 0}, + }, + { + name: "down", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDown, + }, + expectedIndexes: []uint64{0, 1001}, + }, + { + name: "multiple reconnects", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + structs.NodeStatusDown, + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 0, 1004, 0, 1006, 1006, 0}, + }, + { + name: "multiple heartbeats", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + structs.NodeStatusReady, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 0, 0, 0}, + }, + { + name: "delayed alloc update", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusInit, + structs.NodeStatusInit, + structs.NodeStatusReady, + }, + 
expectedIndexes: []uint64{0, 1001, 1001, 1001, 1001, 0}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + state := testStateStore(t) + node := mock.Node() + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 999, node)) + + for i, status := range tc.transitions { + now := time.Now().UnixNano() + err := state.UpdateNodeStatus(structs.MsgTypeTestSetup, uint64(1000+i), node.ID, status, now, nil) + must.NoError(t, err) + + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + must.NoError(t, err) + must.Eq(t, tc.expectedIndexes[i], out.LastMissedHeartbeatIndex) + must.Eq(t, status, out.Status) + } + }) + } +} + func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { ci.Parallel(t) require := require.New(t) @@ -5089,145 +5185,115 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + + node := mock.Node() + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 997, node)) + parent := mock.Job() - if err := state.UpsertJob(structs.MsgTypeTestSetup, 998, parent); err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 998, parent)) child := mock.Job() child.Status = "" child.ParentID = parent.ID - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, child); err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, child)) alloc := mock.Alloc() + alloc.NodeID = node.ID alloc.JobID = child.ID alloc.Job = child - - err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) ws := memdb.NewWatchSet() summary, err := state.JobSummaryByID(ws, parent.Namespace, parent.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if summary == nil { - t.Fatalf("nil summary") - } - if summary.JobID != parent.ID { - t.Fatalf("bad summary id: %v", parent.ID) - } - if summary.Children == nil { - t.Fatalf("nil children summary") - } - if summary.Children.Pending != 0 || summary.Children.Running != 1 || summary.Children.Dead != 0 { - t.Fatalf("bad children summary: %v", summary.Children) - } + must.NoError(t, err) + must.NotNil(t, summary) + must.Eq(t, parent.ID, summary.JobID) + must.NotNil(t, summary.Children) + must.Eq(t, 0, summary.Children.Pending) + must.Eq(t, 1, summary.Children.Running) + must.Eq(t, 0, summary.Children.Dead) // Create watchsets so we can test that update fires the watch ws = memdb.NewWatchSet() - if _, err := state.JobSummaryByID(ws, parent.Namespace, parent.ID); err != nil { - t.Fatalf("bad: %v", err) - } + _, err = state.JobSummaryByID(ws, parent.Namespace, parent.ID) + must.NoError(t, err) // Create the delta updates ts := map[string]*structs.TaskState{"web": {State: structs.TaskStateRunning}} update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusComplete, TaskStates: ts, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, } err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - if !watchFired(ws) { - t.Fatalf("bad") - } + must.True(t, watchFired(ws)) ws = memdb.NewWatchSet() summary, err = state.JobSummaryByID(ws, parent.Namespace, parent.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if summary == nil { - t.Fatalf("nil 
summary") - } - if summary.JobID != parent.ID { - t.Fatalf("bad summary id: %v", parent.ID) - } - if summary.Children == nil { - t.Fatalf("nil children summary") - } - if summary.Children.Pending != 0 || summary.Children.Running != 0 || summary.Children.Dead != 1 { - t.Fatalf("bad children summary: %v", summary.Children) - } + must.NoError(t, err) + must.NotNil(t, summary) + must.Eq(t, parent.ID, summary.JobID) + must.NotNil(t, summary.Children) + must.Eq(t, 0, summary.Children.Pending) + must.Eq(t, 0, summary.Children.Running) + must.Eq(t, 1, summary.Children.Dead) - if watchFired(ws) { - t.Fatalf("bad") - } + must.False(t, watchFired(ws)) } func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + + node := mock.Node() + alloc1 := mock.Alloc() + alloc1.NodeID = node.ID + alloc2 := mock.Alloc() + alloc2.NodeID = node.ID - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc1.Job); err != nil { - t.Fatalf("err: %v", err) - } - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc2.Job); err != nil { - t.Fatalf("err: %v", err) - } - - err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc1.Job)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc2.Job)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) // Create watchsets so we can test that update fires the watch watches := make([]memdb.WatchSet, 8) for i := 0; i < 8; i++ { watches[i] = memdb.NewWatchSet() } - if _, err := state.AllocByID(watches[0], alloc1.ID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocByID(watches[1], alloc2.ID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByEval(watches[2], alloc1.EvalID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByEval(watches[3], alloc2.EvalID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByJob(watches[4], alloc1.Namespace, alloc1.JobID, false); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByJob(watches[5], alloc2.Namespace, alloc2.JobID, false); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByNode(watches[6], alloc1.NodeID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByNode(watches[7], alloc2.NodeID); err != nil { - t.Fatalf("bad: %v", err) - } + _, err := state.AllocByID(watches[0], alloc1.ID) + must.NoError(t, err) + _, err = state.AllocByID(watches[1], alloc2.ID) + must.NoError(t, err) + + _, err = state.AllocsByEval(watches[2], alloc1.EvalID) + must.NoError(t, err) + _, err = state.AllocsByEval(watches[3], alloc2.EvalID) + must.NoError(t, err) + + _, err = state.AllocsByJob(watches[4], alloc1.Namespace, alloc1.JobID, false) + must.NoError(t, err) + _, err = state.AllocsByJob(watches[5], alloc2.Namespace, alloc2.JobID, false) + must.NoError(t, err) + + _, err = state.AllocsByNode(watches[6], alloc1.NodeID) + must.NoError(t, err) + _, err = state.AllocsByNode(watches[7], alloc2.NodeID) + must.NoError(t, err) // Create the delta updates ts := map[string]*structs.TaskState{"web": {State: structs.TaskStatePending}} update := &structs.Allocation{ ID: alloc1.ID, + NodeID: alloc1.NodeID, ClientStatus: structs.AllocClientStatusFailed, 
TaskStates: ts, JobID: alloc1.JobID, @@ -5235,6 +5301,7 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { } update2 := &structs.Allocation{ ID: alloc2.ID, + NodeID: alloc2.NodeID, ClientStatus: structs.AllocClientStatusRunning, TaskStates: ts, JobID: alloc2.JobID, @@ -5242,93 +5309,70 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { } err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - for i, ws := range watches { - if !watchFired(ws) { - t.Fatalf("bad %d", i) - } + for _, ws := range watches { + must.True(t, watchFired(ws)) } ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc1.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) alloc1.CreateIndex = 1000 alloc1.ModifyIndex = 1001 alloc1.TaskStates = ts alloc1.ClientStatus = structs.AllocClientStatusFailed - if !reflect.DeepEqual(alloc1, out) { - t.Fatalf("bad: %#v %#v", alloc1, out) - } + must.Eq(t, alloc1, out) out, err = state.AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) alloc2.ModifyIndex = 1000 alloc2.ModifyIndex = 1001 alloc2.ClientStatus = structs.AllocClientStatusRunning alloc2.TaskStates = ts - if !reflect.DeepEqual(alloc2, out) { - t.Fatalf("bad: %#v %#v", alloc2, out) - } + must.Eq(t, alloc2, out) index, err := state.Index("allocs") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1001 { - t.Fatalf("bad: %d", index) - } + must.NoError(t, err) + must.Eq(t, 1001, index) // Ensure summaries have been updated summary, err := state.JobSummaryByID(ws, alloc1.Namespace, alloc1.JobID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) + tgSummary := summary.Summary["web"] - if tgSummary.Failed != 1 { - t.Fatalf("expected failed: %v, actual: %v, summary: %#v", 1, tgSummary.Failed, tgSummary) - } + must.Eq(t, 1, tgSummary.Failed) summary2, err := state.JobSummaryByID(ws, alloc2.Namespace, alloc2.JobID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) + tgSummary2 := summary2.Summary["web"] - if tgSummary2.Running != 1 { - t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Running) - } + must.Eq(t, 1, tgSummary2.Running) - if watchFired(ws) { - t.Fatalf("bad") - } + must.False(t, watchFired(ws)) } func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + + node := mock.Node() + alloc := mock.Alloc() + alloc.NodeID = node.ID - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job); err != nil { - t.Fatalf("err: %v", err) - } - err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) // Create the delta updates ts := map[string]*structs.TaskState{"web": {State: structs.TaskStatePending}} update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusRunning, TaskStates: ts, JobID: alloc.JobID, @@ -5336,30 +5380,25 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { } update2 := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: 
structs.AllocClientStatusPending, TaskStates: ts, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, } - err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2}) - if err != nil { - t.Fatalf("err: %v", err) - } + err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2}) + must.NoError(t, err) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) alloc.CreateIndex = 1000 alloc.ModifyIndex = 1001 alloc.TaskStates = ts alloc.ClientStatus = structs.AllocClientStatusPending - if !reflect.DeepEqual(alloc, out) { - t.Fatalf("bad: %#v , actual:%#v", alloc, out) - } + must.Eq(t, alloc, out) summary, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.JobID) expectedSummary := &structs.JobSummary{ @@ -5374,35 +5413,36 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { CreateIndex: 999, ModifyIndex: 1001, } - if err != nil { - t.Fatalf("err: %v", err) - } - if !reflect.DeepEqual(summary, expectedSummary) { - t.Fatalf("expected: %#v, actual: %#v", expectedSummary, summary) - } + must.NoError(t, err) + must.Eq(t, summary, expectedSummary) } func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) { ci.Parallel(t) - require := require.New(t) state := testStateStore(t) + node := mock.Node() + alloc := mock.Alloc() now := time.Now() + alloc.NodeID = node.ID alloc.CreateTime = now.UnixNano() + pdeadline := 5 * time.Minute deployment := mock.Deployment() deployment.TaskGroups[alloc.TaskGroup].ProgressDeadline = pdeadline alloc.DeploymentID = deployment.ID - require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) - require.Nil(state.UpsertDeployment(1000, deployment)) - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) + must.NoError(t, state.UpsertDeployment(1000, deployment)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) healthy := now.Add(time.Second) update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusRunning, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, @@ -5411,29 +5451,33 @@ func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) { Timestamp: healthy, }, } - require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) + must.NoError(t, state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) // Check that the deployment state was updated because the healthy // deployment dout, err := state.DeploymentByID(nil, deployment.ID) - require.Nil(err) - require.NotNil(dout) - require.Len(dout.TaskGroups, 1) + must.NoError(t, err) + must.NotNil(t, dout) + must.MapLen(t, 1, dout.TaskGroups) dstate := dout.TaskGroups[alloc.TaskGroup] - require.NotNil(dstate) - require.Equal(1, dstate.PlacedAllocs) - require.True(healthy.Add(pdeadline).Equal(dstate.RequireProgressBy)) + must.NotNil(t, dstate) + must.Eq(t, 1, dstate.PlacedAllocs) + must.True(t, healthy.Add(pdeadline).Equal(dstate.RequireProgressBy)) } // This tests that the deployment state is merged correctly func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { ci.Parallel(t) - require := require.New(t) state := 
testStateStore(t) + + node := mock.Node() + alloc := mock.Alloc() now := time.Now() + alloc.NodeID = node.ID alloc.CreateTime = now.UnixNano() + pdeadline := 5 * time.Minute deployment := mock.Deployment() deployment.TaskGroups[alloc.TaskGroup].ProgressDeadline = pdeadline @@ -5442,12 +5486,14 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { Canary: true, } - require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) - require.Nil(state.UpsertDeployment(1000, deployment)) - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) + must.NoError(t, state.UpsertDeployment(1000, deployment)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusRunning, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, @@ -5456,15 +5502,109 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { Canary: false, }, } - require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) + must.NoError(t, state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) // Check that the merging of the deployment status was correct out, err := state.AllocByID(nil, alloc.ID) - require.Nil(err) - require.NotNil(out) - require.True(out.DeploymentStatus.Canary) - require.NotNil(out.DeploymentStatus.Healthy) - require.True(*out.DeploymentStatus.Healthy) + must.NoError(t, err) + must.NotNil(t, out) + must.True(t, out.DeploymentStatus.Canary) + must.NotNil(t, out.DeploymentStatus.Healthy) + must.True(t, *out.DeploymentStatus.Healthy) +} + +// TestStateStore_UpdateAllocsFromClient_UpdateNodes verifies that the relevant +// node data is updated when clients update their allocs. +func TestStateStore_UpdateAllocsFromClient_UpdateNodes(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + + node1 := mock.Node() + alloc1 := mock.Alloc() + alloc1.NodeID = node1.ID + + node2 := mock.Node() + alloc2 := mock.Alloc() + alloc2.NodeID = node2.ID + + node3 := mock.Node() + alloc3 := mock.Alloc() + alloc3.NodeID = node3.ID + + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1003, alloc1.Job)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1004, alloc2.Job)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1005, []*structs.Allocation{alloc1, alloc2, alloc3})) + + // Create watches to make sure they fire when nodes are updated. + ws1 := memdb.NewWatchSet() + _, err := state.NodeByID(ws1, node1.ID) + must.NoError(t, err) + + ws2 := memdb.NewWatchSet() + _, err = state.NodeByID(ws2, node2.ID) + must.NoError(t, err) + + ws3 := memdb.NewWatchSet() + _, err = state.NodeByID(ws3, node3.ID) + must.NoError(t, err) + + // Create and apply alloc updates. + // Don't update alloc 3. 
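+	// Leaving alloc 3 out of the batch means node 3 should keep a zero
+	// LastAllocUpdateIndex, which is asserted at the end of the test.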
+	updateAlloc1 := &structs.Allocation{
+		ID:           alloc1.ID,
+		NodeID:       alloc1.NodeID,
+		ClientStatus: structs.AllocClientStatusRunning,
+		JobID:        alloc1.JobID,
+		TaskGroup:    alloc1.TaskGroup,
+	}
+	updateAlloc2 := &structs.Allocation{
+		ID:           alloc2.ID,
+		NodeID:       alloc2.NodeID,
+		ClientStatus: structs.AllocClientStatusRunning,
+		JobID:        alloc2.JobID,
+		TaskGroup:    alloc2.TaskGroup,
+	}
+	updateAllocNonExisting := &structs.Allocation{
+		ID:           uuid.Generate(),
+		NodeID:       uuid.Generate(),
+		ClientStatus: structs.AllocClientStatusRunning,
+		JobID:        uuid.Generate(),
+		TaskGroup:    "group",
+	}
+
+	err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1005, []*structs.Allocation{
+		updateAlloc1, updateAlloc2, updateAllocNonExisting,
+	})
+	must.NoError(t, err)
+
+	// Check that node update watches fired.
+	must.True(t, watchFired(ws1))
+	must.True(t, watchFired(ws2))
+
+	// Check that the nodes' LastAllocUpdateIndex was updated.
+	ws := memdb.NewWatchSet()
+	out, err := state.NodeByID(ws, node1.ID)
+	must.NoError(t, err)
+	must.NotNil(t, out)
+	must.Eq(t, 1005, out.LastAllocUpdateIndex)
+	must.False(t, watchFired(ws))
+
+	out, err = state.NodeByID(ws, node2.ID)
+	must.NoError(t, err)
+	must.NotNil(t, out)
+	must.Eq(t, 1005, out.LastAllocUpdateIndex)
+	must.False(t, watchFired(ws))
+
+	// Node 3 should not be updated.
+	out, err = state.NodeByID(ws, node3.ID)
+	must.NoError(t, err)
+	must.NotNil(t, out)
+	must.Eq(t, 0, out.LastAllocUpdateIndex)
+	must.False(t, watchFired(ws))
 }
 
 func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index ca53fcb5616..0a7b32f6d08 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2090,6 +2090,14 @@ type Node struct {
 	// LastDrain contains metadata about the most recent drain operation
 	LastDrain *DrainMetadata
 
+	// LastMissedHeartbeatIndex stores the Raft index when the node last missed
+	// a heartbeat. It resets to zero once the node is marked as ready again.
+	LastMissedHeartbeatIndex uint64
+
+	// LastAllocUpdateIndex stores the Raft index of the last time the node
+	// updated the status of its allocations.
+	LastAllocUpdateIndex uint64
+
 	// Raft Indexes
 	CreateIndex uint64
 	ModifyIndex uint64
@@ -2184,6 +2192,17 @@ func (n *Node) Copy() *Node {
 	return &nn
 }
 
+// UnresponsiveStatus returns true if the node is in a status where it is not
+// communicating with the server.
+func (n *Node) UnresponsiveStatus() bool {
+	switch n.Status {
+	case NodeStatusDown, NodeStatusDisconnected:
+		return true
+	default:
+		return false
+	}
+}
+
 // TerminalStatus returns if the current status is terminal and
 // will no longer transition.
 func (n *Node) TerminalStatus() bool {
diff --git a/testutil/wait.go b/testutil/wait.go
index 7cc87961368..4bf44167c66 100644
--- a/testutil/wait.go
+++ b/testutil/wait.go
@@ -7,8 +7,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/kr/pretty"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 )
 
@@ -143,7 +145,12 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
 // WaitForClient blocks until the client can be found
 func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
+	t.Helper()
+	WaitForClientStatus(t, rpc, nodeID, region, structs.NodeStatusReady)
+}
+// WaitForClientStatus blocks until the client is in the expected status.
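+// The status argument is expected to be one of the structs.NodeStatus*
+// constants, for example structs.NodeStatusReady or structs.NodeStatusDisconnected.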
+func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string, status string) {
 	t.Helper()
 
 	if region == "" {
@@ -163,12 +170,15 @@ func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
 		if out.Node == nil {
 			return false, fmt.Errorf("node not found")
 		}
-		return out.Node.Status == structs.NodeStatusReady, nil
+		if out.Node.Status != status {
+			return false, fmt.Errorf("node is %s, not %s", out.Node.Status, status)
+		}
+		return true, nil
 	}, func(err error) {
-		t.Fatalf("failed to find node: %v", err)
+		t.Fatalf("failed to wait for node status: %v", err)
 	})
 
-	t.Logf("[TEST] Client for test %s ready, id: %s, region: %s", t.Name(), nodeID, region)
+	t.Logf("[TEST] Client for test %s %s, id: %s, region: %s", t.Name(), status, nodeID, region)
 }
 
 // WaitForVotingMembers blocks until autopilot promotes all server peers
@@ -270,6 +280,53 @@ func WaitForRunning(t testing.TB, rpc rpcFn, job *structs.Job) []*structs.AllocL
 	return WaitForRunningWithToken(t, rpc, job, "")
 }
 
+// WaitForJobAllocStatus blocks until the ClientStatus of allocations for a job
+// matches the expected map of client status to allocation count.
+func WaitForJobAllocStatus(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int) {
+	t.Helper()
+	WaitForJobAllocStatusWithToken(t, rpc, job, allocStatus, "")
+}
+
+// WaitForJobAllocStatusWithToken behaves the same way as WaitForJobAllocStatus
+// but is used for clusters with ACL enabled.
+func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int, token string) {
+	t.Helper()
+
+	WaitForResultRetries(2000*TestMultiplier(), func() (bool, error) {
+		args := &structs.JobSpecificRequest{
+			JobID: job.ID,
+			QueryOptions: structs.QueryOptions{
+				AuthToken: token,
+				Namespace: job.Namespace,
+				Region:    job.Region,
+			},
+		}
+
+		var resp structs.JobAllocationsResponse
+		err := rpc("Job.Allocations", args, &resp)
+		if err != nil {
+			return false, fmt.Errorf("Job.Allocations error: %v", err)
+		}
+
+		if len(resp.Allocations) == 0 {
+			evals := structs.JobEvaluationsResponse{}
+			require.NoError(t, rpc("Job.Evaluations", args, &evals), "error looking up evals")
+			return false, fmt.Errorf("0 allocations; evals: %s", pretty.Sprint(evals.Evaluations))
+		}
+
+		got := map[string]int{}
+		for _, alloc := range resp.Allocations {
+			got[alloc.ClientStatus]++
+		}
+		if diff := cmp.Diff(allocStatus, got); diff != "" {
+			return false, fmt.Errorf("alloc status mismatch (-want +got):\n%s", diff)
+		}
+		return true, nil
+	}, func(err error) {
+		must.NoError(t, err)
+	})
+}
+
 // WaitForFiles blocks until all the files in the slice are present
 func WaitForFiles(t testing.TB, files []string) {
 	WaitForResult(func() (bool, error) {