Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node events #3945

Merged
merged 19 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
keep state store functions in one file
  • Loading branch information
chelseakomlo authored and dadgar committed Mar 14, 2018
commit 3f561c3870c876d1cbd9e3860544a67ffd8a7fd6
53 changes: 0 additions & 53 deletions nomad/state/events_state_store.go

This file was deleted.

94 changes: 0 additions & 94 deletions nomad/state/events_state_store_test.go

This file was deleted.

45 changes: 45 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3693,3 +3693,48 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}
}
}

// addNodeEvent is a function which wraps upsertNodeEvent
func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

err := s.upsertNodeEvents(index, node, events, txn)
txn.Commit()
return err
}

// upsertNodeEvent upserts a node event for a respective node. It also maintains
// that only 10 node events are ever stored simultaneously, deleting older
// events once this bound has been reached.
func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error {

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node

nodeEvents := node.NodeEvents

for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= 10 {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
copyNode.NodeEvents = nodeEvents
}

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return nil
}
82 changes: 82 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6469,6 +6469,88 @@ func TestStateStore_Abandon(t *testing.T) {
}
}

func TestStateStore_AddSingleNodeEvent(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

node := mock.Node()

// We create a new node event every time we register a node
err := state.UpsertNode(1000, node)
require.Nil(err)

require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)

// Create a watchset so we can test that AddNodeEvent fires the watch
ws := memdb.NewWatchSet()
_, err = state.NodeByID(ws, node.ID)
require.Nil(err)

nodeEvent := &structs.NodeEvent{
Message: "failed",
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent})
require.Nil(err)

require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(2, len(out.NodeEvents))
require.Equal(nodeEvent, out.NodeEvents[1])
}

// To prevent stale node events from accumulating, we limit the number of
// stored node events to 10.
func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)

var out *structs.Node
for i := 1; i <= 20; i++ {
ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)

nodeEvent := &structs.NodeEvent{
Message: fmt.Sprintf("%dith failed", i),
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent})
require.Nil(err)

require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
}

ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(10, len(out.NodeEvents))
require.Equal(uint64(11), out.NodeEvents[0].CreateIndex)
require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex)
}

// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// timeout since we already expect the event happened before calling this and
Expand Down