Commit 4331b7a

core: enforce strict steps for clients reconnect
When a Nomad client that is running an allocation with `max_client_disconnect` set misses a heartbeat, the Nomad server will update its status to `disconnected`.

Upon reconnecting, the client makes three main RPC calls:

- `Node.UpdateStatus` is used to set the client status to `ready`.
- `Node.UpdateAlloc` is used to update the client-side information about allocations, such as their `ClientStatus`, task states, etc.
- `Node.Register` is used to upsert the entire node information, including its status.

These calls are made concurrently and also run in parallel with the scheduler. Depending on the order in which they run, the scheduler may end up with incomplete data when reconciling allocations.

For example, a client disconnects and its replacement allocation cannot be placed anywhere else, so there's a pending eval waiting for resources. When this client comes back, the order of events may be:

1. Client calls `Node.UpdateStatus` and is now `ready`.
2. Scheduler reconciles allocations and places the replacement alloc on the client. The client is now assigned two allocations: the original alloc that is still `unknown` and the replacement that is `pending`.
3. Client calls `Node.UpdateAlloc` and updates the original alloc to `running`.
4. Scheduler notices too many allocs and stops the replacement.

This creates unnecessary placements or, in a different order of events, may leave the job without any allocations running until the whole state is updated and reconciled.

To avoid problems like this, clients must update _all_ of their relevant information before they can be considered `ready` and available for scheduling. To achieve this goal, the RPC endpoints mentioned above have been modified to enforce strict steps for nodes reconnecting:

- `Node.Register` does not set the client status anymore.
- `Node.UpdateStatus` keeps the reconnecting client in the `initializing` status until it successfully calls `Node.UpdateAlloc`.

These changes are done server-side to avoid the need for additional coordination between clients and servers. Clients are kept oblivious of these changes and will keep making these calls as they normally would.

The verification of whether allocations have been updated is done by storing and comparing the Raft index of the last time the client missed a heartbeat and the last time it updated its allocations.
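As a rough illustration of the enforced flow, the sketch below condenses the status gate that `Node.UpdateStatus` now applies (compare with the nomad/node_endpoint.go diff further down). The standalone function form, the name `nextStatus`, and the `hasNonTerminalAllocs` parameter are illustrative assumptions; in the commit the same logic lives inline in the RPC handler:

package sketch

import "github.com/hashicorp/nomad/nomad/structs"

// nextStatus decides which status the server should actually store when a
// client reports `requested` through Node.UpdateStatus.
func nextStatus(node *structs.Node, requested string, hasNonTerminalAllocs bool) string {
    switch node.Status {
    case structs.NodeStatusDisconnected:
        // A reconnecting client never jumps straight to ready; it is held
        // at initializing until it has updated its allocations.
        if requested == structs.NodeStatusReady {
            return structs.NodeStatusInit
        }
    case structs.NodeStatusInit:
        // Allow ready only once the client has written an alloc update
        // after its last missed heartbeat, or if it has no allocs at all.
        allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
        if requested == structs.NodeStatusReady && hasNonTerminalAllocs && !allocsUpdated {
            return structs.NodeStatusInit
        }
    }
    return requested
}

Here `hasNonTerminalAllocs` stands in for the `AllocsByNodeTerminal` state-snapshot lookup that the real handler performs.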
1 parent e451be7 commit 4331b7a

5 files changed: +314 -23 lines changed

nomad/node_endpoint.go (+40 -3)
@@ -157,11 +157,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
         return err
     }

-    // Check if the SecretID has been tampered with
     if originalNode != nil {
+        // Check if the SecretID has been tampered with
         if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
             return fmt.Errorf("node secret ID does not match. Not registering node.")
         }
+
+        // Don't allow the Register method to update the node status. Only the
+        // UpdateStatus method should be able to do this.
+        if originalNode.Status != "" {
+            args.Node.Status = originalNode.Status
+        }
     }

     // We have a valid node connection, so add the mapping to cache the
@@ -486,6 +492,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
     // Update the timestamp of when the node status was updated
     args.UpdatedAt = time.Now().Unix()

+    // Compute next status.
+    switch node.Status {
+    case structs.NodeStatusInit:
+        if args.Status == structs.NodeStatusReady {
+            allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
+            if err != nil {
+                return fmt.Errorf("failed to query node allocs: %v", err)
+            }
+
+            allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
+            if len(allocs) > 0 && !allocsUpdated {
+                args.Status = structs.NodeStatusInit
+            }
+        }
+    case structs.NodeStatusDisconnected:
+        if args.Status == structs.NodeStatusReady {
+            args.Status = structs.NodeStatusInit
+        }
+    }
+
     // Commit this update via Raft
     var index uint64
     if node.Status != args.Status {
@@ -1179,8 +1205,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
     if node == nil {
         return fmt.Errorf("node %s not found", nodeID)
     }
-    if node.Status != structs.NodeStatusReady {
-        return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady)
+    if node.UnresponsiveStatus() {
+        return fmt.Errorf("node %s is not allowed to update allocs while in status %s", nodeID, node.Status)
     }

     // Ensure that evals aren't set from client RPCs
@@ -1313,6 +1339,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
         return err
     }

+    // Update node alloc update index.
+    copyNode := node.Copy()
+    copyNode.LastAllocUpdateIndex = future.Index()
+
+    _, _, err = n.srv.raftApply(structs.NodeRegisterRequestType, &structs.NodeRegisterRequest{
+        Node: copyNode,
+    })
+    if err != nil {
+        return fmt.Errorf("node update failed: %v", err)
+    }
+
     // Setup the response
     reply.Index = future.Index()
     return nil

nomad/node_endpoint_test.go (+179 -17)
@@ -23,6 +23,7 @@ import (
     "github.com/hashicorp/nomad/testutil"
     vapi "github.com/hashicorp/vault/api"
     "github.com/kr/pretty"
+    "github.com/shoenig/test/must"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )
@@ -524,6 +525,171 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
     }
 }

+func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
+    ci.Parallel(t)
+
+    // Setup server with tighter heartbeat so we don't have to wait so long
+    // for nodes to go down.
+    heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
+    s, cleanupS := TestServer(t, func(c *Config) {
+        c.MinHeartbeatTTL = heartbeatTTL
+        c.HeartbeatGrace = 2 * heartbeatTTL
+    })
+    codec := rpcClient(t, s)
+    defer cleanupS()
+    testutil.WaitForLeader(t, s.RPC)
+
+    // Register node.
+    node := mock.Node()
+    reg := &structs.NodeRegisterRequest{
+        Node:         node,
+        WriteRequest: structs.WriteRequest{Region: "global"},
+    }
+    var nodeUpdateResp structs.NodeUpdateResponse
+    err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp)
+    must.NoError(t, err)
+
+    // Start heartbeat.
+    stopHeartbeat := make(chan interface{})
+    heartbeat := func() {
+        ticker := time.NewTicker(heartbeatTTL / 2)
+        for {
+            select {
+            case <-stopHeartbeat:
+                ticker.Stop()
+                return
+            case <-ticker.C:
+                hb := &structs.NodeUpdateStatusRequest{
+                    NodeID:       node.ID,
+                    Status:       structs.NodeStatusReady,
+                    WriteRequest: structs.WriteRequest{Region: "global"},
+                }
+                err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", hb, &nodeUpdateResp)
+                must.NoError(t, err)
+            }
+        }
+    }
+    go heartbeat()
+
+    // Wait for node to be ready.
+    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
+
+    // Register job with max_client_disconnect.
+    job := mock.Job()
+    job.Constraints = []*structs.Constraint{}
+    job.TaskGroups[0].Count = 1
+    job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
+    job.TaskGroups[0].Constraints = []*structs.Constraint{}
+    job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
+    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+        "run_for": "10m",
+    }
+
+    jobReq := &structs.JobRegisterRequest{
+        Job: job,
+        WriteRequest: structs.WriteRequest{
+            Region:    "global",
+            Namespace: job.Namespace,
+        },
+    }
+    var jobResp structs.JobRegisterResponse
+    err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
+    must.NoError(t, err)
+
+    // Wait for the alloc to be pending on the server.
+    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+        structs.AllocClientStatusPending: 1,
+    })
+
+    // Get allocs that node should run.
+    allocsReq := &structs.NodeSpecificRequest{
+        NodeID: node.ID,
+        QueryOptions: structs.QueryOptions{
+            Region: "global",
+        },
+    }
+    var allocsResp structs.NodeAllocsResponse
+    err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
+    must.NoError(t, err)
+    must.Len(t, 1, allocsResp.Allocs)
+
+    // Tell server the alloc is running.
+    // Save the alloc so we can reuse the request later.
+    alloc := allocsResp.Allocs[0].Copy()
+    alloc.ClientStatus = structs.AllocClientStatusRunning
+
+    allocUpdateReq := &structs.AllocUpdateRequest{
+        Alloc: []*structs.Allocation{alloc},
+        WriteRequest: structs.WriteRequest{
+            Region: "global",
+        },
+    }
+    var resp structs.GenericResponse
+    err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
+    must.NoError(t, err)
+
+    // Wait for the alloc to be running on the server.
+    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+        structs.AllocClientStatusRunning: 1,
+    })
+
+    // Stop heartbeat and wait for the client to be disconnected and the alloc
+    // to be unknown.
+    close(stopHeartbeat)
+    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
+    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+        structs.AllocClientStatusUnknown: 1,
+    })
+
+    // There should be a pending eval for the alloc replacement.
+    state := s.fsm.State()
+    ws := memdb.NewWatchSet()
+    evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
+    must.NoError(t, err)
+    found := false
+    for _, eval := range evals {
+        if eval.Status == structs.EvalStatusPending {
+            found = true
+            break
+        }
+    }
+    must.True(t, found)
+
+    // Restart heartbeat to reconnect node.
+    stopHeartbeat = make(chan interface{})
+    go heartbeat()
+
+    // Wait for node to be initializing.
+    // It must remain initializing until it updates its allocs with the server
+    // so the scheduler has the necessary information to avoid unnecessary
+    // placements by the pending eval.
+    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)
+
+    // Get allocs that node should run.
+    // The node should only have one alloc assigned until it updates its allocs
+    // status with the server.
+    allocsReq = &structs.NodeSpecificRequest{
+        NodeID: node.ID,
+        QueryOptions: structs.QueryOptions{
+            Region: "global",
+        },
+    }
+    err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
+    must.NoError(t, err)
+    must.Len(t, 1, allocsResp.Allocs)
+
+    // Tell server the alloc is running.
+    err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
+    must.NoError(t, err)
+
+    // Wait for the alloc to be running on the server.
+    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+        structs.AllocClientStatusRunning: 1,
+    })
+
+    // Wait for the client to be ready.
+    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
+}
+
 func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
     ci.Parallel(t)
     require := require.New(t)
@@ -639,29 +805,25 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
     }

     // Transition it to down and then ready
-    node.Status = structs.NodeStatusDown
-    reg = &structs.NodeRegisterRequest{
-        Node:         node,
+    req := &structs.NodeUpdateStatusRequest{
+        NodeID:       node.ID,
+        Status:       structs.NodeStatusDown,
         WriteRequest: structs.WriteRequest{Region: "global"},
     }
-
-    // Fetch the response
-    if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
+    if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
         t.Fatalf("err: %v", err)
     }

     if len(resp.EvalIDs) != 1 {
         t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
     }

-    node.Status = structs.NodeStatusReady
-    reg = &structs.NodeRegisterRequest{
-        Node:         node,
+    req = &structs.NodeUpdateStatusRequest{
+        NodeID:       node.ID,
+        Status:       structs.NodeStatusReady,
         WriteRequest: structs.WriteRequest{Region: "global"},
     }
-
-    // Fetch the response
-    if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
+    if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
         t.Fatalf("err: %v", err)
     }

@@ -1369,12 +1531,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
     require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))

     // Mark the node as down
-    node.Status = structs.NodeStatusDown
-    reg = &structs.NodeRegisterRequest{
-        Node:         node,
+    req := &structs.NodeUpdateStatusRequest{
+        NodeID:       node.ID,
+        Status:       structs.NodeStatusDown,
         WriteRequest: structs.WriteRequest{Region: "global"},
     }
-    require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
+    require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))

     // Ensure that the allocation has transitioned to lost
     testutil.WaitForResult(func() (bool, error) {
@@ -2581,7 +2743,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
     }
     var allocUpdateResp structs.NodeAllocsResponse
     err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-    require.ErrorContains(t, err, "not ready")
+    require.ErrorContains(t, err, "not allowed to update allocs")

     // Send request without an explicit node ID.
     updatedAlloc.NodeID = ""

nomad/state/state_store.go (+20 -0)
@@ -909,6 +909,11 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
         node.CreateIndex = exist.CreateIndex
         node.ModifyIndex = index

+        // Update last missed heartbeat if the node became unresponsive.
+        if !exist.UnresponsiveStatus() && node.UnresponsiveStatus() {
+            node.LastMissedHeartbeatIndex = index
+        }
+
         // Retain node events that have already been set on the node
         node.Events = exist.Events

@@ -923,6 +928,16 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
         node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
         node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
         node.LastDrain = exist.LastDrain // Retain the drain metadata
+
+        // Retain the last index the node missed a heartbeat.
+        if node.LastMissedHeartbeatIndex < exist.LastMissedHeartbeatIndex {
+            node.LastMissedHeartbeatIndex = exist.LastMissedHeartbeatIndex
+        }
+
+        // Retain the last index the node updated its allocs.
+        if node.LastAllocUpdateIndex < exist.LastAllocUpdateIndex {
+            node.LastAllocUpdateIndex = exist.LastAllocUpdateIndex
+        }
     } else {
         // Because this is the first time the node is being registered, we should
         // also create a node registration event
@@ -1029,6 +1044,11 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
     copyNode.Status = status
     copyNode.ModifyIndex = txn.Index

+    // Update last missed heartbeat if the node became unresponsive.
+    if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() {
+        copyNode.LastMissedHeartbeatIndex = txn.Index
+    }
+
     // Insert the node
     if err := txn.Insert("nodes", copyNode); err != nil {
         return fmt.Errorf("node update failed: %v", err)

nomad/structs/structs.go (+19 -0)
@@ -2090,6 +2090,14 @@
     // LastDrain contains metadata about the most recent drain operation
     LastDrain *DrainMetadata

+    // LastMissedHeartbeatIndex stores the Raft index when the node
+    // last missed a heartbeat.
+    LastMissedHeartbeatIndex uint64
+
+    // LastAllocUpdateIndex stores the Raft index of the last time the node
+    // updated its allocation statuses.
+    LastAllocUpdateIndex uint64
+
     // Raft Indexes
     CreateIndex uint64
     ModifyIndex uint64
@@ -2184,6 +2192,17 @@
     return &nn
 }

+// UnresponsiveStatus returns true if the node is in a status where it is not
+// communicating with the server.
+func (n *Node) UnresponsiveStatus() bool {
+    switch n.Status {
+    case NodeStatusDown, NodeStatusDisconnected:
+        return true
+    default:
+        return false
+    }
+}
+
 // TerminalStatus returns if the current status is terminal and
 // will no longer transition.
 func (n *Node) TerminalStatus() bool {