diff --git a/.changelog/15068.txt b/.changelog/15068.txt new file mode 100644 index 00000000000..4e587b83784 --- /dev/null +++ b/.changelog/15068.txt @@ -0,0 +1,7 @@ +```release-note:bug +scheduler: Fixed a bug that prevented disconnected allocations to be updated after they reconnect. +``` + +```release-note:bug +scheduler: Prevent unnecessary placements when disconnected allocations reconnect. +``` diff --git a/client/client.go b/client/client.go index 8d6f820740a..04348ea40cf 100644 --- a/client/client.go +++ b/client/client.go @@ -2479,8 +2479,11 @@ func (c *Client) updateAlloc(update *structs.Allocation) { return } - // Reconnect unknown allocations - if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex { + // Reconnect unknown allocations if they were updated and are not terminal. + reconnect := update.ClientStatus == structs.AllocClientStatusUnknown && + update.AllocModifyIndex > ar.Alloc().AllocModifyIndex && + !update.ServerTerminalStatus() + if reconnect { err = ar.Reconnect(update) if err != nil { c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "error", err) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 689b1082e77..a0021595548 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1132,7 +1132,11 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, return n.srv.blockingRPC(&opts) } -// UpdateAlloc is used to update the client status of an allocation +// UpdateAlloc is used to update the client status of an allocation. It should +// only be called by clients. +// +// Clients must first register and heartbeat successfully before they are able +// to call this method. func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error { // Ensure the connection was initiated by another client if TLS is used. err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient) @@ -1150,6 +1154,24 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene return fmt.Errorf("must update at least one allocation") } + // Ensure the node is allowed to update allocs. + // The node needs to successfully heartbeat before updating its allocs. + nodeID := args.Alloc[0].NodeID + if nodeID == "" { + return fmt.Errorf("missing node ID") + } + + node, err := n.srv.State().NodeByID(nil, nodeID) + if err != nil { + return fmt.Errorf("failed to retrieve node %s: %v", nodeID, err) + } + if node == nil { + return fmt.Errorf("node %s not found", nodeID) + } + if node.Status != structs.NodeStatusReady { + return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady) + } + // Ensure that evals aren't set from client RPCs // We create them here before the raft update if len(args.Evals) != 0 { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 899f51470cb..082773ec3ea 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2528,6 +2528,83 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { } +func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Register node. + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp) + require.NoError(t, err) + + // Inject mock job and allocation. + state := s1.fsm.State() + + job := mock.Job() + err = state.UpsertJob(structs.MsgTypeTestSetup, 101, job) + require.NoError(t, err) + + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.TaskGroup = job.TaskGroups[0].Name + alloc.ClientStatus = structs.AllocClientStatusRunning + + err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) + require.NoError(t, err) + err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) + require.NoError(t, err) + + // Mark node as down. + err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil) + require.NoError(t, err) + + // Try to update alloc. + updatedAlloc := new(structs.Allocation) + *updatedAlloc = *alloc + updatedAlloc.ClientStatus = structs.AllocClientStatusFailed + + allocUpdateReq := &structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{updatedAlloc}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var allocUpdateResp structs.NodeAllocsResponse + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) + require.ErrorContains(t, err, "not ready") + + // Send request without an explicit node ID. + updatedAlloc.NodeID = "" + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) + require.ErrorContains(t, err, "missing node ID") + + // Send request with invalid node ID. + updatedAlloc.NodeID = "not-valid" + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) + require.ErrorContains(t, err, "node lookup failed") + + // Send request with non-existing node ID. + updatedAlloc.NodeID = uuid.Generate() + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) + require.ErrorContains(t, err, "not found") + + // Mark node as ready and try again. + err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil) + require.NoError(t, err) + + updatedAlloc.NodeID = node.ID + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) + require.NoError(t, err) +} + func TestClientEndpoint_BatchUpdate(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a4dad02d973..68f4ee93ef4 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10393,27 +10393,24 @@ func (a *Allocation) LastUnknown() time.Time { return lastUnknown.UTC() } -// Reconnected determines whether a reconnect event has occurred for any task -// and whether that event occurred within the allowable duration specified by MaxClientDisconnect. -func (a *Allocation) Reconnected() (bool, bool) { - var lastReconnect time.Time - for _, taskState := range a.TaskStates { - for _, taskEvent := range taskState.Events { - if taskEvent.Type != TaskClientReconnected { - continue - } - eventTime := time.Unix(0, taskEvent.Time).UTC() - if lastReconnect.IsZero() || lastReconnect.Before(eventTime) { - lastReconnect = eventTime - } +// NeedsToReconnect returns true if the last known ClientStatus value is +// "unknown" and so the allocation did not reconnect yet. +func (a *Allocation) NeedsToReconnect() bool { + disconnected := false + + // AllocStates are appended to the list and we only need the latest + // ClientStatus transition, so traverse from the end until we find one. + for i := len(a.AllocStates) - 1; i >= 0; i-- { + s := a.AllocStates[i] + if s.Field != AllocStateFieldClientStatus { + continue } - } - if lastReconnect.IsZero() { - return false, false + disconnected = s.Value == AllocClientStatusUnknown + break } - return true, a.Expired(lastReconnect) + return disconnected } func (a *Allocation) ToIdentityClaims(job *Job) *IdentityClaims { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 72c7c068aa1..d00cddca8b8 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5515,146 +5515,106 @@ func TestAllocation_Expired(t *testing.T) { } } -func TestAllocation_Reconnected(t *testing.T) { - type testCase struct { - name string - maxDisconnect string - elapsed int - reconnected bool - expired bool - nilJob bool - badTaskGroup bool - mixedTZ bool - noReconnectEvent bool - status string - } +func TestAllocation_NeedsToReconnect(t *testing.T) { + ci.Parallel(t) - testCases := []testCase{ - { - name: "has-expired", - maxDisconnect: "5s", - elapsed: 10, - reconnected: true, - expired: true, - }, - { - name: "has-not-expired", - maxDisconnect: "5s", - elapsed: 3, - reconnected: true, - expired: false, - }, + testCases := []struct { + name string + states []*AllocState + expected bool + }{ { - name: "are-equal", - maxDisconnect: "5s", - elapsed: 5, - reconnected: true, - expired: true, + name: "no state", + expected: false, }, { - name: "nil-job", - maxDisconnect: "5s", - elapsed: 10, - reconnected: true, - expired: false, - nilJob: true, + name: "never disconnected", + states: []*AllocState{}, + expected: false, }, { - name: "bad-task-group", - maxDisconnect: "", - elapsed: 10, - reconnected: true, - expired: false, - badTaskGroup: true, - }, - { - name: "no-max-disconnect", - maxDisconnect: "", - elapsed: 10, - reconnected: true, - expired: false, + name: "disconnected once", + states: []*AllocState{ + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now(), + }, + }, + expected: true, }, { - name: "mixed-utc-has-expired", - maxDisconnect: "5s", - elapsed: 10, - reconnected: true, - expired: true, - mixedTZ: true, + name: "disconnect reconnect disconnect", + states: []*AllocState{ + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now().Add(-2 * time.Minute), + }, + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusRunning, + Time: time.Now().Add(-1 * time.Minute), + }, + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now(), + }, + }, + expected: true, }, { - name: "mixed-utc-has-not-expired", - maxDisconnect: "5s", - elapsed: 3, - reconnected: true, - expired: false, - mixedTZ: true, + name: "disconnect multiple times before reconnect", + states: []*AllocState{ + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now().Add(-2 * time.Minute), + }, + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now().Add(-1 * time.Minute), + }, + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusRunning, + Time: time.Now(), + }, + }, + expected: false, }, { - name: "no-reconnect-event", - maxDisconnect: "5s", - elapsed: 2, - reconnected: false, - expired: false, - noReconnectEvent: true, + name: "disconnect after multiple updates", + states: []*AllocState{ + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusPending, + Time: time.Now().Add(-2 * time.Minute), + }, + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusRunning, + Time: time.Now().Add(-1 * time.Minute), + }, + { + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now(), + }, + }, + expected: true, }, } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { alloc := MockAlloc() - var err error - var maxDisconnect time.Duration - - if tc.maxDisconnect != "" { - maxDisconnect, err = time.ParseDuration(tc.maxDisconnect) - require.NoError(t, err) - alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect - } - - if tc.nilJob { - alloc.Job = nil - } - - if tc.badTaskGroup { - alloc.TaskGroup = "bad" - } - - alloc.ClientStatus = AllocClientStatusUnknown - if tc.status != "" { - alloc.ClientStatus = tc.status - } - - alloc.AllocStates = []*AllocState{{ - Field: AllocStateFieldClientStatus, - Value: AllocClientStatusUnknown, - Time: time.Now().UTC(), - }} - - now := time.Now().UTC() - if tc.mixedTZ { - var loc *time.Location - loc, err = time.LoadLocation("America/New_York") - require.NoError(t, err) - now = time.Now().In(loc) - } - - ellapsedDuration := time.Duration(tc.elapsed) * time.Second - now = now.Add(ellapsedDuration) - - if !tc.noReconnectEvent { - event := NewTaskEvent(TaskClientReconnected) - event.Time = now.UnixNano() - - alloc.TaskStates = map[string]*TaskState{ - "web": { - Events: []*TaskEvent{event}, - }, - } - } + alloc.AllocStates = tc.states - reconnected, expired := alloc.Reconnected() - require.Equal(t, tc.reconnected, reconnected) - require.Equal(t, tc.expired, expired) + got := alloc.NeedsToReconnect() + require.Equal(t, tc.expected, got) }) } } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 90fba5427e7..bf1876f7a7b 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -415,6 +415,12 @@ func (s *GenericScheduler) computeJobAllocs() error { s.plan.AppendUnknownAlloc(update) } + // Handle reconnect updates. + // Reconnected allocs have a new AllocState entry. + for _, update := range results.reconnectUpdates { + s.ctx.Plan().AppendAlloc(update, nil) + } + // Handle the in-place updates for _, update := range results.inplaceUpdate { if update.DeploymentID != s.deployment.GetID() { diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 50a819cf806..da4928f8181 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -1201,7 +1201,12 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) { continue } - a.result.reconnectUpdates[alloc.ID] = alloc + // Record the new ClientStatus to indicate to future evals that the + // alloc has already reconnected. + // Use a copy to prevent mutating the object from statestore. + reconnectedAlloc := alloc.Copy() + reconnectedAlloc.AppendState(structs.AllocStateFieldClientStatus, alloc.ClientStatus) + a.result.reconnectUpdates[reconnectedAlloc.ID] = reconnectedAlloc } } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index de72faff0bf..fc88fb54727 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -5272,9 +5272,49 @@ func TestReconciler_Node_Disconnect_Updates_Alloc_To_Unknown(t *testing.T) { }) } +func TestReconciler_Disconnect_UpdateJobAfterReconnect(t *testing.T) { + ci.Parallel(t) + + // Create 2 allocs and simulate one have being previously disconnected and + // then reconnected. + job, allocs := buildResumableAllocations(2, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) + allocs[0].AllocStates = []*structs.AllocState{ + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now().Add(-5 * time.Minute), + }, + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusRunning, + Time: time.Now(), + }, + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, + nil, allocs, nil, "", 50, true) + results := reconciler.Compute() + + // Assert both allocations will be updated. + assertResults(t, results, &resultExpectation{ + inplace: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + InPlaceUpdate: 2, + }, + }, + }) +} + // Tests that when a node disconnects/reconnects allocations for that node are // reconciled according to the business rules. func TestReconciler_Disconnected_Client(t *testing.T) { + disconnectAllocState := []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now(), + }} + type testCase struct { name string allocCount int @@ -5282,6 +5322,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { jobVersionIncrement uint64 nodeScoreIncrement float64 disconnectedAllocStatus string + disconnectedAllocStates []*structs.AllocState serverDesiredStatus string isBatch bool nodeStatusDisconnected bool @@ -5299,6 +5340,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: false, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: false, expected: &resultExpectation{ @@ -5316,6 +5358,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 1, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: false, expected: &resultExpectation{ @@ -5335,6 +5378,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 1, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, nodeScoreIncrement: 1, @@ -5354,6 +5398,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusFailed, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, expected: &resultExpectation{ @@ -5372,6 +5417,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: false, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusFailed, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, expected: &resultExpectation{ @@ -5392,6 +5438,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: false, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusComplete, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, isBatch: true, expected: &resultExpectation{ @@ -5408,6 +5455,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, jobVersionIncrement: 1, @@ -5427,6 +5475,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, jobVersionIncrement: 1, @@ -5446,6 +5495,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, failReplacement: true, shouldStopOnDisconnectedNode: true, @@ -5466,6 +5516,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 1, disconnectedAllocStatus: structs.AllocClientStatusPending, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, nodeStatusDisconnected: true, @@ -5485,6 +5536,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusUnknown, + disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, shouldStopOnDisconnectedNode: true, nodeStatusDisconnected: true, @@ -5505,6 +5557,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replace: false, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: []*structs.AllocState{}, serverDesiredStatus: structs.AllocDesiredStatusRun, nodeStatusDisconnected: true, expected: &resultExpectation{ @@ -5547,24 +5600,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) { if disconnectedAllocCount > 0 { alloc.ClientStatus = tc.disconnectedAllocStatus + alloc.AllocStates = tc.disconnectedAllocStates // Set the node id on all the disconnected allocs to the node under test. alloc.NodeID = testNode.ID alloc.NodeName = "disconnected" - alloc.AllocStates = []*structs.AllocState{{ - Field: structs.AllocStateFieldClientStatus, - Value: structs.AllocClientStatusUnknown, - Time: time.Now(), - }} - - event := structs.NewTaskEvent(structs.TaskClientReconnected) - event.Time = time.Now().UnixNano() - - alloc.TaskStates = map[string]*structs.TaskState{ - alloc.Job.TaskGroups[0].Tasks[0].Name: { - Events: []*structs.TaskEvent{event}, - }, - } disconnectedAllocCount-- } } diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index d0eb0a92a18..e820cfafc44 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -229,21 +229,25 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS // without max_client_disconnect supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients) - reconnected := false + reconnect := false expired := false - // Only compute reconnected for unknown, running, and failed since they need to go through the reconnect logic. + // Only compute reconnect for unknown, running, and failed since they + // need to go through the reconnect logic. if supportsDisconnectedClients && (alloc.ClientStatus == structs.AllocClientStatusUnknown || alloc.ClientStatus == structs.AllocClientStatusRunning || alloc.ClientStatus == structs.AllocClientStatusFailed) { - reconnected, expired = alloc.Reconnected() + reconnect = alloc.NeedsToReconnect() + if reconnect { + expired = alloc.Expired(now) + } } - // Failed reconnected allocs need to be added to reconnecting so that they - // can be handled as a failed reconnect. + // Failed allocs that need to be reconnected must be added to + // reconnecting so that they can be handled as a failed reconnect. if supportsDisconnectedClients && - reconnected && + reconnect && alloc.DesiredStatus == structs.AllocDesiredStatusRun && alloc.ClientStatus == structs.AllocClientStatusFailed { reconnecting[alloc.ID] = alloc @@ -272,7 +276,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS } case structs.NodeStatusReady: // Filter reconnecting allocs on a node that is now connected. - if reconnected { + if reconnect { if expired { lost[alloc.ID] = alloc continue @@ -284,9 +288,9 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS } } - // Terminal allocs, if not reconnected, are always untainted as they + // Terminal allocs, if not reconnect, are always untainted as they // should never be migrated. - if alloc.TerminalStatus() && !reconnected { + if alloc.TerminalStatus() && !reconnect { untainted[alloc.ID] = alloc continue } @@ -311,9 +315,10 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS continue } - // Ignore reconnected failed allocs that have been marked stop by the server. + // Ignore failed allocs that need to be reconnected and that have been + // marked to stop by the server. if supportsDisconnectedClients && - reconnected && + reconnect && alloc.ClientStatus == structs.AllocClientStatusFailed && alloc.DesiredStatus == structs.AllocDesiredStatusStop { ignore[alloc.ID] = alloc @@ -322,7 +327,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS if !nodeIsTainted { // Filter allocs on a node that is now re-connected to be resumed. - if reconnected { + if reconnect { if expired { lost[alloc.ID] = alloc continue diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index 21f19814e42..3f50bcb0261 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -79,11 +80,16 @@ func TestAllocSet_filterByTainted(t *testing.T) { Time: now.Add(-60 * time.Second), }} - reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected) - reconnectedEvent.Time = time.Now().UnixNano() - reconnectTaskState := map[string]*structs.TaskState{ - testJob.TaskGroups[0].Tasks[0].Name: { - Events: []*structs.TaskEvent{reconnectedEvent}, + reconnectedAllocState := []*structs.AllocState{ + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: now.Add(-time.Second), + }, + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusRunning, + Time: now, }, } @@ -322,7 +328,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, untainted: allocSet{ @@ -349,7 +354,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, ignore: allocSet{}, @@ -374,7 +378,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, untainted: allocSet{}, @@ -390,7 +393,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, ignore: allocSet{}, @@ -413,7 +415,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, // Failed allocs on reconnected nodes are in reconnecting so that // they be marked with desired status stop at the server. @@ -426,7 +427,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, // Lost allocs on reconnected nodes don't get restarted "untainted-reconnect-lost": { @@ -438,7 +438,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, // Replacement allocs that are complete are untainted "untainted-reconnect-complete-replacement": { @@ -461,7 +460,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "normal", TaskGroup: "web", - AllocStates: unknownAllocState, PreviousAllocation: "reconnecting-failed", }, // Lost replacement allocs on reconnected nodes don't get restarted @@ -487,7 +485,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, "untainted-reconnect-lost": { ID: "untainted-reconnect-lost", @@ -498,7 +495,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, "untainted-reconnect-complete-replacement": { ID: "untainted-reconnect-complete-replacement", @@ -519,7 +515,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "normal", TaskGroup: "web", - AllocStates: unknownAllocState, PreviousAllocation: "reconnecting-failed", }, "untainted-reconnect-lost-replacement": { @@ -546,7 +541,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, ignore: allocSet{}, @@ -611,7 +605,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "normal", TaskGroup: "web", - TaskStates: reconnectTaskState, AllocStates: expiredAllocState, }, // Failed and stopped allocs on disconnected nodes are ignored @@ -623,7 +616,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "disconnected", TaskGroup: "web", - TaskStates: reconnectTaskState, AllocStates: unknownAllocState, }, }, @@ -661,7 +653,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "disconnected", TaskGroup: "web", - TaskStates: reconnectTaskState, AllocStates: unknownAllocState, }, }, @@ -693,7 +684,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "normal", TaskGroup: "web", - TaskStates: reconnectTaskState, AllocStates: expiredAllocState, }, }, @@ -714,7 +704,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "normal", TaskGroup: "web", - TaskStates: reconnectTaskState, AllocStates: expiredAllocState, }, }, @@ -732,7 +721,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: testJob, NodeID: "normal", TaskGroup: "web", - TaskStates: reconnectTaskState, AllocStates: expiredAllocState, }, }, @@ -764,7 +752,6 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, untainted: allocSet{ @@ -791,24 +778,62 @@ func TestAllocSet_filterByTainted(t *testing.T) { NodeID: "normal", TaskGroup: "web", AllocStates: unknownAllocState, - TaskStates: reconnectTaskState, }, }, ignore: allocSet{}, lost: allocSet{}, }, + { + // After an alloc is reconnected, it should be considered + // "untainted" instead of "reconnecting" to allow changes such as + // job updates to be applied properly. + name: "disco-client-reconnected-alloc-untainted", + supportsDisconnectedClients: true, + now: time.Now(), + taintedNodes: nodes, + skipNilNodeTest: false, + all: allocSet{ + "running-reconnected": { + ID: "running-reconnected", + Name: "web", + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusRun, + Job: testJob, + NodeID: "normal", + TaskGroup: "web", + AllocStates: reconnectedAllocState, + }, + }, + untainted: allocSet{ + "running-reconnected": { + ID: "running-reconnected", + Name: "web", + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusRun, + Job: testJob, + NodeID: "normal", + TaskGroup: "web", + AllocStates: reconnectedAllocState, + }, + }, + migrate: allocSet{}, + disconnecting: allocSet{}, + reconnecting: allocSet{}, + ignore: allocSet{}, + lost: allocSet{}, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // With tainted nodes untainted, migrate, lost, disconnecting, reconnecting, ignore := tc.all.filterByTainted(tc.taintedNodes, tc.supportsDisconnectedClients, tc.now) - require.Equal(t, tc.untainted, untainted, "with-nodes: %s", "untainted") - require.Equal(t, tc.migrate, migrate, "with-nodes: %s", "migrate") - require.Equal(t, tc.lost, lost, "with-nodes: %s", "lost") - require.Equal(t, tc.disconnecting, disconnecting, "with-nodes: %s", "disconnecting") - require.Equal(t, tc.reconnecting, reconnecting, "with-nodes: %s", "reconnecting") - require.Equal(t, tc.ignore, ignore, "with-nodes: %s", "ignore") + assert.Equal(t, tc.untainted, untainted, "with-nodes: %s", "untainted") + assert.Equal(t, tc.migrate, migrate, "with-nodes: %s", "migrate") + assert.Equal(t, tc.lost, lost, "with-nodes: %s", "lost") + assert.Equal(t, tc.disconnecting, disconnecting, "with-nodes: %s", "disconnecting") + assert.Equal(t, tc.reconnecting, reconnecting, "with-nodes: %s", "reconnecting") + assert.Equal(t, tc.ignore, ignore, "with-nodes: %s", "ignore") if tc.skipNilNodeTest { return @@ -816,12 +841,12 @@ func TestAllocSet_filterByTainted(t *testing.T) { // Now again with nodes nil untainted, migrate, lost, disconnecting, reconnecting, ignore = tc.all.filterByTainted(nil, tc.supportsDisconnectedClients, tc.now) - require.Equal(t, tc.untainted, untainted, "nodes-nil: %s", "untainted") - require.Equal(t, tc.migrate, migrate, "nodes-nil: %s", "migrate") - require.Equal(t, tc.lost, lost, "nodes-nil: %s", "lost") - require.Equal(t, tc.disconnecting, disconnecting, "nodes-nil: %s", "disconnecting") - require.Equal(t, tc.reconnecting, reconnecting, "nodes-nil: %s", "reconnecting") - require.Equal(t, tc.ignore, ignore, "nodes-nil: %s", "ignore") + assert.Equal(t, tc.untainted, untainted, "nodes-nil: %s", "untainted") + assert.Equal(t, tc.migrate, migrate, "nodes-nil: %s", "migrate") + assert.Equal(t, tc.lost, lost, "nodes-nil: %s", "lost") + assert.Equal(t, tc.disconnecting, disconnecting, "nodes-nil: %s", "disconnecting") + assert.Equal(t, tc.reconnecting, reconnecting, "nodes-nil: %s", "reconnecting") + assert.Equal(t, tc.ignore, ignore, "nodes-nil: %s", "ignore") }) } } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 2d5e03bda38..372531c1c46 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -231,11 +231,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Diff the required and existing allocations diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) - - s.logger.Debug("reconciled current state with desired state", - "place", len(diff.place), "update", len(diff.update), - "migrate", len(diff.migrate), "stop", len(diff.stop), - "ignore", len(diff.ignore), "lost", len(diff.lost)) + s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", diff)) // Add all the allocs to stop for _, e := range diff.stop { @@ -257,8 +253,13 @@ func (s *SystemScheduler) computeJobAllocs() error { s.plan.AppendUnknownAlloc(e.Alloc) } - // Attempt to do the upgrades in place - destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) + // Attempt to do the upgrades in place. + // Reconnecting allocations need to be updated to persists alloc state + // changes. + updates := make([]allocTuple, 0, len(diff.update)+len(diff.reconnecting)) + updates = append(updates, diff.update...) + updates = append(updates, diff.reconnecting...) + destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, updates) diff.update = destructiveUpdates if s.eval.AnnotatePlan { diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index bb26d7d695c..14ae817d9d8 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -2160,11 +2160,16 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { Time: now.Add(-60 * time.Second), }} - reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected) - reconnectedEvent.Time = time.Now().UnixNano() - systemJobReconnectTaskState := map[string]*structs.TaskState{ - systemJob.TaskGroups[0].Tasks[0].Name: { - Events: []*structs.TaskEvent{reconnectedEvent}, + reconnectedAllocState := []*structs.AllocState{ + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: now.Add(-60 * time.Second), + }, + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusRunning, + Time: now, }, } @@ -2175,12 +2180,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { }, } - sysBatchJobReconnectTaskState := map[string]*structs.TaskState{ - sysBatchJob.TaskGroups[0].Tasks[0].Name: { - Events: []*structs.TaskEvent{reconnectedEvent}, - }, - } - type testCase struct { name string jobType string @@ -2216,7 +2215,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: map[string]*structs.Allocation{ "id": { @@ -2239,8 +2237,7 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { previousTerminal: false, clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, - allocState: unknownAllocState, - taskState: systemJobReconnectTaskState, + allocState: reconnectedAllocState, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2259,7 +2256,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusUnknown, desiredStatus: structs.AllocDesiredStatusRun, allocState: expiredAllocState, - taskState: systemJobReconnectTaskState, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2283,7 +2279,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2307,7 +2302,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: map[string]*structs.Allocation{ "id": { @@ -2331,7 +2325,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusUnknown, desiredStatus: structs.AllocDesiredStatusRun, allocState: unknownAllocState, - taskState: nil, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2350,7 +2343,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusUnknown, desiredStatus: structs.AllocDesiredStatusRun, allocState: unknownAllocState, - taskState: nil, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2369,7 +2361,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusComplete, desiredStatus: structs.AllocDesiredStatusRun, allocState: unknownAllocState, - taskState: nil, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2387,8 +2378,7 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { previousTerminal: false, clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, - allocState: unknownAllocState, - taskState: sysBatchJobReconnectTaskState, + allocState: reconnectedAllocState, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2406,8 +2396,7 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { previousTerminal: false, clientStatus: structs.AllocClientStatusFailed, desiredStatus: structs.AllocDesiredStatusRun, - allocState: unknownAllocState, - taskState: sysBatchJobReconnectTaskState, + allocState: reconnectedAllocState, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2425,8 +2414,7 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { previousTerminal: false, clientStatus: structs.AllocClientStatusComplete, desiredStatus: structs.AllocDesiredStatusRun, - allocState: unknownAllocState, - taskState: sysBatchJobReconnectTaskState, + allocState: reconnectedAllocState, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, @@ -2445,7 +2433,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusUnknown, desiredStatus: structs.AllocDesiredStatusRun, allocState: expiredAllocState, - taskState: sysBatchJobReconnectTaskState, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2469,7 +2456,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2493,7 +2479,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2517,7 +2502,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2541,7 +2525,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2706,7 +2689,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: nil, expectedNodeUpdate: map[string]*structs.Allocation{ @@ -2730,7 +2712,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: map[string]*structs.Allocation{ "id": { @@ -2759,7 +2740,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: map[string]*structs.Allocation{ "id": { @@ -2807,7 +2787,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: map[string]*structs.Allocation{ "id": { @@ -2831,7 +2810,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 1, expectedNodeAllocation: map[string]*structs.Allocation{ "id": { @@ -2860,7 +2838,6 @@ func TestSystemSched_NodeDisconnected(t *testing.T) { clientStatus: structs.AllocClientStatusRunning, desiredStatus: structs.AllocDesiredStatusRun, allocState: nil, - taskState: nil, expectedPlanCount: 0, expectedNodeAllocation: nil, expectedNodeUpdate: nil, diff --git a/scheduler/util.go b/scheduler/util.go index 6b35943d87c..2db7a3927cf 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -102,12 +102,18 @@ func diffSystemAllocsForNode( supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients) - reconnected := false - // Only compute reconnected for unknown and running since they need to go through the reconnect process. + reconnect := false + expired := false + + // Only compute reconnect for unknown and running since they need to go + // through the reconnect process. if supportsDisconnectedClients && (exist.ClientStatus == structs.AllocClientStatusUnknown || exist.ClientStatus == structs.AllocClientStatusRunning) { - reconnected, _ = exist.Reconnected() + reconnect = exist.NeedsToReconnect() + if reconnect { + expired = exist.Expired(time.Now()) + } } // If we have been marked for migration and aren't terminal, migrate @@ -131,7 +137,7 @@ func diffSystemAllocsForNode( } // Expired unknown allocs are lost. Expired checks that status is unknown. - if supportsDisconnectedClients && exist.Expired(time.Now().UTC()) { + if supportsDisconnectedClients && expired { result.lost = append(result.lost, allocTuple{ Name: name, TaskGroup: tg, @@ -157,11 +163,16 @@ func diffSystemAllocsForNode( // Filter allocs on a node that is now re-connected to reconnecting. if supportsDisconnectedClients && !nodeIsTainted && - reconnected { + reconnect { + + // Record the new ClientStatus to indicate to future evals that the + // alloc has already reconnected. + reconnecting := exist.Copy() + reconnecting.AppendState(structs.AllocStateFieldClientStatus, exist.ClientStatus) result.reconnecting = append(result.reconnecting, allocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: reconnecting, }) continue } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 4133e9edc18..4fd34cda829 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/nomad/ci" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/helper/pointer" @@ -292,6 +293,153 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { require.Len(t, diff.lost, 0) } +func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) { + ci.Parallel(t) + + // Create job. + job := mock.SystemJob() + job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour) + + // Create nodes. + readyNode := mock.Node() + readyNode.Status = structs.NodeStatusReady + + disconnectedNode := mock.Node() + disconnectedNode.Status = structs.NodeStatusDisconnected + + eligibleNodes := map[string]*structs.Node{ + readyNode.ID: readyNode, + } + + taintedNodes := map[string]*structs.Node{ + disconnectedNode.ID: disconnectedNode, + } + + // Create allocs. + required := materializeTaskGroups(job) + terminal := make(structs.TerminalByNodeByName) + + type diffResultCount struct { + place, update, migrate, stop, ignore, lost, disconnecting, reconnecting int + } + + testCases := []struct { + name string + node *structs.Node + allocFn func(*structs.Allocation) + expect diffResultCount + }{ + { + name: "alloc in disconnected client is marked as unknown", + node: disconnectedNode, + allocFn: func(alloc *structs.Allocation) { + alloc.ClientStatus = structs.AllocClientStatusRunning + }, + expect: diffResultCount{ + disconnecting: 1, + }, + }, + { + name: "disconnected alloc reconnects", + node: readyNode, + allocFn: func(alloc *structs.Allocation) { + alloc.ClientStatus = structs.AllocClientStatusRunning + + alloc.AllocStates = []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now().Add(-time.Minute), + }} + }, + expect: diffResultCount{ + reconnecting: 1, + }, + }, + { + name: "alloc not reconnecting after it reconnects", + node: readyNode, + allocFn: func(alloc *structs.Allocation) { + alloc.ClientStatus = structs.AllocClientStatusRunning + + alloc.AllocStates = []*structs.AllocState{ + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now().Add(-time.Minute), + }, + { + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusRunning, + Time: time.Now(), + }, + } + }, + expect: diffResultCount{ + ignore: 1, + }, + }, + { + name: "disconnected alloc is lost after it expires", + node: disconnectedNode, + allocFn: func(alloc *structs.Allocation) { + alloc.ClientStatus = structs.AllocClientStatusUnknown + + alloc.AllocStates = []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now().Add(-10 * time.Hour), + }} + }, + expect: diffResultCount{ + lost: 1, + }, + }, + { + name: "disconnected allocs are ignored", + node: disconnectedNode, + allocFn: func(alloc *structs.Allocation) { + alloc.ClientStatus = structs.AllocClientStatusUnknown + + alloc.AllocStates = []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now(), + }} + }, + expect: diffResultCount{ + ignore: 1, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + alloc := mock.AllocForNode(tc.node) + alloc.JobID = job.ID + alloc.Job = job + alloc.Name = fmt.Sprintf("%s.%s[0]", job.Name, job.TaskGroups[0].Name) + + if tc.allocFn != nil { + tc.allocFn(alloc) + } + + got := diffSystemAllocsForNode( + job, tc.node.ID, eligibleNodes, nil, taintedNodes, + required, []*structs.Allocation{alloc}, terminal, true, + ) + + assert.Len(t, got.place, tc.expect.place, "place") + assert.Len(t, got.update, tc.expect.update, "update") + assert.Len(t, got.migrate, tc.expect.migrate, "migrate") + assert.Len(t, got.stop, tc.expect.stop, "stop") + assert.Len(t, got.ignore, tc.expect.ignore, "ignore") + assert.Len(t, got.lost, tc.expect.lost, "lost") + assert.Len(t, got.disconnecting, tc.expect.disconnecting, "disconnecting") + assert.Len(t, got.reconnecting, tc.expect.reconnecting, "reconnecting") + }) + } +} + func TestDiffSystemAllocs(t *testing.T) { ci.Parallel(t)