Skip to content

Commit

Permalink
Add node's active state to diagnostic output
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Oct 27, 2015
1 parent 95f9937 commit f38c536
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
26 changes: 17 additions & 9 deletions services/hh/node_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,23 @@ func (n *NodeProcessor) run() {
}

// SendWrite attempts to sent the current block of hinted data to the target node. If successful,
// it returns the number of bytes it sent and advances to the next block.
// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF
// when there is no more data or the node is inactive.
func (n *NodeProcessor) SendWrite() (int, error) {
n.mu.RLock()
defer n.mu.RUnlock()

if nio, err := n.meta.Node(n.nodeID); err != nil {
n.Logger.Printf("failed to determine if node %d is active: %s", n.nodeID, err.Error())
active, err := n.Active()
if err != nil {
return 0, err
} else if nio == nil {
// Node is inactive, nothing to do.
return 0, nil
}
if !active {
return 0, io.EOF
}

// Get the current block from the queue
buf, err := n.queue.Current()
if err != nil {
if err == io.EOF {
return 0, nil
}
return 0, err
}

Expand Down Expand Up @@ -265,6 +263,16 @@ func (n *NodeProcessor) Tail() string {
return qp.tail
}

// Active returns whether this node processor is for a currently active node.
func (n *NodeProcessor) Active() (bool, error) {
nio, err := n.meta.Node(n.nodeID)
if err != nil {
n.Logger.Printf("failed to determine if node %d is active: %s", n.nodeID, err.Error())
return false, err
}
return nio != nil, nil
}

func marshalWrite(shardID uint64, points []models.Point) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, shardID)
Expand Down
23 changes: 21 additions & 2 deletions services/hh/node_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hh

import (
"io"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -75,6 +76,15 @@ func TestNodeProcessorSendBlock(t *testing.T) {
t.Fatalf("Failed to open node processor: %v", err)
}

// Check the active state.
active, err := n.Active()
if err != nil {
t.Fatalf("Failed to check node processor state: %v", err)
}
if !active {
t.Fatalf("Node processor state is unexpected value of: %v", active)
}

// This should queue a write for the active node.
if err := n.WriteShard(expShardID, []models.Point{pt}); err != nil {
t.Fatalf("SendWrite() failed to write points: %v", err)
Expand All @@ -90,7 +100,7 @@ func TestNodeProcessorSendBlock(t *testing.T) {
}

// All data should have been handled so no writes should be sent again
if _, err := n.SendWrite(); err != nil {
if _, err := n.SendWrite(); err != nil && err != io.EOF {
t.Fatalf("SendWrite() failed to write points: %v", err)
}

Expand All @@ -108,13 +118,22 @@ func TestNodeProcessorSendBlock(t *testing.T) {
return nil, nil
}

// Check the active state.
active, err = n.Active()
if err != nil {
t.Fatalf("Failed to check node processor state: %v", err)
}
if active {
t.Fatalf("Node processor state is unexpected value of: %v", active)
}

// This should queue a write for the node.
if err := n.WriteShard(expShardID, []models.Point{pt}); err != nil {
t.Fatalf("SendWrite() failed to write points: %v", err)
}

// This should not send the write to the shard writer since the node is inactive.
if _, err := n.SendWrite(); err != nil {
if _, err := n.SendWrite(); err != nil && err != io.EOF {
t.Fatalf("SendWrite() failed to write points: %v", err)
}

Expand Down
19 changes: 15 additions & 4 deletions services/hh/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (s *Service) Open() error {
}

func (s *Service) Close() error {
s.Logger.Println("shutting down hh service")
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -187,16 +188,26 @@ func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
defer s.mu.RUnlock()

d := &monitor.Diagnostic{
Columns: []string{"node", "last modified", "head", "tail"},
Columns: []string{"node", "active", "last modified", "head", "tail"},
Rows: make([][]interface{}, 0, len(s.processors)),
}

for k, v := range s.processors {
lm, err := v.LastModified()
if err != nil {
return nil, err
}

d.Rows = append(d.Rows, []interface{}{k, lm, v.Head(), v.Tail()})
active := "no"
b, err := v.Active()
if err != nil {
return nil, err
}
if b {
active = "yes"
}

d.Rows = append(d.Rows, []interface{}{k, active, lm, v.Head(), v.Tail()})
}
return d, nil
}
Expand All @@ -223,12 +234,12 @@ func (s *Service) purgeInactiveProcessors() {
continue
}

ni, err := s.metastore.Node(k)
active, err := v.Active()
if err != nil {
s.Logger.Printf("failed to determine if node %d is active: %s", k, err.Error())
continue
}
if ni != nil {
if active {
// Node is active.
continue
}
Expand Down

0 comments on commit f38c536

Please sign in to comment.