Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update alloc after reconnect and enforece client heartbeat order #15068

Merged
merged 12 commits into from
Nov 4, 2022
7 changes: 7 additions & 0 deletions .changelog/15068.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
scheduler: Fixed a bug that prevented allocations with `max_client_disconnect` set to be updated after they reconnected.
```

```release-note:bug
scheduler: Prevent unnecessary placements when allocations with `max_client_disconnect` set reconnect.
```
8 changes: 6 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,7 @@ func (c *Client) allocSync() {

// Send to server.
args := structs.AllocUpdateRequest{
NodeID: c.NodeID(),
Alloc: sync,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
Expand Down Expand Up @@ -2479,8 +2480,11 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
return
}

// Reconnect unknown allocations
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
// Reconnect unknown allocations if they were updated and are not terminal.
reconnect := update.ClientStatus == structs.AllocClientStatusUnknown &&
update.AllocModifyIndex > ar.Alloc().AllocModifyIndex &&
!update.ServerTerminalStatus()
if reconnect {
err = ar.Reconnect(update)
if err != nil {
c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "error", err)
Expand Down
22 changes: 22 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,8 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
}

// UpdateAlloc is used to update the client status of an allocation
// Clients must first register and heartbeat successfully before being able to
// call this method.
func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error {
// Ensure the connection was initiated by another client if TLS is used.
err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient)
Expand All @@ -1150,6 +1152,26 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
return fmt.Errorf("must update at least one allocation")
}

// Ensure the node is allowed to update allocs.
// The node needs to successfully heartbeat before updating its allocs.
nodeID := args.NodeID
if nodeID == "" {
// COMPAT 1.4
// Maintain backwards compatibility with clients that don't set the
// NodeID field in the request.
nodeID = args.Alloc[0].NodeID
}
node, err := n.srv.State().NodeByID(nil, nodeID)
if err != nil {
return fmt.Errorf("failed to retrieve node %s: %v", args.NodeID, err)
}
if node == nil {
return fmt.Errorf("node %s not found", args.NodeID)
}
if node.Status != structs.NodeStatusReady {
return fmt.Errorf("node %s is %s, not %s", args.NodeID, node.Status, structs.NodeStatusReady)
}

// Ensure that evals aren't set from client RPCs
// We create them here before the raft update
if len(args.Evals) != 0 {
Expand Down
78 changes: 78 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,84 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {

}

func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Register node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
require.NoError(t, err)

// Inject mock job and allocation.
state := s1.fsm.State()

job := mock.Job()
err = state.UpsertJob(structs.MsgTypeTestSetup, 101, job)
require.NoError(t, err)

alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.ClientStatus = structs.AllocClientStatusRunning

err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
require.NoError(t, err)
err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
require.NoError(t, err)

// Mark node as down.
err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil)
require.NoError(t, err)

// Try to update alloc.
updatedAlloc := new(structs.Allocation)
*updatedAlloc = *alloc
updatedAlloc.ClientStatus = structs.AllocClientStatusFailed

allocUpdateReq := &structs.AllocUpdateRequest{
NodeID: node.ID,
Alloc: []*structs.Allocation{updatedAlloc},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var allocUpdateResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not ready")

// Send request without an explicit node ID.
allocUpdateReq.NodeID = ""
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not ready")

// Send request with invalid node ID.
allocUpdateReq.NodeID = "not-valid"
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "node lookup failed")

// Send request with non-existing node ID.
allocUpdateReq.NodeID = uuid.Generate()
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not found")

// Mark node as ready and try again.
err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil)
require.NoError(t, err)

allocUpdateReq.NodeID = node.ID
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.NoError(t, err)
}

func TestClientEndpoint_BatchUpdate(t *testing.T) {
ci.Parallel(t)

Expand Down
35 changes: 18 additions & 17 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,10 @@ type ApplyPlanResultsRequest struct {
// to cause evictions or to assign new allocations. Both can be done
// within a single transaction
type AllocUpdateRequest struct {
// NodeID is the ID of the node sending the request.
// It may be empty if this is not an RPC request from a client.
NodeID string

// COMPAT 0.11
// Alloc is the list of new allocations to assign
// Deprecated: Replaced with two separate slices, one containing stopped allocations
Expand Down Expand Up @@ -10393,27 +10397,24 @@ func (a *Allocation) LastUnknown() time.Time {
return lastUnknown.UTC()
}

// Reconnected determines whether a reconnect event has occurred for any task
// and whether that event occurred within the allowable duration specified by MaxClientDisconnect.
func (a *Allocation) Reconnected() (bool, bool) {
var lastReconnect time.Time
for _, taskState := range a.TaskStates {
for _, taskEvent := range taskState.Events {
if taskEvent.Type != TaskClientReconnected {
continue
}
eventTime := time.Unix(0, taskEvent.Time).UTC()
if lastReconnect.IsZero() || lastReconnect.Before(eventTime) {
lastReconnect = eventTime
}
// NeedsToReconnect returns true if the last known ClientStatus value is
// "unknown" and so the allocation did not reconnect yet.
func (a *Allocation) NeedsToReconnect() bool {
disconnected := false

// AllocStates are appended to the list and we only need the latest
// ClientStatus transition, so traverse from the end until we find one.
for i := len(a.AllocStates) - 1; i >= 0; i-- {
s := a.AllocStates[i]
if s.Field != AllocStateFieldClientStatus {
continue
}
}
Comment on lines +10403 to -10410
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that states are order chronologically, which I think it's fine since entries are added using alloc.AppendState? Another option would be to traverse all values.


if lastReconnect.IsZero() {
return false, false
disconnected = s.Value == AllocClientStatusUnknown
break
}

return true, a.Expired(lastReconnect)
return disconnected
}

func (a *Allocation) ToIdentityClaims(job *Job) *IdentityClaims {
Expand Down
Loading