Add ReconnectModifyIndex to handle reconnect lifecycle #14948

Closed · wants to merge 2 commits
3 changes: 3 additions & 0 deletions client/allocrunner/alloc_runner.go
@@ -1348,6 +1348,9 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
// Build the client allocation
alloc := ar.clientAlloc(states)

// Set the reconnect modify index so that the scheduler can track that the reconnect has not been processed.
alloc.ReconnectModifyIndex = ar.Alloc().AllocModifyIndex
Comment on lines +1351 to +1352
Member:

This value comes from the server:

```go
// AllocModifyIndex is not updated when the client updates allocations. This
// lets the client pull only the allocs updated by the server.
```

But that made me remember there are two code paths in the state store for updating allocations: one for upserting allocs from the server and one for updating allocs from the client. In any case, neither of them handles the ReconnectModifyIndex field, because for existing allocations (which is what we care about here) we copy the existing Allocation and then merge the needed fields over before inserting.

So we're not actually updating this field in Nomad's state.
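
To make that concrete, here is a minimal runnable sketch of the copy-then-merge pattern being described. All names here are hypothetical stand-ins (the real logic lives in the state store's alloc-update paths); the point is only that a field omitted from the merge never reaches stored state:

```go
package main

import "fmt"

// allocation is a stand-in for structs.Allocation with just the fields
// needed to illustrate the merge behavior.
type allocation struct {
	ClientStatus         string
	ReconnectModifyIndex uint64
}

func (a *allocation) copy() *allocation {
	c := *a
	return &c
}

// updateAllocFromClient mimics the client-update path: copy the existing
// alloc, then merge over only the client-authoritative fields. Any field
// not explicitly merged here -- ReconnectModifyIndex included -- keeps the
// old stored value.
func updateAllocFromClient(existing, update *allocation) *allocation {
	merged := existing.copy()
	merged.ClientStatus = update.ClientStatus
	return merged
}

func main() {
	existing := &allocation{ClientStatus: "unknown"}
	update := &allocation{ClientStatus: "running", ReconnectModifyIndex: 100}

	stored := updateAllocFromClient(existing, update)
	fmt.Println(stored.ReconnectModifyIndex) // 0: the field never reached state
}
```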

Contributor Author:

So in client.go line 2033 it gets added to the stripped alloc during allocSync. That then gets sent to Node Update, which updates state and triggers an eval. When the eval fires, the index is set. We then have to unset it when applying the plan. Have you tried it out? I had logging in here during development showing it all flowed through.
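
The index handshake described here can be checked with a toy model of the two fields. This is a sketch under the assumption (matching the test changes in this PR) that a later server-side write bumps AllocModifyIndex past the stamped marker; only the field names and the comparison itself mirror the diff:

```go
package main

import "fmt"

// alloc stands in for structs.Allocation with just the two indexes.
type alloc struct {
	AllocModifyIndex     uint64
	ReconnectModifyIndex uint64
}

// isReconnecting mirrors the index check added to IsReconnecting in this PR.
func (a *alloc) isReconnecting() bool {
	return a.ReconnectModifyIndex != 0 && a.AllocModifyIndex > a.ReconnectModifyIndex
}

func main() {
	a := &alloc{AllocModifyIndex: 100}

	// 1. The client reconnects and stamps the marker (alloc_runner.go).
	a.ReconnectModifyIndex = a.AllocModifyIndex

	// 2. A subsequent server-side write bumps AllocModifyIndex (client
	//    updates alone do not move it, per the struct's doc comment).
	a.AllocModifyIndex = 110
	fmt.Println(a.isReconnecting()) // true: reconnect not yet processed

	// 3. The scheduler appends the reconnect update to the plan and
	//    resets the marker (generic_sched.go).
	a.ReconnectModifyIndex = 0
	fmt.Println(a.isReconnecting()) // false: already processed
}
```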

Member:

> which then updates state

That's the bit where I don't see how it's happening. Any update of an existing object takes a copy first (ref state_store.go#L3474) and then modifies the copy before inserting it. So if we haven't pulled in information that the client is authoritative on, the state isn't getting updated for the transaction.

I haven't had a chance to test it out thoroughly (still trying to get 1.4.2 out! 😁) but I suspect the reason it's "working" right now is the state store corruption on line 1223. That'll appear correct under some circumstances but won't have gone through raft correctly.


// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
1 change: 1 addition & 0 deletions client/client.go
@@ -2030,6 +2030,7 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
stripped.ClientDescription = alloc.ClientDescription
stripped.DeploymentStatus = alloc.DeploymentStatus
stripped.NetworkStatus = alloc.NetworkStatus
stripped.ReconnectModifyIndex = alloc.ReconnectModifyIndex

select {
case c.allocUpdates <- stripped:
1 change: 1 addition & 0 deletions nomad/node_endpoint.go
@@ -1220,6 +1220,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
alloc.ClientStatus == structs.AllocClientStatusUnknown {
evalTriggerBy = structs.EvalTriggerReconnect
alloc.ReconnectModifyIndex = allocToUpdate.ReconnectModifyIndex
Member:

This assignment corrupts the state store because alloc hasn't been copied after being queried from the state store. I'm fairly certain this line isn't needed at all, as allocToUpdate is what gets added to the batch of updates, not alloc.
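
As an illustration of why the copy matters: go-memdb hands back pointers to the live objects in state, so writing through a queried object mutates state outside of any applied transaction. A minimal sketch with hypothetical names:

```go
package main

import "fmt"

type allocation struct {
	ReconnectModifyIndex uint64
}

// store mimics an in-memory state store whose reads return live pointers,
// the way go-memdb does.
type store struct {
	allocs map[string]*allocation
}

func (s *store) get(id string) *allocation { return s.allocs[id] }

func main() {
	s := &store{allocs: map[string]*allocation{"a1": {}}}

	// Buggy pattern: mutate the queried object in place.
	queried := s.get("a1")
	queried.ReconnectModifyIndex = 42

	fmt.Println(s.get("a1").ReconnectModifyIndex) // 42: state changed without raft

	// Safe pattern: copy first, mutate the copy, and submit the copy
	// through the normal raft/plan apply path.
	fresh := *s.get("a1")
	fresh.ReconnectModifyIndex = 7
	fmt.Println(fresh.ReconnectModifyIndex, s.get("a1").ReconnectModifyIndex) // 7 42
}
```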

}

// If we weren't able to determine one of our expected eval triggers,
16 changes: 12 additions & 4 deletions nomad/structs/structs.go
@@ -9730,6 +9730,9 @@ type Allocation struct {
// lets the client pull only the allocs updated by the server.
AllocModifyIndex uint64

// ReconnectModifyIndex is used to determine if the server has processed the node reconnect.
ReconnectModifyIndex uint64
Comment on lines +9733 to +9734
Member:

We should make sure this gets onto the api.Allocation struct as well.
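
For reference, the mirrored field might look like this on the api side (a trimmed-down sketch, not the real struct, which carries many more fields):

```go
package api

// Allocation sketches the API-facing struct; only the fields relevant to
// this discussion are shown.
type Allocation struct {
	AllocModifyIndex     uint64
	ReconnectModifyIndex uint64 // mirrors structs.Allocation
}
```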


// CreateTime is the time the allocation has finished scheduling and been
// verified by the plan applier.
CreateTime int64
@@ -10393,9 +10396,14 @@ func (a *Allocation) LastUnknown() time.Time {
return lastUnknown.UTC()
}

// Reconnected determines whether a reconnect event has occurred for any task
// and whether that event occurred within the allowable duration specified by MaxClientDisconnect.
func (a *Allocation) Reconnected() (bool, bool) {
// IsReconnecting determines whether a reconnect event has occurred for a task,
// whether that event occurred within the allowable duration specified by MaxClientDisconnect,
// and whether the reconnect has been processed.
func (a *Allocation) IsReconnecting() (isReconnecting bool, expired bool) { // isReconnecting, expired
if a.ReconnectModifyIndex != 0 && a.AllocModifyIndex > a.ReconnectModifyIndex {
isReconnecting = true
}

var lastReconnect time.Time
for _, taskState := range a.TaskStates {
for _, taskEvent := range taskState.Events {
Expand All @@ -10413,7 +10421,7 @@ func (a *Allocation) Reconnected() (bool, bool) {
return false, false
}

return true, a.Expired(lastReconnect)
return isReconnecting, a.Expired(lastReconnect)
}

func (a *Allocation) ToIdentityClaims(job *Job) *IdentityClaims {
116 changes: 67 additions & 49 deletions nomad/structs/structs_test.go
@@ -5515,13 +5515,14 @@ func TestAllocation_Expired(t *testing.T) {
}
}

func TestAllocation_Reconnected(t *testing.T) {
func TestAllocation_IsReconnecting(t *testing.T) {
type testCase struct {
name string
maxDisconnect string
elapsed int
reconnected bool
isReconnecting bool
expired bool
alreadyProcessed bool
nilJob bool
badTaskGroup bool
mixedTZ bool
@@ -5531,77 +5532,88 @@

testCases := []testCase{
{
name: "has-expired",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: true,
name: "has-expired",
maxDisconnect: "5s",
elapsed: 10,
isReconnecting: true,
expired: true,
},
{
name: "has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
reconnected: true,
expired: false,
name: "has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
isReconnecting: true,
expired: false,
},
{
name: "are-equal",
maxDisconnect: "5s",
elapsed: 5,
reconnected: true,
expired: true,
name: "are-equal",
maxDisconnect: "5s",
elapsed: 5,
isReconnecting: true,
expired: true,
},
{
name: "nil-job",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: false,
nilJob: true,
name: "nil-job",
maxDisconnect: "5s",
elapsed: 10,
isReconnecting: true,
expired: false,
nilJob: true,
},
{
name: "bad-task-group",
maxDisconnect: "",
elapsed: 10,
reconnected: true,
expired: false,
badTaskGroup: true,
name: "bad-task-group",
maxDisconnect: "",
elapsed: 10,
isReconnecting: true,
expired: false,
badTaskGroup: true,
},
{
name: "no-max-disconnect",
maxDisconnect: "",
elapsed: 10,
reconnected: true,
expired: false,
name: "no-max-disconnect",
maxDisconnect: "",
elapsed: 10,
isReconnecting: true,
expired: false,
},
{
name: "mixed-utc-has-expired",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: true,
mixedTZ: true,
name: "mixed-utc-has-expired",
maxDisconnect: "5s",
elapsed: 10,
isReconnecting: true,
expired: true,
mixedTZ: true,
},
{
name: "mixed-utc-has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
reconnected: true,
expired: false,
mixedTZ: true,
name: "mixed-utc-has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
isReconnecting: true,
expired: false,
mixedTZ: true,
},
{
name: "no-reconnect-event",
maxDisconnect: "5s",
elapsed: 2,
reconnected: false,
isReconnecting: false,
expired: false,
noReconnectEvent: true,
},
{
name: "reconnect-already-processed",
maxDisconnect: "5s",
elapsed: 2,
isReconnecting: false,
expired: false,
noReconnectEvent: false,
alreadyProcessed: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := MockAlloc()
alloc.AllocModifyIndex = 100

var err error
var maxDisconnect time.Duration

@@ -5652,8 +5664,14 @@ func TestAllocation_Reconnected(t *testing.T) {
}
}

reconnected, expired := alloc.Reconnected()
require.Equal(t, tc.reconnected, reconnected)
if tc.alreadyProcessed {
alloc.ReconnectModifyIndex = 0
} else {
alloc.ReconnectModifyIndex = alloc.AllocModifyIndex - 1
}

isReconnecting, expired := alloc.IsReconnecting()
require.Equal(t, tc.isReconnecting, isReconnecting)
require.Equal(t, tc.expired, expired)
})
}
6 changes: 6 additions & 0 deletions scheduler/generic_sched.go
@@ -429,6 +429,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}

// Reset the reconnect index so that subsequent evals can tell the reconnect update was already processed
for _, update := range results.reconnectUpdates {
update.ReconnectModifyIndex = 0
s.ctx.Plan().AppendAlloc(update, nil)
}

// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise
2 changes: 2 additions & 0 deletions scheduler/reconcile_test.go
@@ -302,6 +302,7 @@ func buildAllocations(job *structs.Job, count int, clientStatus, desiredStatus s
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.ClientStatus = clientStatus
alloc.DesiredStatus = desiredStatus
alloc.AllocModifyIndex = 100

alloc.Metrics = &structs.AllocMetric{
ScoreMetaData: []*structs.NodeScoreMeta{
@@ -5539,6 +5540,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
// Set alloc state
disconnectedAllocCount := tc.disconnectedAllocCount
for _, alloc := range allocs {
alloc.ReconnectModifyIndex = alloc.AllocModifyIndex - 10
alloc.DesiredStatus = tc.serverDesiredStatus

if tc.maxDisconnect != nil {
22 changes: 11 additions & 11 deletions scheduler/reconcile_util.go
@@ -229,21 +229,21 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
// without max_client_disconnect
supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients)

reconnected := false
isReconnecting := false
expired := false

// Only compute reconnected for unknown, running, and failed since they need to go through the reconnect logic.
// Only compute isReconnecting for unknown, running, and failed since they need to go through the reconnect logic.
if supportsDisconnectedClients &&
(alloc.ClientStatus == structs.AllocClientStatusUnknown ||
alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusFailed) {
reconnected, expired = alloc.Reconnected()
isReconnecting, expired = alloc.IsReconnecting()
}

// Failed reconnected allocs need to be added to reconnecting so that they
// Failed reconnecting allocs need to be added to reconnecting so that they
// can be handled as a failed reconnect.
if supportsDisconnectedClients &&
reconnected &&
isReconnecting &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
alloc.ClientStatus == structs.AllocClientStatusFailed {
reconnecting[alloc.ID] = alloc
@@ -272,7 +272,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
}
case structs.NodeStatusReady:
// Filter reconnecting allocs on a node that is now connected.
if reconnected {
if isReconnecting {
if expired {
lost[alloc.ID] = alloc
continue
@@ -284,9 +284,9 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
}
}

// Terminal allocs, if not reconnected, are always untainted as they
// Terminal allocs, if not reconnecting, are always untainted as they
// should never be migrated.
if alloc.TerminalStatus() && !reconnected {
if alloc.TerminalStatus() && !isReconnecting {
untainted[alloc.ID] = alloc
continue
}
@@ -311,9 +311,9 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
continue
}

// Ignore reconnected failed allocs that have been marked stop by the server.
// Ignore reconnecting failed allocs that have been marked stop by the server.
if supportsDisconnectedClients &&
reconnected &&
isReconnecting &&
alloc.ClientStatus == structs.AllocClientStatusFailed &&
alloc.DesiredStatus == structs.AllocDesiredStatusStop {
ignore[alloc.ID] = alloc
@@ -322,7 +322,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS

if !nodeIsTainted {
// Filter allocs on a node that is now re-connected to be resumed.
if reconnected {
if isReconnecting {
if expired {
lost[alloc.ID] = alloc
continue