Skip to content

Commit 3f21aba

Browse files
author
Mahmood Ali
authored
Merge pull request #5746 from hashicorp/b-no-updating-inmem-node
set node.StatusUpdatedAt in raft
2 parents d9ac7c2 + 40b0c64 commit 3f21aba

8 files changed

+54
-25
lines changed

nomad/drainer/watch_nodes_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
8888
require.Equal(n, tracked[n.ID])
8989

9090
// Change the node to be not draining and wait for it to be untracked
91-
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil))
91+
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, 0, nil))
9292
testutil.WaitForResult(func() (bool, error) {
9393
return len(m.Events) == 2, nil
9494
}, func(err error) {
@@ -166,7 +166,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
166166
// Change the node to have a new spec
167167
s2 := n.DrainStrategy.Copy()
168168
s2.Deadline += time.Hour
169-
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil))
169+
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, 0, nil))
170170

171171
// Wait for it to be updated
172172
testutil.WaitForResult(func() (bool, error) {

nomad/drainer_shims.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package nomad
22

3-
import "github.com/hashicorp/nomad/nomad/structs"
3+
import (
4+
"time"
5+
6+
"github.com/hashicorp/nomad/nomad/structs"
7+
)
48

59
// drainerShim implements the drainer.RaftApplier interface required by the
610
// NodeDrainer.
@@ -13,6 +17,7 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
1317
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
1418
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
1519
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
20+
UpdatedAt: time.Now().Unix(),
1621
}
1722

1823
update := &structs.DrainUpdate{}

nomad/fsm.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
310310
panic(fmt.Errorf("failed to decode request: %v", err))
311311
}
312312

313-
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
313+
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil {
314314
n.logger.Error("UpdateNodeStatus failed", "error", err)
315315
return err
316316
}
@@ -352,7 +352,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
352352
}
353353
}
354354

355-
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
355+
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
356356
n.logger.Error("UpdateNodeDrain failed", "error", err)
357357
return err
358358
}
@@ -366,7 +366,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
366366
panic(fmt.Errorf("failed to decode request: %v", err))
367367
}
368368

369-
if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
369+
if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
370370
n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
371371
return err
372372
}
@@ -387,7 +387,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
387387
return err
388388
}
389389

390-
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
390+
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
391391
n.logger.Error("UpdateNodeEligibility failed", "error", err)
392392
return err
393393
}

nomad/node_endpoint.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
369369
// to track SecretIDs.
370370

371371
// Update the timestamp of when the node status was updated
372-
node.StatusUpdatedAt = time.Now().Unix()
372+
args.UpdatedAt = time.Now().Unix()
373373

374374
// Commit this update via Raft
375375
var index uint64
@@ -484,6 +484,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
484484
return fmt.Errorf("node not found")
485485
}
486486

487+
// Update the timestamp of when the node status was updated
488+
args.UpdatedAt = time.Now().Unix()
489+
487490
// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
488491
// format.
489492
if args.Drain && args.DrainStrategy == nil {
@@ -589,6 +592,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
589592
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
590593
}
591594

595+
// Update the timestamp of when the node status was updated
596+
args.UpdatedAt = time.Now().Unix()
597+
592598
// Construct the node event
593599
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
594600
if node.SchedulingEligibility == args.Eligibility {

nomad/node_endpoint_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -2662,7 +2662,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
26622662
Deadline: 10 * time.Second,
26632663
},
26642664
}
2665-
errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil)
2665+
errCh <- state.UpdateNodeDrain(3, node.ID, s, false, 0, nil)
26662666
})
26672667

26682668
req.MinQueryIndex = 2
@@ -2688,7 +2688,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
26882688

26892689
// Node status update triggers watches
26902690
time.AfterFunc(100*time.Millisecond, func() {
2691-
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil)
2691+
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, 0, nil)
26922692
})
26932693

26942694
req.MinQueryIndex = 38

nomad/state/state_store.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
732732
}
733733

734734
// UpdateNodeStatus is used to update the status of a node
735-
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
735+
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
736736
txn := s.db.Txn(true)
737737
defer txn.Abort()
738738

@@ -748,6 +748,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
748748
// Copy the existing node
749749
existingNode := existing.(*structs.Node)
750750
copyNode := existingNode.Copy()
751+
copyNode.StatusUpdatedAt = updatedAt
751752

752753
// Add the event if given
753754
if event != nil {
@@ -771,11 +772,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
771772
}
772773

773774
// BatchUpdateNodeDrain is used to update the drain of a set of nodes
774-
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
775+
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
775776
txn := s.db.Txn(true)
776777
defer txn.Abort()
777778
for node, update := range updates {
778-
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil {
779+
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil {
779780
return err
780781
}
781782
}
@@ -785,19 +786,19 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru
785786

786787
// UpdateNodeDrain is used to update the drain of a node
787788
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
788-
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
789+
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
789790

790791
txn := s.db.Txn(true)
791792
defer txn.Abort()
792-
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil {
793+
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
793794
return err
794795
}
795796
txn.Commit()
796797
return nil
797798
}
798799

799800
func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
800-
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
801+
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
801802

802803
// Lookup the node
803804
existing, err := txn.First("nodes", "id", nodeID)
@@ -811,6 +812,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
811812
// Copy the existing node
812813
existingNode := existing.(*structs.Node)
813814
copyNode := existingNode.Copy()
815+
copyNode.StatusUpdatedAt = updatedAt
814816

815817
// Add the event if given
816818
if event != nil {
@@ -840,7 +842,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
840842
}
841843

