Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node events #3945

Merged
merged 19 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
code review feedback
  • Loading branch information
chelseakomlo authored and dadgar committed Mar 14, 2018
commit 319f80907cf1df94d3c624a5cd13121ff915c3e7
56 changes: 25 additions & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second

// nodeEventsEmitIntv is the interval at which node events are synced with
// the server
nodeEventsEmitIntv = 3 * time.Second
)

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
Expand Down Expand Up @@ -1062,7 +1058,7 @@ func (c *Client) registerAndHeartbeat() {
go c.watchNodeUpdates()

// Start watching for emitting node events
go c.watchEmitEvents()
go c.watchNodeEvents()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
Expand Down Expand Up @@ -1147,7 +1143,7 @@ func (c *Client) run() {
// these kinds of events include when a driver moves from healthy to unhealthy
// (and vice versa)
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
nodeID := c.Node().ID
nodeID := c.NodeID()
nodeEvents := map[string][]*structs.NodeEvent{
nodeID: events,
}
Expand All @@ -1159,44 +1155,42 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
return fmt.Errorf("Emitting node event failed: %v", err)
}
c.logger.Printf("[INFO] client: emit node events complete")
return nil
}

// watchEmitEvents is a handler which receives node events and on a interval and
// submits them in batch format to the server
func (c *Client) watchEmitEvents() {
batchEvents := make([]*structs.NodeEvent, 0)
// watchNodeEvents is a handler which receives node events and, on an interval,
// submits them in batch format to the server
func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent

timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv))
// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
defer timer.Stop()

for {
select {
case event := <-c.triggerEmitNodeEvent:
batchEvents = append(batchEvents, event)

case <-timer.C:
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))

if len(batchEvents) == 0 {
// if we haven't received any events to emit, continue until the next
// time interval
continue
if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents {
batchEvents = append(batchEvents, event)
} else {
// Drop the oldest event
c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0])
batchEvents = append(batchEvents[1:], event)
}

err := c.submitNodeEvents(batchEvents)
if err != nil {
batchEvents = make([]*structs.NodeEvent, 0)
c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err)
} else if len(batchEvents) >= structs.MaxRetainedNodeEvents {
// Truncate list to under 10
batchEvents = make([]*structs.NodeEvent, 0)
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-timer.C:
if err := c.submitNodeEvents(batchEvents); err != nil {
c.logger.Printf("[ERR] client: submitting node events failed: %v", err)
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
}

case <-c.shutdownCh:
return
default:
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions command/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
}

func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
if !c.verbose {
return
}

c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
c.outputNodeEvent(node.NodeEvents)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you take in whether it is in verbose mode and only emit details in verbose

Expand All @@ -400,14 +397,22 @@ func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
size := len(events)
nodeEvents := make([]string, size+1)
nodeEvents[0] = "Time|Subsystem|Message|Details"
if c.verbose {
nodeEvents[0] = "Time|Subsystem|Message|Details"
} else {
nodeEvents[0] = "Time|Subsystem|Message"
}

for i, event := range events {
timestamp := formatUnixNanoTime(event.Timestamp)
subsystem := event.Subsystem
msg := event.Message
details := formatEventDetails(event.Details)
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
if c.verbose {
details := formatEventDetails(event.Details)
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
} else {
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg)
}
}
c.Ui.Output(formatList(nodeEvents))
}
Expand Down
6 changes: 6 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.Em
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add validation that the node event map isn't empty

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return an error if there are no events

if args.NodeEvents == nil {
err := fmt.Errorf("No event to add; node event map is nil")
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err)
return err
}

_, index, err := n.srv.raftApply(structs.AddNodeEventsType, args)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove blank line

if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3722,8 +3722,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str
}

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node
copyNode := node.Copy()

nodeEvents := node.NodeEvents

Expand All @@ -3733,7 +3732,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str

// keep the node's events pruned to at most structs.MaxRetainedNodeEvents
if len(nodeEvents) >= structs.MaxRetainedNodeEvents {
delta := len(nodeEvents) - 10
delta := len(nodeEvents) - structs.MaxRetainedNodeEvents
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
Expand Down
21 changes: 14 additions & 7 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,15 @@ type NodeEvent struct {
ModifyIndex uint64
}

// String returns a single-line, human-readable rendering of the node event
// made up of its message, subsystem, details and timestamp.
//
// Every entry of Details is rendered as "key: value". Entries are ordered by
// key and joined with ", " so that no detail is dropped and the output is
// stable even though Go randomizes map iteration order. An event without
// details renders an empty Details section.
func (ne *NodeEvent) String() string {
	// Sort the keys first; ranging over the map directly would yield a
	// different ordering on every call.
	keys := make([]string, 0, len(ne.Details))
	for k := range ne.Details {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s: %s", k, ne.Details[k]))
	}
	details := strings.Join(pairs, ", ")

	return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), details, ne.Timestamp)
}

// EmitNodeEventsRequest is a request to update the node events source
// with a new client-side event
type EmitNodeEventsRequest struct {
Expand Down Expand Up @@ -1221,16 +1230,14 @@ func (n *Node) Copy() *Node {
nn.Reserved = nn.Reserved.Copy()
nn.Links = helper.CopyMapStringString(nn.Links)
nn.Meta = helper.CopyMapStringString(nn.Meta)
nn.NodeEvents = copyNodeEvents(n)
nn.NodeEvents = copyNodeEvents(n.NodeEvents)
return nn
}

func copyNodeEvents(first *Node) []*NodeEvent {
nodeEvents := make([]*NodeEvent, 0)
for _, e := range first.NodeEvents {
nodeEvents = append(nodeEvents, e)
}
return nodeEvents
func copyNodeEvents(first []*NodeEvent) []*NodeEvent {
second := make([]*NodeEvent, len(first))
copy(second, first)
return second
}

// TerminalStatus returns if the current status is terminal and
Expand Down