Skip to content

Commit 952b971

Browse files
committed
core: update LastAllocUpdateIndex with allocs
Update allocs and the LastAllocUpdateIndex in the same Raft transaction to avoid data inconsistency in case the UpdateAlloc request fails midway.
1 parent 67411b9 commit 952b971

File tree

3 files changed

+250
-183
lines changed

3 files changed

+250
-183
lines changed

nomad/node_endpoint.go

-11
Original file line numberDiff line numberDiff line change
@@ -1339,17 +1339,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
13391339
return err
13401340
}
13411341

1342-
// Update node alloc update index.
1343-
copyNode := node.Copy()
1344-
copyNode.LastAllocUpdateIndex = future.Index()
1345-
1346-
_, _, err = n.srv.raftApply(structs.NodeRegisterRequestType, &structs.NodeRegisterRequest{
1347-
Node: copyNode,
1348-
})
1349-
if err != nil {
1350-
return fmt.Errorf("node update failed: %v", err)
1351-
}
1352-
13531342
// Setup the response
13541343
reply.Index = future.Index()
13551344
return nil

nomad/state/state_store.go

+34
Original file line numberDiff line numberDiff line change
@@ -3602,8 +3602,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u
36023602
txn := s.db.WriteTxnMsgT(msgType, index)
36033603
defer txn.Abort()
36043604

3605+
// Capture all nodes being affected. Alloc updates from clients are batched
3606+
// so this request may include allocs from several nodes.
3607+
nodeIDs := make(map[string]interface{})
3608+
36053609
// Handle each of the updated allocations
36063610
for _, alloc := range allocs {
3611+
nodeIDs[alloc.NodeID] = struct{}{}
36073612
if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil {
36083613
return err
36093614
}
@@ -3614,6 +3619,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u
36143619
return fmt.Errorf("index update failed: %v", err)
36153620
}
36163621

3622+
// Update the index of when nodes last updated their allocs.
3623+
for nodeID := range nodeIDs {
3624+
if err := s.updateClientAllocUpdateIndex(txn, index, nodeID); err != nil {
3625+
return fmt.Errorf("node update failed: %v", err)
3626+
}
3627+
}
3628+
36173629
return txn.Commit()
36183630
}
36193631

@@ -3705,6 +3717,28 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
37053717
return nil
37063718
}
37073719

3720+
func (s *StateStore) updateClientAllocUpdateIndex(txn *txn, index uint64, nodeID string) error {
3721+
existing, err := txn.First("nodes", "id", nodeID)
3722+
if err != nil {
3723+
return fmt.Errorf("node lookup failed: %v", err)
3724+
}
3725+
if existing == nil {
3726+
return nil
3727+
}
3728+
3729+
node := existing.(*structs.Node)
3730+
copyNode := node.Copy()
3731+
copyNode.LastAllocUpdateIndex = index
3732+
3733+
if err := txn.Insert("nodes", copyNode); err != nil {
3734+
return fmt.Errorf("node update failed: %v", err)
3735+
}
3736+
if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil {
3737+
return fmt.Errorf("index update failed: %v", err)
3738+
}
3739+
return nil
3740+
}
3741+
37083742
// UpsertAllocs is used to evict a set of allocations and allocate new ones at
37093743
// the same time.
37103744
func (s *StateStore) UpsertAllocs(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error {

0 commit comments

Comments
 (0)