842844
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
843-
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {
845+
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
844846

845847
txn := s.db.Txn(true)
846848
defer txn.Abort()
@@ -857,6 +859,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
857859
// Copy the existing node
858860
existingNode := existing.(*structs.Node)
859861
copyNode := existingNode.Copy()
862+
copyNode.StatusUpdatedAt = updatedAt
860863

861864
// Add the event if given
862865
if event != nil {

nomad/state/state_store_test.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -857,14 +857,15 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
857857
Timestamp: time.Now(),
858858
}
859859

860-
require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, event))
860+
require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, 70, event))
861861
require.True(watchFired(ws))
862862

863863
ws = memdb.NewWatchSet()
864864
out, err := state.NodeByID(ws, node.ID)
865865
require.NoError(err)
866866
require.Equal(structs.NodeStatusReady, out.Status)
867867
require.EqualValues(801, out.ModifyIndex)
868+
require.EqualValues(70, out.StatusUpdatedAt)
868869
require.Len(out.Events, 2)
869870
require.Equal(event.Message, out.Events[1].Message)
870871

@@ -912,7 +913,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
912913
n2.ID: event,
913914
}
914915

915-
require.Nil(state.BatchUpdateNodeDrain(1002, update, events))
916+
require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events))
916917
require.True(watchFired(ws))
917918

918919
ws = memdb.NewWatchSet()
@@ -924,6 +925,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
924925
require.Equal(out.DrainStrategy, expectedDrain)
925926
require.Len(out.Events, 2)
926927
require.EqualValues(1002, out.ModifyIndex)
928+
require.EqualValues(7, out.StatusUpdatedAt)
927929
}
928930

929931
index, err := state.Index("nodes")
@@ -955,7 +957,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
955957
Subsystem: structs.NodeEventSubsystemDrain,
956958
Timestamp: time.Now(),
957959
}
958-
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event))
960+
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, 7, event))
959961
require.True(watchFired(ws))
960962

961963
ws = memdb.NewWatchSet()
@@ -966,6 +968,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
966968
require.Equal(out.DrainStrategy, expectedDrain)
967969
require.Len(out.Events, 2)
968970
require.EqualValues(1001, out.ModifyIndex)
971+
require.EqualValues(7, out.StatusUpdatedAt)
969972

970973
index, err := state.Index("nodes")
971974
require.Nil(err)
@@ -1084,7 +1087,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
10841087
Subsystem: structs.NodeEventSubsystemDrain,
10851088
Timestamp: time.Now(),
10861089
}
1087-
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1))
1090+
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, 7, event1))
10881091
require.True(watchFired(ws))
10891092

10901093
// Remove the drain
@@ -1093,7 +1096,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
10931096
Subsystem: structs.NodeEventSubsystemDrain,
10941097
Timestamp: time.Now(),
10951098
}
1096-
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, event2))
1099+
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, 9, event2))
10971100

10981101
ws = memdb.NewWatchSet()
10991102
out, err := state.NodeByID(ws, node.ID)
@@ -1103,6 +1106,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
11031106
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible)
11041107
require.Len(out.Events, 3)
11051108
require.EqualValues(1002, out.ModifyIndex)
1109+
require.EqualValues(9, out.StatusUpdatedAt)
11061110

11071111
index, err := state.Index("nodes")
11081112
require.Nil(err)
@@ -1133,7 +1137,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
11331137
Subsystem: structs.NodeEventSubsystemCluster,
11341138
Timestamp: time.Now(),
11351139
}
1136-
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
1140+
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event))
11371141
require.True(watchFired(ws))
11381142

11391143
ws = memdb.NewWatchSet()
@@ -1143,6 +1147,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
11431147
require.Len(out.Events, 2)
11441148
require.Equal(out.Events[1], event)
11451149
require.EqualValues(1001, out.ModifyIndex)
1150+
require.EqualValues(7, out.StatusUpdatedAt)
11461151

11471152
index, err := state.Index("nodes")
11481153
require.Nil(err)
@@ -1155,10 +1160,10 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
11551160
Deadline: -1 * time.Second,
11561161
},
11571162
}
1158-
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))
1163+
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil))
11591164

11601165
// Try to set the node to eligible
1161-
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
1166+
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil)
11621167
require.NotNil(err)
11631168
require.Contains(err.Error(), "while it is draining")
11641169
}

nomad/structs/structs.go

+10
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ type NodeUpdateStatusRequest struct {
347347
NodeID string
348348
Status string
349349
NodeEvent *NodeEvent
350+
UpdatedAt int64
350351
WriteRequest
351352
}
352353

@@ -367,6 +368,9 @@ type NodeUpdateDrainRequest struct {
367368
// NodeEvent is the event added to the node
368369
NodeEvent *NodeEvent
369370

371+
// UpdatedAt represents the server time at which the request was received
372+
UpdatedAt int64
373+
370374
WriteRequest
371375
}
372376

@@ -379,6 +383,9 @@ type BatchNodeUpdateDrainRequest struct {
379383
// NodeEvents is a mapping of the node to the event to add to the node
380384
NodeEvents map[string]*NodeEvent
381385

386+
// UpdatedAt represents the server time at which the request was received
387+
UpdatedAt int64
388+
382389
WriteRequest
383390
}
384391

@@ -399,6 +406,9 @@ type NodeUpdateEligibilityRequest struct {
399406
// NodeEvent is the event added to the node
400407
NodeEvent *NodeEvent
401408

409+
// UpdatedAt represents the server time at which the request was received
410+
UpdatedAt int64
411+
402412
WriteRequest
403413
}
404414

0 commit comments

Comments
 (0)