From 4331b7a92415880cd59f498659d81f80ba6d3889 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Tue, 17 Jan 2023 19:34:49 -0500
Subject: [PATCH 01/13] core: enforce strict steps for clients reconnect

When a Nomad client that is running an allocation with
`max_client_disconnect` set misses a heartbeat, the Nomad server will
update its status to `disconnected`.

Upon reconnecting, the client will make three main RPC calls:

- `Node.UpdateStatus` is used to set the client status to `ready`.
- `Node.UpdateAlloc` is used to update the client-side information about
  allocations, such as their `ClientStatus`, task states, etc.
- `Node.Register` is used to upsert the entire node information,
  including its status.

These calls are made concurrently and also run in parallel with the
scheduler. Depending on the order in which they run, the scheduler may
end up with incomplete data when reconciling allocations.

For example, a client disconnects and its replacement allocation cannot
be placed anywhere else, so there's a pending eval waiting for
resources.

When this client comes back, the order of events may be:

1. Client calls `Node.UpdateStatus` and is now `ready`.
2. Scheduler reconciles allocations and places the replacement alloc on
   the client. The client is now assigned two allocations: the original
   alloc that is still `unknown` and the replacement that is `pending`.
3. Client calls `Node.UpdateAlloc` and updates the original alloc to
   `running`.
4. Scheduler notices too many allocs and stops the replacement.

This creates unnecessary placements or, in a different order of events,
may leave the job without any allocations running until the whole state
is updated and reconciled.

To avoid problems like this, clients must update _all_ of their relevant
information before they can be considered `ready` and available for
scheduling.

To achieve this goal, the RPC endpoints mentioned above have been
modified to enforce strict steps for reconnecting nodes:

- `Node.Register` does not set the client status anymore.
- `Node.UpdateStatus` sets the reconnecting client to the `initializing`
  status until it successfully calls `Node.UpdateAlloc`.

These changes are done server-side to avoid the need for additional
coordination between clients and servers. Clients are kept oblivious to
these changes and will keep making these calls as they normally would.

Whether allocations have been updated is verified by storing and
comparing the Raft indexes of the last time the client missed a
heartbeat and the last time it updated its allocations.
---
 nomad/node_endpoint.go      |  43 +++++++-
 nomad/node_endpoint_test.go | 196 ++++++++++++++++++++++++++++++++----
 nomad/state/state_store.go  |  20 ++++
 nomad/structs/structs.go    |  19 ++++
 testutil/wait.go            |  59 ++++++++++-
 5 files changed, 314 insertions(+), 23 deletions(-)

diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 0151e62719b..47c658c2c63 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -157,11 +157,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
 		return err
 	}
 
-	// Check if the SecretID has been tampered with
 	if originalNode != nil {
+		// Check if the SecretID has been tampered with
 		if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
 			return fmt.Errorf("node secret ID does not match. Not registering node.")
 		}
+
+		// Don't allow the Register method to update the node status. Only the
+		// UpdateStatus method should be able to do this.
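+		// Register can run concurrently with UpdateStatus while a client is
+		// reconnecting, so overwriting the status here could mark the node as
+		// ready before its allocations have been updated. Keep whatever status
+		// is already stored instead.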
+ if originalNode.Status != "" { + args.Node.Status = originalNode.Status + } } // We have a valid node connection, so add the mapping to cache the @@ -486,6 +492,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Update the timestamp of when the node status was updated args.UpdatedAt = time.Now().Unix() + // Compute next status. + switch node.Status { + case structs.NodeStatusInit: + if args.Status == structs.NodeStatusReady { + allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false) + if err != nil { + return fmt.Errorf("failed to query node allocs: %v", err) + } + + allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex + if len(allocs) > 0 && !allocsUpdated { + args.Status = structs.NodeStatusInit + } + } + case structs.NodeStatusDisconnected: + if args.Status == structs.NodeStatusReady { + args.Status = structs.NodeStatusInit + } + } + // Commit this update via Raft var index uint64 if node.Status != args.Status { @@ -1179,8 +1205,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene if node == nil { return fmt.Errorf("node %s not found", nodeID) } - if node.Status != structs.NodeStatusReady { - return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady) + if node.UnresponsiveStatus() { + return fmt.Errorf("node %s is not allow to update allocs while in status %s", nodeID, node.Status) } // Ensure that evals aren't set from client RPCs @@ -1313,6 +1339,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene return err } + // Update node alloc update index. + copyNode := node.Copy() + copyNode.LastAllocUpdateIndex = future.Index() + + _, _, err = n.srv.raftApply(structs.NodeRegisterRequestType, &structs.NodeRegisterRequest{ + Node: copyNode, + }) + if err != nil { + return fmt.Errorf("node update failed: %v", err) + } + // Setup the response reply.Index = future.Index() return nil diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 32aa966b5cd..dd61ecb2270 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/nomad/testutil" vapi "github.com/hashicorp/vault/api" "github.com/kr/pretty" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -524,6 +525,171 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) { } } +func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { + ci.Parallel(t) + + // Setup server with tighther heartbeat so we don't have to wait so long + // for nodes to go down. + heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond + s, cleanupS := TestServer(t, func(c *Config) { + c.MinHeartbeatTTL = heartbeatTTL + c.HeartbeatGrace = 2 * heartbeatTTL + }) + codec := rpcClient(t, s) + defer cleanupS() + testutil.WaitForLeader(t, s.RPC) + + // Register node. + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeUpdateResp structs.NodeUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp) + must.NoError(t, err) + + // Start heartbeat. 
+ stopHeartbeat := make(chan interface{}) + heartbeat := func() { + ticker := time.NewTicker(heartbeatTTL / 2) + for { + select { + case <-stopHeartbeat: + ticker.Stop() + return + case <-ticker.C: + hb := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusReady, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", hb, &nodeUpdateResp) + must.NoError(t, err) + } + } + } + go heartbeat() + + // Wait for node to be ready. + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) + + // Register job with max_client_disconnect. + job := mock.Job() + job.Constraints = []*structs.Constraint{} + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour) + job.TaskGroups[0].Constraints = []*structs.Constraint{} + job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10m", + } + + jobReq := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var jobResp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp) + must.NoError(t, err) + + // Wait for alloc run be pending in the server. + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusPending: 1, + }) + + // Get allocs that node should run. + allocsReq := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + var allocsResp structs.NodeAllocsResponse + err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp) + must.NoError(t, err) + must.Len(t, 1, allocsResp.Allocs) + + // Tell server the alloc is running. + // Save the alloc so we can reuse the request later. + alloc := allocsResp.Allocs[0].Copy() + alloc.ClientStatus = structs.AllocClientStatusRunning + + allocUpdateReq := &structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + var resp structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp) + must.NoError(t, err) + + // Wait for alloc run be running in the server. + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusRunning: 1, + }) + + // Stop heartbeat and wait for the client to be disconnected and the alloc + // to be unknown. + close(stopHeartbeat) + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected) + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusUnknown: 1, + }) + + // There should be a pending eval for the alloc replacement. + state := s.fsm.State() + ws := memdb.NewWatchSet() + evals, err := state.EvalsByJob(ws, job.Namespace, job.ID) + found := false + for _, eval := range evals { + if eval.Status == structs.EvalStatusPending { + found = true + break + } + } + must.True(t, found) + + // Restart heartbeat to reconnect node. + stopHeartbeat = make(chan interface{}) + go heartbeat() + + // Wait for node to be initializing. + // It must remain initializing until it updates its allocs with the server + // so the scheduler have the necessary information to avoid unnecessary + // placements by the pending eval. 
+ testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit) + + // Get allocs that node should run. + // The node should only have one alloc assigned until it updates its allocs + // status with the server. + allocsReq = &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp) + must.NoError(t, err) + must.Len(t, 1, allocsResp.Allocs) + + // Tell server the alloc is running. + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp) + must.NoError(t, err) + + // Wait for alloc run be running in the server. + testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ + structs.AllocClientStatusRunning: 1, + }) + + // Wait for the client to be ready. + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) +} + func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { ci.Parallel(t) require := require.New(t) @@ -639,14 +805,12 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { } // Transition it to down and then ready - node.Status = structs.NodeStatusDown - reg = &structs.NodeRegisterRequest{ - Node: node, + req := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusDown, WriteRequest: structs.WriteRequest{Region: "global"}, } - - // Fetch the response - if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -654,14 +818,12 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { t.Fatalf("expected one eval; got %#v", resp.EvalIDs) } - node.Status = structs.NodeStatusReady - reg = &structs.NodeRegisterRequest{ - Node: node, + req = &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusReady, WriteRequest: structs.WriteRequest{Region: "global"}, } - - // Fetch the response - if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -1369,12 +1531,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) { require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2)) // Mark the node as down - node.Status = structs.NodeStatusDown - reg = &structs.NodeRegisterRequest{ - Node: node, + req := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusDown, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)) // Ensure that the allocation has transitioned to lost testutil.WaitForResult(func() (bool, error) { @@ -2581,7 +2743,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { } var allocUpdateResp structs.NodeAllocsResponse err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.ErrorContains(t, err, "not ready") + require.ErrorContains(t, err, "not allow to update allocs") // Send request without an explicit node ID. 
updatedAlloc.NodeID = "" diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 4e2f48c937b..948f81e3bbc 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -909,6 +909,11 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { node.CreateIndex = exist.CreateIndex node.ModifyIndex = index + // Update last missed heartbeat if the node became unresponsive. + if !exist.UnresponsiveStatus() && node.UnresponsiveStatus() { + node.LastMissedHeartbeatIndex = index + } + // Retain node events that have already been set on the node node.Events = exist.Events @@ -923,6 +928,16 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy node.LastDrain = exist.LastDrain // Retain the drain metadata + + // Retain the last index the node missed a heartbeat. + if node.LastMissedHeartbeatIndex < exist.LastMissedHeartbeatIndex { + node.LastMissedHeartbeatIndex = exist.LastMissedHeartbeatIndex + } + + // Retain the last index the node updated its allocs. + if node.LastAllocUpdateIndex < exist.LastAllocUpdateIndex { + node.LastAllocUpdateIndex = exist.LastAllocUpdateIndex + } } else { // Because this is the first time the node is being registered, we should // also create a node registration event @@ -1029,6 +1044,11 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update copyNode.Status = status copyNode.ModifyIndex = txn.Index + // Update last missed heartbeat if the node became unresponsive. + if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() { + copyNode.LastMissedHeartbeatIndex = txn.Index + } + // Insert the node if err := txn.Insert("nodes", copyNode); err != nil { return fmt.Errorf("node update failed: %v", err) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ca53fcb5616..6c052df4e84 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2090,6 +2090,14 @@ type Node struct { // LastDrain contains metadata about the most recent drain operation LastDrain *DrainMetadata + // LastMissedHeartbeatIndex stores the Raft index when the node + // last missed a heartbeat. + LastMissedHeartbeatIndex uint64 + + // LastAllocUpdateIndex stores the Raft index of the last time the node + // updatedd its allocations status. + LastAllocUpdateIndex uint64 + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -2184,6 +2192,17 @@ func (n *Node) Copy() *Node { return &nn } +// UnresponsiveStatus returns true if the node is a status where it is not +// communicating with the server. +func (n *Node) UnresponsiveStatus() bool { + switch n.Status { + case NodeStatusDown, NodeStatusDisconnected: + return true + default: + return false + } +} + // TerminalStatus returns if the current status is terminal and // will no longer transition. 
func (n *Node) TerminalStatus() bool { diff --git a/testutil/wait.go b/testutil/wait.go index 7cc87961368..41643cdfc15 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -7,8 +7,10 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -143,7 +145,12 @@ func WaitForLeader(t testing.TB, rpc rpcFn) { // WaitForClient blocks until the client can be found func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) { + t.Helper() + WaitForClientStatus(t, rpc, nodeID, region, structs.NodeStatusReady) +} +// WaitForClientStatus blocks until the client is in the expected status. +func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string, status string) { t.Helper() if region == "" { @@ -163,12 +170,15 @@ func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) { if out.Node == nil { return false, fmt.Errorf("node not found") } - return out.Node.Status == structs.NodeStatusReady, nil + if out.Node.Status != status { + return false, fmt.Errorf("node is %s, not %s", out.Node.Status, status) + } + return true, nil }, func(err error) { - t.Fatalf("failed to find node: %v", err) + t.Fatalf("failed to wait for node staus: %v", err) }) - t.Logf("[TEST] Client for test %s ready, id: %s, region: %s", t.Name(), nodeID, region) + t.Logf("[TEST] Client for test %s %s, id: %s, region: %s", t.Name(), status, nodeID, region) } // WaitForVotingMembers blocks until autopilot promotes all server peers @@ -270,6 +280,49 @@ func WaitForRunning(t testing.TB, rpc rpcFn, job *structs.Job) []*structs.AllocL return WaitForRunningWithToken(t, rpc, job, "") } +func WaitForJobAllocStatus(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int) { + t.Helper() + WaitForJobAllocStatusWithToken(t, rpc, job, allocStatus, "") +} + +func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int, token string) { + t.Helper() + + WaitForResultRetries(2000*TestMultiplier(), func() (bool, error) { + args := &structs.JobSpecificRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{ + AuthToken: token, + Namespace: job.Namespace, + Region: job.Region, + }, + } + + var resp structs.JobAllocationsResponse + err := rpc("Job.Allocations", args, &resp) + if err != nil { + return false, fmt.Errorf("Job.Allocations error: %v", err) + } + + if len(resp.Allocations) == 0 { + evals := structs.JobEvaluationsResponse{} + require.NoError(t, rpc("Job.Evaluations", args, &evals), "error looking up evals") + return false, fmt.Errorf("0 allocations; evals: %s", pretty.Sprint(evals.Evaluations)) + } + + got := map[string]int{} + for _, alloc := range resp.Allocations { + got[alloc.ClientStatus]++ + } + if diff := cmp.Diff(allocStatus, got); diff != "" { + return false, fmt.Errorf("alloc status mismatch (-want +got):\n%s", diff) + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) +} + // WaitForFiles blocks until all the files in the slice are present func WaitForFiles(t testing.TB, files []string) { WaitForResult(func() (bool, error) { From bb7cab4b35846735c923862cfb1e8335b1ecb1e7 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 18 Jan 2023 20:37:14 -0500 Subject: [PATCH 02/13] chagelog: add entry for #15808 --- .changelog/15808.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/15808.txt diff --git a/.changelog/15808.txt 
b/.changelog/15808.txt new file mode 100644 index 00000000000..30371c4f2a8 --- /dev/null +++ b/.changelog/15808.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: enforce strict steps for clients reconnect +``` From 8ae81441ac070fc11ea3268303a292bc191c1fa3 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 18 Jan 2023 20:57:44 -0500 Subject: [PATCH 03/13] test: add missing godoc strings and don't reuse variables --- nomad/node_endpoint_test.go | 5 +++-- testutil/wait.go | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index dd61ecb2270..7a7dd3669d6 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -559,12 +559,13 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { ticker.Stop() return case <-ticker.C: - hb := &structs.NodeUpdateStatusRequest{ + req := &structs.NodeUpdateStatusRequest{ NodeID: node.ID, Status: structs.NodeStatusReady, WriteRequest: structs.WriteRequest{Region: "global"}, } - err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", hb, &nodeUpdateResp) + var resp structs.NodeUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp) must.NoError(t, err) } } diff --git a/testutil/wait.go b/testutil/wait.go index 41643cdfc15..4bf44167c66 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -280,11 +280,15 @@ func WaitForRunning(t testing.TB, rpc rpcFn, job *structs.Job) []*structs.AllocL return WaitForRunningWithToken(t, rpc, job, "") } +// WaitforJobAllocStatus blocks until the ClientStatus of allocations for a job +// match the expected map of : . func WaitForJobAllocStatus(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int) { t.Helper() WaitForJobAllocStatusWithToken(t, rpc, job, allocStatus, "") } +// WaitForJobAllocStatusWithToken behaves the same way as WaitForJobAllocStatus +// but is used for clusters with ACL enabled. func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int, token string) { t.Helper() From 0b03f926f71b657f0f02fe1256ba7ef599ccb0cd Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 19 Jan 2023 10:37:37 -0500 Subject: [PATCH 04/13] test: don't leak goroutine --- nomad/node_endpoint_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 7a7dd3669d6..7d55a84dbf9 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -559,14 +559,19 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { ticker.Stop() return case <-ticker.C: + if t.Failed() { + return + } + req := &structs.NodeUpdateStatusRequest{ NodeID: node.ID, Status: structs.NodeStatusReady, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.NodeUpdateResponse - err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp) - must.NoError(t, err) + // Ignore errors since an unexpected failed hearbeat will cause + // the test conditions to fail. + msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp) } } } @@ -689,6 +694,10 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { // Wait for the client to be ready. testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) + + // Cleanup heartbeat goroutine before exiting. 
+ close(stopHeartbeat) + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected) } func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { From 0040e1788581762da90573dd91e9013681227631 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 19 Jan 2023 19:05:55 -0500 Subject: [PATCH 05/13] test: apply code review suggestions --- nomad/node_endpoint_test.go | 89 +++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 7d55a84dbf9..8e9e2cc01b2 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "errors" "fmt" "net" @@ -528,7 +529,7 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) { func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { ci.Parallel(t) - // Setup server with tighther heartbeat so we don't have to wait so long + // Setup server with tighter heartbeat so we don't have to wait so long // for nodes to go down. heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond s, cleanupS := TestServer(t, func(c *Config) { @@ -550,13 +551,13 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { must.NoError(t, err) // Start heartbeat. - stopHeartbeat := make(chan interface{}) - heartbeat := func() { + heartbeat := func(ctx context.Context) { ticker := time.NewTicker(heartbeatTTL / 2) + defer ticker.Stop() + for { select { - case <-stopHeartbeat: - ticker.Stop() + case <-ctx.Done(): return case <-ticker.C: if t.Failed() { @@ -569,13 +570,15 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.NodeUpdateResponse - // Ignore errors since an unexpected failed hearbeat will cause + // Ignore errors since an unexpected failed heartbeat will cause // the test conditions to fail. msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp) } } } - go heartbeat() + heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background()) + defer cancelHeartbeat() + go heartbeat(heartbeatCtx) // Wait for node to be ready. testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) @@ -602,7 +605,7 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp) must.NoError(t, err) - // Wait for alloc run be pending in the server. + // Wait for alloc to be pending in the server. testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ structs.AllocClientStatusPending: 1, }) @@ -634,40 +637,30 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp) must.NoError(t, err) - // Wait for alloc run be running in the server. + // Wait for alloc to be running in the server. testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ structs.AllocClientStatusRunning: 1, }) // Stop heartbeat and wait for the client to be disconnected and the alloc // to be unknown. - close(stopHeartbeat) + cancelHeartbeat() testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected) testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ structs.AllocClientStatusUnknown: 1, }) - // There should be a pending eval for the alloc replacement. 
- state := s.fsm.State() - ws := memdb.NewWatchSet() - evals, err := state.EvalsByJob(ws, job.Namespace, job.ID) - found := false - for _, eval := range evals { - if eval.Status == structs.EvalStatusPending { - found = true - break - } - } - must.True(t, found) - // Restart heartbeat to reconnect node. - stopHeartbeat = make(chan interface{}) - go heartbeat() - - // Wait for node to be initializing. - // It must remain initializing until it updates its allocs with the server - // so the scheduler have the necessary information to avoid unnecessary - // placements by the pending eval. + heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background()) + defer cancelHeartbeat() + go heartbeat(heartbeatCtx) + + // Wait a few heartbeats and check that the node is still initializing. + // + // The heartbeat should not update the node to ready until it updates its + // allocs status with the server so the scheduler have the necessary + // information to avoid unnecessary placements. + time.Sleep(3 * heartbeatTTL) testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit) // Get allocs that node should run. @@ -683,21 +676,41 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) { must.NoError(t, err) must.Len(t, 1, allocsResp.Allocs) - // Tell server the alloc is running. + // Tell server the alloc is still running. err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp) must.NoError(t, err) - // Wait for alloc run be running in the server. + // The client must end in the same state as before it disconnected: + // - client status is ready. + // - only 1 alloc and the alloc is running. + // - all evals are terminal, so cluster is in a stable state. + testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{ structs.AllocClientStatusRunning: 1, }) + testutil.WaitForResult(func() (bool, error) { + state := s.fsm.State() + ws := memdb.NewWatchSet() + evals, err := state.EvalsByJob(ws, job.Namespace, job.ID) + if err != nil { + return false, fmt.Errorf("failed to read evals: %v", err) + } + for _, eval := range evals { + // TODO: remove this check once the disconnect process stops + // leaking a max-disconnect-timeout eval. + // https://github.com/hashicorp/nomad/issues/12809 + if eval.TriggeredBy == structs.EvalTriggerMaxDisconnectTimeout { + continue + } - // Wait for the client to be ready. - testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady) - - // Cleanup heartbeat goroutine before exiting. 
- close(stopHeartbeat) - testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected) + if !eval.TerminalStatus() { + return false, fmt.Errorf("found %s eval", eval.Status) + } + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) } func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { From 67411b9214cfe10a815b8d9c95b3bfae381745c9 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 19 Jan 2023 19:06:11 -0500 Subject: [PATCH 06/13] Update .changelog/15808.txt Co-authored-by: Tim Gross --- .changelog/15808.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/15808.txt b/.changelog/15808.txt index 30371c4f2a8..36e2026b5f6 100644 --- a/.changelog/15808.txt +++ b/.changelog/15808.txt @@ -1,3 +1,3 @@ ```release-note:bug -core: enforce strict steps for clients reconnect +core: enforce strict ordering that node status updates are recorded after allocation updates for reconnecting clients ``` From 952b9712178d17023dc19cde0dac007d0bf6a421 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 20 Jan 2023 18:14:41 -0500 Subject: [PATCH 07/13] core: update LastAllocUpdateIndex with allocs Update allocs and the LastAllocUpdateIndex in the same Raft transaction to avoid data inconsistency in case the UpdateAlloc request fails midway. --- nomad/node_endpoint.go | 11 - nomad/state/state_store.go | 34 +++ nomad/state/state_store_test.go | 388 ++++++++++++++++++-------------- 3 files changed, 250 insertions(+), 183 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 47c658c2c63..6eae90339b2 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1339,17 +1339,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene return err } - // Update node alloc update index. - copyNode := node.Copy() - copyNode.LastAllocUpdateIndex = future.Index() - - _, _, err = n.srv.raftApply(structs.NodeRegisterRequestType, &structs.NodeRegisterRequest{ - Node: copyNode, - }) - if err != nil { - return fmt.Errorf("node update failed: %v", err) - } - // Setup the response reply.Index = future.Index() return nil diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 948f81e3bbc..7849727c65c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3602,8 +3602,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + // Capture all nodes being affected. Alloc updates from clients are batched + // so this request may include allocs from several nodes. + nodeIDs := make(map[string]interface{}) + // Handle each of the updated allocations for _, alloc := range allocs { + nodeIDs[alloc.NodeID] = struct{}{} if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil { return err } @@ -3614,6 +3619,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u return fmt.Errorf("index update failed: %v", err) } + // Update the index of when nodes last updated their allocs. 
+ for nodeID := range nodeIDs { + if err := s.updateClientAllocUpdateIndex(txn, index, nodeID); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + } + return txn.Commit() } @@ -3705,6 +3717,28 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc * return nil } +func (s *StateStore) updateClientAllocUpdateIndex(txn *txn, index uint64, nodeID string) error { + existing, err := txn.First("nodes", "id", nodeID) + if err != nil { + return fmt.Errorf("node lookup failed: %v", err) + } + if existing == nil { + return nil + } + + node := existing.(*structs.Node) + copyNode := node.Copy() + copyNode.LastAllocUpdateIndex = index + + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil +} + // UpsertAllocs is used to evict a set of allocations and allocate new ones at // the same time. func (s *StateStore) UpsertAllocs(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1f1030ab1d0..e57e95fe12c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -5089,145 +5089,115 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + + node := mock.Node() + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 997, node)) + parent := mock.Job() - if err := state.UpsertJob(structs.MsgTypeTestSetup, 998, parent); err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 998, parent)) child := mock.Job() child.Status = "" child.ParentID = parent.ID - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, child); err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, child)) alloc := mock.Alloc() + alloc.NodeID = node.ID alloc.JobID = child.ID alloc.Job = child - - err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) ws := memdb.NewWatchSet() summary, err := state.JobSummaryByID(ws, parent.Namespace, parent.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if summary == nil { - t.Fatalf("nil summary") - } - if summary.JobID != parent.ID { - t.Fatalf("bad summary id: %v", parent.ID) - } - if summary.Children == nil { - t.Fatalf("nil children summary") - } - if summary.Children.Pending != 0 || summary.Children.Running != 1 || summary.Children.Dead != 0 { - t.Fatalf("bad children summary: %v", summary.Children) - } + must.NoError(t, err) + must.NotNil(t, summary) + must.Eq(t, parent.ID, summary.JobID) + must.NotNil(t, summary.Children) + must.Eq(t, 0, summary.Children.Pending) + must.Eq(t, 1, summary.Children.Running) + must.Eq(t, 0, summary.Children.Dead) // Create watchsets so we can test that update fires the watch ws = memdb.NewWatchSet() - if _, err := state.JobSummaryByID(ws, parent.Namespace, parent.ID); err != nil { - t.Fatalf("bad: %v", err) - } + _, err = state.JobSummaryByID(ws, parent.Namespace, parent.ID) + must.NoError(t, err) // Create the delta updates ts := map[string]*structs.TaskState{"web": {State: structs.TaskStateRunning}} update := &structs.Allocation{ ID: alloc.ID, 
+ NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusComplete, TaskStates: ts, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, } err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - if !watchFired(ws) { - t.Fatalf("bad") - } + must.True(t, watchFired(ws)) ws = memdb.NewWatchSet() summary, err = state.JobSummaryByID(ws, parent.Namespace, parent.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if summary == nil { - t.Fatalf("nil summary") - } - if summary.JobID != parent.ID { - t.Fatalf("bad summary id: %v", parent.ID) - } - if summary.Children == nil { - t.Fatalf("nil children summary") - } - if summary.Children.Pending != 0 || summary.Children.Running != 0 || summary.Children.Dead != 1 { - t.Fatalf("bad children summary: %v", summary.Children) - } + must.NoError(t, err) + must.NotNil(t, summary) + must.Eq(t, parent.ID, summary.JobID) + must.NotNil(t, summary.Children) + must.Eq(t, 0, summary.Children.Pending) + must.Eq(t, 0, summary.Children.Running) + must.Eq(t, 1, summary.Children.Dead) - if watchFired(ws) { - t.Fatalf("bad") - } + must.False(t, watchFired(ws)) } func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + + node := mock.Node() + alloc1 := mock.Alloc() - alloc2 := mock.Alloc() + alloc1.NodeID = node.ID - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc1.Job); err != nil { - t.Fatalf("err: %v", err) - } - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc2.Job); err != nil { - t.Fatalf("err: %v", err) - } + alloc2 := mock.Alloc() + alloc2.NodeID = node.ID - err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc1.Job)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc2.Job)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) // Create watchsets so we can test that update fires the watch watches := make([]memdb.WatchSet, 8) for i := 0; i < 8; i++ { watches[i] = memdb.NewWatchSet() } - if _, err := state.AllocByID(watches[0], alloc1.ID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocByID(watches[1], alloc2.ID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByEval(watches[2], alloc1.EvalID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByEval(watches[3], alloc2.EvalID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByJob(watches[4], alloc1.Namespace, alloc1.JobID, false); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByJob(watches[5], alloc2.Namespace, alloc2.JobID, false); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByNode(watches[6], alloc1.NodeID); err != nil { - t.Fatalf("bad: %v", err) - } - if _, err := state.AllocsByNode(watches[7], alloc2.NodeID); err != nil { - t.Fatalf("bad: %v", err) - } + _, err := state.AllocByID(watches[0], alloc1.ID) + must.NoError(t, err) + _, err = state.AllocByID(watches[1], alloc2.ID) + must.NoError(t, err) + + _, err = state.AllocsByEval(watches[2], alloc1.EvalID) + must.NoError(t, err) + _, err = state.AllocsByEval(watches[3], alloc2.EvalID) + must.NoError(t, err) + + _, 
err = state.AllocsByJob(watches[4], alloc1.Namespace, alloc1.JobID, false) + must.NoError(t, err) + _, err = state.AllocsByJob(watches[5], alloc2.Namespace, alloc2.JobID, false) + must.NoError(t, err) + + _, err = state.AllocsByNode(watches[6], alloc1.NodeID) + must.NoError(t, err) + _, err = state.AllocsByNode(watches[7], alloc2.NodeID) + must.NoError(t, err) // Create the delta updates ts := map[string]*structs.TaskState{"web": {State: structs.TaskStatePending}} update := &structs.Allocation{ ID: alloc1.ID, + NodeID: alloc1.NodeID, ClientStatus: structs.AllocClientStatusFailed, TaskStates: ts, JobID: alloc1.JobID, @@ -5235,6 +5205,7 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { } update2 := &structs.Allocation{ ID: alloc2.ID, + NodeID: alloc2.NodeID, ClientStatus: structs.AllocClientStatusRunning, TaskStates: ts, JobID: alloc2.JobID, @@ -5242,93 +5213,70 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { } err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - for i, ws := range watches { - if !watchFired(ws) { - t.Fatalf("bad %d", i) - } + for _, ws := range watches { + must.True(t, watchFired(ws)) } ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc1.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) alloc1.CreateIndex = 1000 alloc1.ModifyIndex = 1001 alloc1.TaskStates = ts alloc1.ClientStatus = structs.AllocClientStatusFailed - if !reflect.DeepEqual(alloc1, out) { - t.Fatalf("bad: %#v %#v", alloc1, out) - } + must.Eq(t, alloc1, out) out, err = state.AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) alloc2.ModifyIndex = 1000 alloc2.ModifyIndex = 1001 alloc2.ClientStatus = structs.AllocClientStatusRunning alloc2.TaskStates = ts - if !reflect.DeepEqual(alloc2, out) { - t.Fatalf("bad: %#v %#v", alloc2, out) - } + must.Eq(t, alloc2, out) index, err := state.Index("allocs") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1001 { - t.Fatalf("bad: %d", index) - } + must.NoError(t, err) + must.Eq(t, 1001, index) // Ensure summaries have been updated summary, err := state.JobSummaryByID(ws, alloc1.Namespace, alloc1.JobID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) + tgSummary := summary.Summary["web"] - if tgSummary.Failed != 1 { - t.Fatalf("expected failed: %v, actual: %v, summary: %#v", 1, tgSummary.Failed, tgSummary) - } + must.Eq(t, 1, tgSummary.Failed) summary2, err := state.JobSummaryByID(ws, alloc2.Namespace, alloc2.JobID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) + tgSummary2 := summary2.Summary["web"] - if tgSummary2.Running != 1 { - t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Running) - } + must.Eq(t, 1, tgSummary2.Running) - if watchFired(ws) { - t.Fatalf("bad") - } + must.False(t, watchFired(ws)) } func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { ci.Parallel(t) state := testStateStore(t) + + node := mock.Node() + alloc := mock.Alloc() + alloc.NodeID = node.ID - if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job); err != nil { - t.Fatalf("err: %v", err) - } - err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, 
state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) // Create the delta updates ts := map[string]*structs.TaskState{"web": {State: structs.TaskStatePending}} update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusRunning, TaskStates: ts, JobID: alloc.JobID, @@ -5336,30 +5284,25 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { } update2 := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusPending, TaskStates: ts, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, } - err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2}) - if err != nil { - t.Fatalf("err: %v", err) - } + err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2}) + must.NoError(t, err) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) alloc.CreateIndex = 1000 alloc.ModifyIndex = 1001 alloc.TaskStates = ts alloc.ClientStatus = structs.AllocClientStatusPending - if !reflect.DeepEqual(alloc, out) { - t.Fatalf("bad: %#v , actual:%#v", alloc, out) - } + must.Eq(t, alloc, out) summary, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.JobID) expectedSummary := &structs.JobSummary{ @@ -5374,35 +5317,36 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { CreateIndex: 999, ModifyIndex: 1001, } - if err != nil { - t.Fatalf("err: %v", err) - } - if !reflect.DeepEqual(summary, expectedSummary) { - t.Fatalf("expected: %#v, actual: %#v", expectedSummary, summary) - } + must.NoError(t, err) + must.Eq(t, summary, expectedSummary) } func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) { ci.Parallel(t) - require := require.New(t) state := testStateStore(t) + node := mock.Node() + alloc := mock.Alloc() now := time.Now() + alloc.NodeID = node.ID alloc.CreateTime = now.UnixNano() + pdeadline := 5 * time.Minute deployment := mock.Deployment() deployment.TaskGroups[alloc.TaskGroup].ProgressDeadline = pdeadline alloc.DeploymentID = deployment.ID - require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) - require.Nil(state.UpsertDeployment(1000, deployment)) - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) + must.NoError(t, state.UpsertDeployment(1000, deployment)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) healthy := now.Add(time.Second) update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusRunning, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, @@ -5411,29 +5355,33 @@ func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) { Timestamp: healthy, }, } - require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) + must.NoError(t, state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) // Check that the deployment state was updated because the healthy // deployment dout, err := state.DeploymentByID(nil, deployment.ID) - require.Nil(err) - require.NotNil(dout) - require.Len(dout.TaskGroups, 1) + 
must.NoError(t, err) + must.NotNil(t, dout) + must.MapLen(t, 1, dout.TaskGroups) dstate := dout.TaskGroups[alloc.TaskGroup] - require.NotNil(dstate) - require.Equal(1, dstate.PlacedAllocs) - require.True(healthy.Add(pdeadline).Equal(dstate.RequireProgressBy)) + must.NotNil(t, dstate) + must.Eq(t, 1, dstate.PlacedAllocs) + must.True(t, healthy.Add(pdeadline).Equal(dstate.RequireProgressBy)) } // This tests that the deployment state is merged correctly func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { ci.Parallel(t) - require := require.New(t) state := testStateStore(t) + + node := mock.Node() + alloc := mock.Alloc() now := time.Now() + alloc.NodeID = node.ID alloc.CreateTime = now.UnixNano() + pdeadline := 5 * time.Minute deployment := mock.Deployment() deployment.TaskGroups[alloc.TaskGroup].ProgressDeadline = pdeadline @@ -5442,12 +5390,14 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { Canary: true, } - require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) - require.Nil(state.UpsertDeployment(1000, deployment)) - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job)) + must.NoError(t, state.UpsertDeployment(1000, deployment)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})) update := &structs.Allocation{ ID: alloc.ID, + NodeID: alloc.NodeID, ClientStatus: structs.AllocClientStatusRunning, JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, @@ -5456,15 +5406,109 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { Canary: false, }, } - require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) + must.NoError(t, state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})) // Check that the merging of the deployment status was correct out, err := state.AllocByID(nil, alloc.ID) - require.Nil(err) - require.NotNil(out) - require.True(out.DeploymentStatus.Canary) - require.NotNil(out.DeploymentStatus.Healthy) - require.True(*out.DeploymentStatus.Healthy) + must.NoError(t, err) + must.NotNil(t, out) + must.True(t, out.DeploymentStatus.Canary) + must.NotNil(t, out.DeploymentStatus.Healthy) + must.True(t, *out.DeploymentStatus.Healthy) +} + +// TestStateStore_UpdateAllocsFromClient_UpdateNodes verifies that the relevant +// node data is updated when clients update their allocs. 
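+//
+// Alloc updates from clients are batched, so a single request may carry
+// allocs from several nodes; nodes without an alloc in the batch must keep
+// their previous LastAllocUpdateIndex.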
+func TestStateStore_UpdateAllocsFromClient_UpdateNodes(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + + node1 := mock.Node() + alloc1 := mock.Alloc() + alloc1.NodeID = node1.ID + + node2 := mock.Node() + alloc2 := mock.Alloc() + alloc2.NodeID = node2.ID + + node3 := mock.Node() + alloc3 := mock.Alloc() + alloc3.NodeID = node3.ID + + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1003, alloc1.Job)) + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1004, alloc2.Job)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1005, []*structs.Allocation{alloc1, alloc2, alloc3})) + + // Create watches to make sure they fire when nodes are updated. + ws1 := memdb.NewWatchSet() + _, err := state.NodeByID(ws1, node1.ID) + must.NoError(t, err) + + ws2 := memdb.NewWatchSet() + _, err = state.NodeByID(ws2, node2.ID) + must.NoError(t, err) + + ws3 := memdb.NewWatchSet() + _, err = state.NodeByID(ws3, node3.ID) + must.NoError(t, err) + + // Create and apply alloc updates. + // Don't update alloc 3. + updateAlloc1 := &structs.Allocation{ + ID: alloc1.ID, + NodeID: alloc1.NodeID, + ClientStatus: structs.AllocClientStatusRunning, + JobID: alloc1.JobID, + TaskGroup: alloc1.TaskGroup, + } + updateAlloc2 := &structs.Allocation{ + ID: alloc2.ID, + NodeID: alloc2.NodeID, + ClientStatus: structs.AllocClientStatusRunning, + JobID: alloc2.JobID, + TaskGroup: alloc2.TaskGroup, + } + updateAllocNonExisting := &structs.Allocation{ + ID: uuid.Generate(), + NodeID: uuid.Generate(), + ClientStatus: structs.AllocClientStatusRunning, + JobID: uuid.Generate(), + TaskGroup: "group", + } + + err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1005, []*structs.Allocation{ + updateAlloc1, updateAlloc2, updateAllocNonExisting, + }) + must.NoError(t, err) + + // Check that node update watches fired. + must.True(t, watchFired(ws1)) + must.True(t, watchFired(ws2)) + + // Check that node LastAllocUpdateIndex were updated. + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node1.ID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, 1005, out.LastAllocUpdateIndex) + must.False(t, watchFired(ws)) + + out, err = state.NodeByID(ws, node2.ID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, 1005, out.LastAllocUpdateIndex) + must.False(t, watchFired(ws)) + + // Node 3 should not be updated. + out, err = state.NodeByID(ws, node3.ID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, 0, out.LastAllocUpdateIndex) + must.False(t, watchFired(ws)) } func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { From 6e1f1d8c4604bcc06287b788fc056eb3fcc0fd2e Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 20 Jan 2023 19:28:28 -0500 Subject: [PATCH 08/13] core: update Node.UpdateStatus method doc comment --- nomad/node_endpoint.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 6eae90339b2..e51ebc9fbb6 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -439,7 +439,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, return nil } -// UpdateStatus is used to update the status of a client node +// UpdateStatus is used to update the status of a client node. 
+// +// Clients with non-terminal allocations must first call UpdateAlloc to be able +// to transition from the initializing status to ready. +// +// ┌────────────────────────────────────── No ───┐ +// │ │ +// ┌──▼───┐ ┌─────────────┐ ┌────────┴────────┐ +// ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │ +// └──▲───┘ └─────┬───────┘ └────────┬────────┘ +// │ │ │ +// ready └─ No ─┐ ┌─────── Yes ──┘ +// │ │ │ +// ┌──────┴───────┐ ┌──▼──▼─┐ ┌──────┐ +// │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │ +// └──────────────┘ └───▲───┘ └──┬───┘ +// │ │ +// └──── ready ─────┘ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { isForwarded := args.IsForwarded() if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done { From 8202f0b592109d835c768eecc8151149b78609c9 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 20 Jan 2023 19:53:28 -0500 Subject: [PATCH 09/13] update godoc string for UpdateAlloc --- nomad/node_endpoint.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index e51ebc9fbb6..cf5b4bfaa30 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1189,8 +1189,11 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, // UpdateAlloc is used to update the client status of an allocation. It should // only be called by clients. // -// Clients must first register and heartbeat successfully before they are able -// to call this method. +// Calling this method returns an error when: +// - The node is not registered in the server yet. Clients must first call the +// Register method. +// - The node status is down or disconnected. Clients must call the +// UpdateStatus method to update its status in the server. func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error { // Ensure the connection was initiated by another client if TLS is used. 
err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient) From e52e15f36c4a05119d37c4cca50a6d65ae935911 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 20 Jan 2023 19:58:08 -0500 Subject: [PATCH 10/13] fix TestFSM_UpdateAllocFromClient_Unblock test --- nomad/fsm_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 9fcdb9f4d23..6ea142c0198 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1219,6 +1219,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { clientAlloc.ClientStatus = structs.AllocClientStatusComplete update2 := &structs.Allocation{ ID: alloc2.ID, + NodeID: alloc2.NodeID, ClientStatus: structs.AllocClientStatusRunning, } From bb0aa1335587d6b4542e69e82fea2f8bfda4b733 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 23 Jan 2023 16:34:51 -0500 Subject: [PATCH 11/13] fix typo --- nomad/node_endpoint.go | 2 +- nomad/node_endpoint_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index cf5b4bfaa30..b95f17ab993 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1226,7 +1226,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene return fmt.Errorf("node %s not found", nodeID) } if node.UnresponsiveStatus() { - return fmt.Errorf("node %s is not allow to update allocs while in status %s", nodeID, node.Status) + return fmt.Errorf("node %s is not allowed to update allocs while in status %s", nodeID, node.Status) } // Ensure that evals aren't set from client RPCs diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 8e9e2cc01b2..2793f83c9d7 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2766,7 +2766,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { } var allocUpdateResp structs.NodeAllocsResponse err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.ErrorContains(t, err, "not allow to update allocs") + require.ErrorContains(t, err, "not allowed to update allocs") // Send request without an explicit node ID. updatedAlloc.NodeID = "" From 5cb8b31cf79b3ee8b3918e1ed74eb92072043c4e Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 23 Jan 2023 16:43:22 -0500 Subject: [PATCH 12/13] reset LastMissedHeartbeatIndex to zero when node is ready --- nomad/state/state_store.go | 6 ++- nomad/state/state_store_test.go | 96 +++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 4 +- 3 files changed, 103 insertions(+), 3 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7849727c65c..80e1b9557fc 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1044,9 +1044,13 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update copyNode.Status = status copyNode.ModifyIndex = txn.Index - // Update last missed heartbeat if the node became unresponsive. + // Update last missed heartbeat if the node became unresponsive or reset it + // zero if the node became ready. 
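+	// As a result, the index is only non-zero while the node is unresponsive
+	// or still initializing after a reconnect, which matches the field
+	// documentation on structs.Node.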
if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() { copyNode.LastMissedHeartbeatIndex = txn.Index + } else if existingNode.Status != structs.NodeStatusReady && + copyNode.Status == structs.NodeStatusReady { + copyNode.LastMissedHeartbeatIndex = 0 } // Insert the node diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index e57e95fe12c..6be73d1647a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1355,6 +1355,102 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { require.False(watchFired(ws)) } +func TestStatStore_UpdateNodeStatus_LastMissedHeartbeatIndex(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + transitions []string + expectedIndexes []uint64 + }{ + { + name: "disconnect", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + }, + expectedIndexes: []uint64{0, 1001}, + }, + { + name: "reconnect", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 0}, + }, + { + name: "down", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDown, + }, + expectedIndexes: []uint64{0, 1001}, + }, + { + name: "multiple reconnects", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + structs.NodeStatusDown, + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 0, 1004, 0, 1006, 1006, 0}, + }, + { + name: "multiple heartbeats", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusReady, + structs.NodeStatusReady, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 0, 0, 0}, + }, + { + name: "delayed alloc update", + transitions: []string{ + structs.NodeStatusReady, + structs.NodeStatusDisconnected, + structs.NodeStatusInit, + structs.NodeStatusInit, + structs.NodeStatusInit, + structs.NodeStatusReady, + }, + expectedIndexes: []uint64{0, 1001, 1001, 1001, 1001, 0}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + state := testStateStore(t) + node := mock.Node() + must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 999, node)) + + for i, status := range tc.transitions { + now := time.Now().UnixNano() + err := state.UpdateNodeStatus(structs.MsgTypeTestSetup, uint64(1000+i), node.ID, status, now, nil) + must.NoError(t, err) + + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + must.NoError(t, err) + must.Eq(t, tc.expectedIndexes[i], out.LastMissedHeartbeatIndex) + must.Eq(t, status, out.Status) + } + }) + } +} + func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { ci.Parallel(t) require := require.New(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6c052df4e84..0a7b32f6d08 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2090,8 +2090,8 @@ type Node struct { // LastDrain contains metadata about the most recent drain operation LastDrain *DrainMetadata - // LastMissedHeartbeatIndex stores the Raft index when the node - // last missed a heartbeat. + // LastMissedHeartbeatIndex stores the Raft index when the node last missed + // a heartbeat. 
It resets to zero once the node is marked as ready again. LastMissedHeartbeatIndex uint64 // LastAllocUpdateIndex stores the Raft index of the last time the node From f479a6ad76919e9ee0fb351d46d384d46e83937c Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 25 Jan 2023 15:27:41 -0500 Subject: [PATCH 13/13] use go-set --- nomad/state/state_store.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 80e1b9557fc..45030d12e67 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -3608,11 +3609,11 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u // Capture all nodes being affected. Alloc updates from clients are batched // so this request may include allocs from several nodes. - nodeIDs := make(map[string]interface{}) + nodeIDs := set.New[string](1) // Handle each of the updated allocations for _, alloc := range allocs { - nodeIDs[alloc.NodeID] = struct{}{} + nodeIDs.Insert(alloc.NodeID) if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil { return err } @@ -3624,7 +3625,7 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u } // Update the index of when nodes last updated their allocs. - for nodeID := range nodeIDs { + for _, nodeID := range nodeIDs.List() { if err := s.updateClientAllocUpdateIndex(txn, index, nodeID); err != nil { return fmt.Errorf("node update failed: %v", err) }