Skip to content

Commit

Permalink
Add collectd stats
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Sep 9, 2015
1 parent abaee68 commit 9677a0f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
With this release InfluxDB is moving to Go 1.5.

### Features
- [#4050](https://github.com/influxdb/influxdb/pull/4050): Add stats to collectd
- [#3771](https://github.com/influxdb/influxdb/pull/3771): Close idle Graphite TCP connections
- [#3755](https://github.com/influxdb/influxdb/issues/3755): Add option to build script. Thanks @fg2it
- [#3863](https://github.com/influxdb/influxdb/pull/3863): Move to Go 1.5
Expand Down
39 changes: 34 additions & 5 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package collectd

import (
"expvar"
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
Expand All @@ -16,6 +19,17 @@ import (

const leaderWaitTimeout = 30 * time.Second

// statistics gathered by the collectd service.
const (
statPointsReceived = "points_rx"
statBytesReceived = "bytes_rx"
statPointsParseFail = "points_parse_fail"
statReadFail = "read_fail"
statBatchesTrasmitted = "batches_tx"
statPointsTransmitted = "points_tx"
statBatchesTransmitFail = "batches_tx_fail"
)

// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand All @@ -42,6 +56,9 @@ type Service struct {
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr

// expvar-based stats.
statMap *expvar.Map
}

// NewService returns a new instance of the collectd service.
Expand All @@ -59,6 +76,12 @@ func NewService(c Config) *Service {
func (s *Service) Open() error {
s.Logger.Printf("Starting collectd service")

// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"collectd", s.Config.BindAddress}, ":")
tags := map[string]string{"bind": s.Config.BindAddress}
s.statMap = influxdb.NewStatistics(key, "collectd", tags)

if s.Config.BindAddress == "" {
return fmt.Errorf("bind address is blank")
} else if s.Config.Database == "" {
Expand Down Expand Up @@ -182,10 +205,12 @@ func (s *Service) serve() {

n, _, err := s.ln.ReadFromUDP(buffer)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
continue
}
if n > 0 {
s.statMap.Add(statBytesReceived, int64(n))
s.handleMessage(buffer[:n])
}
}
Expand All @@ -194,6 +219,7 @@ func (s *Service) serve() {
func (s *Service) handleMessage(buffer []byte) {
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
s.statMap.Add(statPointsParseFail, 1)
s.Logger.Printf("Collectd parse error: %s", err)
return
}
Expand All @@ -202,6 +228,7 @@ func (s *Service) handleMessage(buffer []byte) {
for _, p := range points {
s.batcher.In() <- p
}
s.statMap.Add(statPointsReceived, int64(len(points)))
}
}

Expand All @@ -213,15 +240,17 @@ func (s *Service) writePoints() {
case <-s.stop:
return
case batch := <-s.batcher.Out():
req := &cluster.WritePointsRequest{
if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.Config.Database,
RetentionPolicy: s.Config.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelAny,
Points: batch,
}
if err := s.PointsWriter.WritePoints(req); err != nil {
s.Logger.Printf("failed to write batch: %s", err)
continue
}); err == nil {
s.statMap.Add(statBatchesTrasmitted, 1)
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.Config.Database, err)
s.statMap.Add(statBatchesTransmitFail, 1)
}
}
}
Expand Down

0 comments on commit 9677a0f

Please sign in to comment.