Skip to content

Commit

Permalink
Track shard-level stats
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Mar 18, 2015
1 parent 6f646b2 commit 43a5747
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
60 changes: 46 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,22 +328,40 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
return fmt.Errorf("statistics check interval must be non-zero")
}

// Function for local use turns stats into a point. Automatically tags all points with the
// server's Raft ID.
pointFromStats := func(st *Stats) Point {
point := Point{
Timestamp: time.Now(),
Name: st.Name(),
Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)},
Fields: make(map[string]interface{}),
}
st.Walk(func(k string, v int64) {
point.Fields[k] = int(v)
})
return point
}

go func() {
tick := time.NewTicker(interval)
for {
<-tick.C

// Create the data point and write it.
point := Point{
Timestamp: time.Now(),
Name: s.stats.Name(),
Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)},
Fields: make(map[string]interface{}),
batch := make([]Point, 0)

// Server stats.
batch = append(batch, pointFromStats(s.stats))

// Shard-level stats.
for _, sh := range s.shards {
point := pointFromStats(sh.stats)
point.Tags["shardID"] = strconv.FormatUint(s.id, 10)
batch = append(batch, point)
}
s.stats.Walk(func(k string, v int64) {
point.Fields[k] = int(v)
})
s.WriteSeries(database, retention, []Point{point})

s.WriteSeries(database, retention, batch)
}
}()

Expand Down Expand Up @@ -2606,14 +2624,28 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin
}

func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result {
row := &influxql.Row{Columns: []string{}}
row.Name = s.stats.Name()
rows := make([]*influxql.Row, 0)
// Server stats.
serverRow := &influxql.Row{Columns: []string{}}
serverRow.Name = s.stats.Name()
s.stats.Walk(func(k string, v int64) {
row.Columns = append(row.Columns, k)
row.Values = append(row.Values, []interface{}{v})
serverRow.Columns = append(serverRow.Columns, k)
serverRow.Values = append(serverRow.Values, []interface{}{v})
})
rows = append(rows, serverRow)

return &Result{Series: []*influxql.Row{row}}
// Shard-level stats.
for _, sh := range s.shards {
row := &influxql.Row{Columns: []string{}}
row.Name = sh.stats.Name()
sh.stats.Walk(func(k string, v int64) {
row.Columns = append(row.Columns, k)
row.Values = append(row.Values, []interface{}{v})
})
rows = append(rows, row)
}

return &Result{Series: rows}
}

// filterMeasurementsByExpr filters a list of measurements by a tags expression.
Expand Down
9 changes: 9 additions & 0 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Shard struct {
store *bolt.DB // underlying data store
conn MessagingConn // streaming connection to broker

stats *Stats // In-memory stats

wg sync.WaitGroup // pending goroutines
closing chan struct{} // close notification
}
Expand All @@ -80,6 +82,10 @@ func (s *Shard) open(path string, conn MessagingConn) error {
return errors.New("shard already open")
}

if s.stats == nil {
s.stats = NewStats("shard")
}

// Open store on shard.
store, err := bolt.Open(path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
Expand Down Expand Up @@ -184,6 +190,8 @@ func (s *Shard) writeSeries(index uint64, batch []byte) error {
if err := b.Put(u64tob(uint64(timestamp)), data); err != nil {
return err
}
s.stats.Add("shardBytes", int64(len(data))+8) // Payload plus timestamp
s.stats.Inc("shardWrite")

// Push the buffer forward and check if we're done.
batch = batch[payloadLength:]
Expand Down Expand Up @@ -242,6 +250,7 @@ func (s *Shard) processor(conn MessagingConn, closing <-chan struct{}) {
// Handle write series separately so we don't lock server during shard writes.
switch m.Type {
case writeRawSeriesMessageType:
s.stats.Inc("writeSeriesMessageRx")
if err := s.writeSeries(m.Index, m.Data); err != nil {
panic(fmt.Errorf("apply shard: id=%d, idx=%d, err=%s", s.ID, m.Index, err))
}
Expand Down

0 comments on commit 43a5747

Please sign in to comment.