Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track shard-level stats #2007

Merged
merged 2 commits into from
Mar 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v0.9.0-rc15 [unreleased]
### Features
- [#2000](https://github.com/influxdb/influxdb/pull/2000): Log broker path when broker fails to start. Thanks @gst.
- [#2007](https://github.com/influxdb/influxdb/pull/2007): Track shard-level stats.

### Bugfixes
- [#2001](https://github.com/influxdb/influxdb/pull/2001): Ensure measurement not found returns status code 200.
Expand Down
60 changes: 46 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,22 +328,40 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
return fmt.Errorf("statistics check interval must be non-zero")
}

// Function for local use turns stats into a point. Automatically tags all points with the
// server's Raft ID.
pointFromStats := func(st *Stats) Point {
point := Point{
Timestamp: time.Now(),
Name: st.Name(),
Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)},
Fields: make(map[string]interface{}),
}
st.Walk(func(k string, v int64) {
point.Fields[k] = int(v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, why int(v)? isn't v already an int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess int64 != int

})
return point
}

go func() {
tick := time.NewTicker(interval)
for {
<-tick.C

// Create the data point and write it.
point := Point{
Timestamp: time.Now(),
Name: s.stats.Name(),
Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)},
Fields: make(map[string]interface{}),
batch := make([]Point, 0)

// Server stats.
batch = append(batch, pointFromStats(s.stats))

// Shard-level stats.
for _, sh := range s.shards {
point := pointFromStats(sh.stats)
point.Tags["shardID"] = strconv.FormatUint(s.id, 10)
batch = append(batch, point)
}
s.stats.Walk(func(k string, v int64) {
point.Fields[k] = int(v)
})
s.WriteSeries(database, retention, []Point{point})

s.WriteSeries(database, retention, batch)
}
}()

Expand Down Expand Up @@ -2606,14 +2624,28 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin
}

func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result {
row := &influxql.Row{Columns: []string{}}
row.Name = s.stats.Name()
rows := make([]*influxql.Row, 0)
// Server stats.
serverRow := &influxql.Row{Columns: []string{}}
serverRow.Name = s.stats.Name()
s.stats.Walk(func(k string, v int64) {
row.Columns = append(row.Columns, k)
row.Values = append(row.Values, []interface{}{v})
serverRow.Columns = append(serverRow.Columns, k)
serverRow.Values = append(serverRow.Values, []interface{}{v})
})
rows = append(rows, serverRow)

return &Result{Series: []*influxql.Row{row}}
// Shard-level stats.
for _, sh := range s.shards {
row := &influxql.Row{Columns: []string{}}
row.Name = sh.stats.Name()
sh.stats.Walk(func(k string, v int64) {
row.Columns = append(row.Columns, k)
row.Values = append(row.Values, []interface{}{v})
})
rows = append(rows, row)
}

return &Result{Series: rows}
}

// filterMeasurementsByExpr filters a list of measurements by a tags expression.
Expand Down
9 changes: 9 additions & 0 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Shard struct {
store *bolt.DB // underlying data store
conn MessagingConn // streaming connection to broker

stats *Stats // In-memory stats

wg sync.WaitGroup // pending goroutines
closing chan struct{} // close notification
}
Expand All @@ -80,6 +82,10 @@ func (s *Shard) open(path string, conn MessagingConn) error {
return errors.New("shard already open")
}

if s.stats == nil {
s.stats = NewStats("shard")
}

// Open store on shard.
store, err := bolt.Open(path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
Expand Down Expand Up @@ -184,6 +190,8 @@ func (s *Shard) writeSeries(index uint64, batch []byte) error {
if err := b.Put(u64tob(uint64(timestamp)), data); err != nil {
return err
}
s.stats.Add("shardBytes", int64(len(data))+8) // Payload plus timestamp
s.stats.Inc("shardWrite")

// Push the buffer forward and check if we're done.
batch = batch[payloadLength:]
Expand Down Expand Up @@ -242,6 +250,7 @@ func (s *Shard) processor(conn MessagingConn, closing <-chan struct{}) {
// Handle write series separately so we don't lock server during shard writes.
switch m.Type {
case writeRawSeriesMessageType:
s.stats.Inc("writeSeriesMessageRx")
if err := s.writeSeries(m.Index, m.Data); err != nil {
panic(fmt.Errorf("apply shard: id=%d, idx=%d, err=%s", s.ID, m.Index, err))
}
Expand Down