From c702353863e23cd845a06da791602afc42e3e315 Mon Sep 17 00:00:00 2001
From: Mahmood Ali <mahmood@hashicorp.com>
Date: Tue, 21 May 2019 15:45:00 -0400
Subject: [PATCH 1/2] set node.StatusUpdatedAt in raft

Fix a case where `node.StatusUpdatedAt` was manipulated directly in
memory.

This ensures that StatusUpdatedAt is set in raft layer, and ensures that
the field is updated when node drain/eligibility is updated too.
---
 nomad/drainer_shims.go          |  7 ++++++-
 nomad/fsm.go                    |  8 ++++----
 nomad/node_endpoint.go          |  8 +++++++-
 nomad/state/state_store.go      | 17 ++++++++++-------
 nomad/state/state_store_test.go | 21 +++++++++++++--------
 nomad/structs/structs.go        | 10 ++++++++++
 6 files changed, 50 insertions(+), 21 deletions(-)

diff --git a/nomad/drainer_shims.go b/nomad/drainer_shims.go
index c9795d5ac4e..1df9b9aa47e 100644
--- a/nomad/drainer_shims.go
+++ b/nomad/drainer_shims.go
@@ -1,6 +1,10 @@
 package nomad
 
-import "github.com/hashicorp/nomad/nomad/structs"
+import (
+	"time"
+
+	"github.com/hashicorp/nomad/nomad/structs"
+)
 
 // drainerShim implements the drainer.RaftApplier interface required by the
 // NodeDrainer.
@@ -13,6 +17,7 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
 		Updates:      make(map[string]*structs.DrainUpdate, len(nodes)),
 		NodeEvents:   make(map[string]*structs.NodeEvent, len(nodes)),
 		WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
+		UpdatedAt:    time.Now().Unix(),
 	}
 
 	update := &structs.DrainUpdate{}
diff --git a/nomad/fsm.go b/nomad/fsm.go
index 3c91b8f5c69..e2f47783bfc 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -310,7 +310,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
 		panic(fmt.Errorf("failed to decode request: %v", err))
 	}
 
-	if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
+	if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil {
 		n.logger.Error("UpdateNodeStatus failed", "error", err)
 		return err
 	}
@@ -352,7 +352,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
 		}
 	}
 
-	if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
+	if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
 		n.logger.Error("UpdateNodeDrain failed", "error", err)
 		return err
 	}
@@ -366,7 +366,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
 		panic(fmt.Errorf("failed to decode request: %v", err))
 	}
 
-	if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
+	if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
 		n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
 		return err
 	}
@@ -387,7 +387,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
 		return err
 	}
 
-	if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
+	if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
 		n.logger.Error("UpdateNodeEligibility failed", "error", err)
 		return err
 	}
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 5e57406f8a0..1fed74bbc28 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -369,7 +369,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 	// to track SecretIDs.
 
 	// Update the timestamp of when the node status was updated
-	node.StatusUpdatedAt = time.Now().Unix()
+	args.UpdatedAt = time.Now().Unix()
 
 	// Commit this update via Raft
 	var index uint64
@@ -484,6 +484,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
 		return fmt.Errorf("node not found")
 	}
 
+	// Update the timestamp of when the node status was updated
+	args.UpdatedAt = time.Now().Unix()
+
 	// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
 	// format.
 	if args.Drain && args.DrainStrategy == nil {
@@ -589,6 +592,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
 		return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
 	}
 
+	// Update the timestamp of when the node status was updated
+	args.UpdatedAt = time.Now().Unix()
+
 	// Construct the node event
 	args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
 	if node.SchedulingEligibility == args.Eligibility {
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 8bb3b3352e5..cff0aa2347b 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -732,7 +732,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
 }
 
 // UpdateNodeStatus is used to update the status of a node
-func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
+func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()
 
@@ -748,6 +748,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
 	// Copy the existing node
 	existingNode := existing.(*structs.Node)
 	copyNode := existingNode.Copy()
+	copyNode.StatusUpdatedAt = updatedAt
 
 	// Add the event if given
 	if event != nil {
@@ -771,11 +772,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
 }
 
 // BatchUpdateNodeDrain is used to update the drain of a node set of nodes
-func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
+func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()
 	for node, update := range updates {
-		if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil {
+		if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil {
 			return err
 		}
 	}
@@ -785,11 +786,11 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru
 
 // UpdateNodeDrain is used to update the drain of a node
 func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
-	drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
+	drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
 
 	txn := s.db.Txn(true)
 	defer txn.Abort()
-	if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil {
+	if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
 		return err
 	}
 	txn.Commit()
@@ -797,7 +798,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
 }
 
 func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
-	drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
+	drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
 
 	// Lookup the node
 	existing, err := txn.First("nodes", "id", nodeID)
@@ -811,6 +812,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
 	// Copy the existing node
 	existingNode := existing.(*structs.Node)
 	copyNode := existingNode.Copy()
+	copyNode.StatusUpdatedAt = updatedAt
 
 	// Add the event if given
 	if event != nil {
@@ -840,7 +842,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
 }
 
 // UpdateNodeEligibility is used to update the scheduling eligibility of a node
-func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {
+func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
 
 	txn := s.db.Txn(true)
 	defer txn.Abort()
@@ -857,6 +859,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
 	// Copy the existing node
 	existingNode := existing.(*structs.Node)
 	copyNode := existingNode.Copy()
+	copyNode.StatusUpdatedAt = updatedAt
 
 	// Add the event if given
 	if event != nil {
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 3cc7a23ce02..7db7a2c7ff9 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -857,7 +857,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
 		Timestamp: time.Now(),
 	}
 
-	require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, event))
+	require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, 70, event))
 	require.True(watchFired(ws))
 
 	ws = memdb.NewWatchSet()
@@ -865,6 +865,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
 	require.NoError(err)
 	require.Equal(structs.NodeStatusReady, out.Status)
 	require.EqualValues(801, out.ModifyIndex)
+	require.EqualValues(70, out.StatusUpdatedAt)
 	require.Len(out.Events, 2)
 	require.Equal(event.Message, out.Events[1].Message)
 
@@ -912,7 +913,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
 		n2.ID: event,
 	}
 
-	require.Nil(state.BatchUpdateNodeDrain(1002, update, events))
+	require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events))
 	require.True(watchFired(ws))
 
 	ws = memdb.NewWatchSet()
@@ -924,6 +925,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
 		require.Equal(out.DrainStrategy, expectedDrain)
 		require.Len(out.Events, 2)
 		require.EqualValues(1002, out.ModifyIndex)
+		require.EqualValues(7, out.StatusUpdatedAt)
 	}
 
 	index, err := state.Index("nodes")
@@ -955,7 +957,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
 		Subsystem: structs.NodeEventSubsystemDrain,
 		Timestamp: time.Now(),
 	}
-	require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event))
+	require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, 7, event))
 	require.True(watchFired(ws))
 
 	ws = memdb.NewWatchSet()
@@ -966,6 +968,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
 	require.Equal(out.DrainStrategy, expectedDrain)
 	require.Len(out.Events, 2)
 	require.EqualValues(1001, out.ModifyIndex)
+	require.EqualValues(7, out.StatusUpdatedAt)
 
 	index, err := state.Index("nodes")
 	require.Nil(err)
@@ -1084,7 +1087,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
 		Subsystem: structs.NodeEventSubsystemDrain,
 		Timestamp: time.Now(),
 	}
-	require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1))
+	require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, 7, event1))
 	require.True(watchFired(ws))
 
 	// Remove the drain
@@ -1093,7 +1096,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
 		Subsystem: structs.NodeEventSubsystemDrain,
 		Timestamp: time.Now(),
 	}
-	require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, event2))
+	require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, 9, event2))
 
 	ws = memdb.NewWatchSet()
 	out, err := state.NodeByID(ws, node.ID)
@@ -1103,6 +1106,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
 	require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible)
 	require.Len(out.Events, 3)
 	require.EqualValues(1002, out.ModifyIndex)
+	require.EqualValues(9, out.StatusUpdatedAt)
 
 	index, err := state.Index("nodes")
 	require.Nil(err)
@@ -1133,7 +1137,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
 		Subsystem: structs.NodeEventSubsystemCluster,
 		Timestamp: time.Now(),
 	}
-	require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
+	require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event))
 	require.True(watchFired(ws))
 
 	ws = memdb.NewWatchSet()
@@ -1143,6 +1147,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
 	require.Len(out.Events, 2)
 	require.Equal(out.Events[1], event)
 	require.EqualValues(1001, out.ModifyIndex)
+	require.EqualValues(7, out.StatusUpdatedAt)
 
 	index, err := state.Index("nodes")
 	require.Nil(err)
@@ -1155,10 +1160,10 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
 			Deadline: -1 * time.Second,
 		},
 	}
-	require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))
+	require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil))
 
 	// Try to set the node to eligible
-	err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
+	err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil)
 	require.NotNil(err)
 	require.Contains(err.Error(), "while it is draining")
 }
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 33c81dd7e24..b1d259c9ecf 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -347,6 +347,7 @@ type NodeUpdateStatusRequest struct {
 	NodeID    string
 	Status    string
 	NodeEvent *NodeEvent
+	UpdatedAt int64
 	WriteRequest
 }
 
@@ -367,6 +368,9 @@ type NodeUpdateDrainRequest struct {
 	// NodeEvent is the event added to the node
 	NodeEvent *NodeEvent
 
+	// UpdatedAt represents server time of receiving request
+	UpdatedAt int64
+
 	WriteRequest
 }
 
@@ -379,6 +383,9 @@ type BatchNodeUpdateDrainRequest struct {
 	// NodeEvents is a mapping of the node to the event to add to the node
 	NodeEvents map[string]*NodeEvent
 
+	// UpdatedAt represents server time of receiving request
+	UpdatedAt int64
+
 	WriteRequest
 }
 
@@ -399,6 +406,9 @@ type NodeUpdateEligibilityRequest struct {
 	// NodeEvent is the event added to the node
 	NodeEvent *NodeEvent
 
+	// UpdatedAt represents server time of receiving request
+	UpdatedAt int64
+
 	WriteRequest
 }
 

From 40b0c641636f922c8d73d00e0a0c7b3a43330da2 Mon Sep 17 00:00:00 2001
From: Mahmood Ali <mahmood@hashicorp.com>
Date: Tue, 21 May 2019 21:10:17 -0400
Subject: [PATCH 2/2] update callers in tests

---
 nomad/drainer/watch_nodes_test.go | 4 ++--
 nomad/node_endpoint_test.go       | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go
index ca64e2dd134..6155d577f39 100644
--- a/nomad/drainer/watch_nodes_test.go
+++ b/nomad/drainer/watch_nodes_test.go
@@ -88,7 +88,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
 	require.Equal(n, tracked[n.ID])
 
 	// Change the node to be not draining and wait for it to be untracked
-	require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil))
+	require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, 0, nil))
 	testutil.WaitForResult(func() (bool, error) {
 		return len(m.Events) == 2, nil
 	}, func(err error) {
@@ -166,7 +166,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
 	// Change the node to have a new spec
 	s2 := n.DrainStrategy.Copy()
 	s2.Deadline += time.Hour
-	require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil))
+	require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, 0, nil))
 
 	// Wait for it to be updated
 	testutil.WaitForResult(func() (bool, error) {
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 70542392bdc..6b7b90e2f40 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -2662,7 +2662,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
 				Deadline: 10 * time.Second,
 			},
 		}
-		errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil)
+		errCh <- state.UpdateNodeDrain(3, node.ID, s, false, 0, nil)
 	})
 
 	req.MinQueryIndex = 2
@@ -2688,7 +2688,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
 
 	// Node status update triggers watches
 	time.AfterFunc(100*time.Millisecond, func() {
-		errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil)
+		errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, 0, nil)
 	})
 
 	req.MinQueryIndex = 38