diff --git a/CHANGELOG.md b/CHANGELOG.md index 373445c7d2f..de75b30208c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features - [#2076](https://github.com/influxdb/influxdb/pull/2076): Seperate stdout and stderr output in init.d script - [#2091](https://github.com/influxdb/influxdb/pull/2091): Support disabling snapshot endpoint. +- [#2081](https://github.com/influxdb/influxdb/pull/2081): Support writing diagnostic data into the internal database. ### Bugfixes - [#2084](https://github.com/influxdb/influxdb/pull/2084): Allowing leading underscores in identifiers. diff --git a/diagnostics.go b/diagnostics.go new file mode 100644 index 00000000000..4b4e29916cd --- /dev/null +++ b/diagnostics.go @@ -0,0 +1,143 @@ +package influxdb + +import ( + "os" + "runtime" + "time" + + "github.com/influxdb/influxdb/influxql" +) + +// GoDiagnostics captures basic information about the runtime. +type GoDiagnostics struct { + GoMaxProcs int + NumGoroutine int + Version string +} + +// NewGoDiagnostics returns a GoDiagnostics object. +func NewGoDiagnostics() *GoDiagnostics { + return &GoDiagnostics{ + GoMaxProcs: runtime.GOMAXPROCS(0), + NumGoroutine: runtime.NumGoroutine(), + Version: runtime.Version(), + } +} + +// AsRow returns the GoDiagnostic object as an InfluxQL row. +func (g *GoDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row { + return &influxql.Row{ + Name: measurement, + Columns: []string{"time", "goMaxProcs", "numGoRoutine", "version"}, + Tags: tags, + Values: [][]interface{}{[]interface{}{time.Now().UTC(), + g.GoMaxProcs, g.NumGoroutine, g.Version}}, + } +} + +// SystemDiagnostics captures basic machine data. +type SystemDiagnostics struct { + Hostname string + PID int + OS string + Arch string + NumCPU int +} + +// NewSystemDiagnostics returns a SystemDiagnostics object. +func NewSystemDiagnostics() *SystemDiagnostics { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + + return &SystemDiagnostics{ + Hostname: hostname, + PID: os.Getpid(), + OS: runtime.GOOS, + Arch: runtime.GOARCH, + NumCPU: runtime.NumCPU(), + } +} + +// AsRow returns the GoDiagnostic object as an InfluxQL row. +func (s *SystemDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row { + return &influxql.Row{ + Name: measurement, + Columns: []string{"time", "hostname", "pid", "os", "arch", "numCPU"}, + Tags: tags, + Values: [][]interface{}{[]interface{}{time.Now().UTC(), + s.Hostname, s.PID, s.OS, s.Arch, s.NumCPU}}, + } +} + +// MemoryDiagnostics captures Go memory stats. +type MemoryDiagnostics struct { + Alloc int + TotalAlloc int + Sys int + Lookups int + Mallocs int + Frees int + HeapAlloc int + HeapSys int + HeapIdle int + HeapInUse int + HeapReleased int + HeapObjects int + PauseTotalNs int + NumGC int +} + +// NewMemoryDiagnostics returns a MemoryDiagnostics object. +func NewMemoryDiagnostics() *MemoryDiagnostics { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + return &MemoryDiagnostics{ + Alloc: int(m.Alloc), + TotalAlloc: int(m.TotalAlloc), + Sys: int(m.Sys), + Lookups: int(m.Lookups), + Mallocs: int(m.Mallocs), + Frees: int(m.Frees), + HeapAlloc: int(m.HeapAlloc), + HeapSys: int(m.HeapSys), + HeapIdle: int(m.HeapIdle), + HeapInUse: int(m.HeapInuse), + HeapReleased: int(m.HeapReleased), + HeapObjects: int(m.HeapObjects), + PauseTotalNs: int(m.PauseTotalNs), + NumGC: int(m.NumGC), + } +} + +// AsRow returns the MemoryDiagnostics object as an InfluxQL row. +func (m *MemoryDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row { + return &influxql.Row{ + Name: measurement, + Columns: []string{"time", "alloc", "totalAlloc", "sys", "lookups", "mallocs", "frees", "heapAlloc", + "heapSys", "heapIdle", "heapInUse", "heapReleased", "heapObjects", "pauseTotalNs", "numGG"}, + Tags: tags, + Values: [][]interface{}{[]interface{}{time.Now().UTC(), + m.Alloc, m.TotalAlloc, m.Sys, m.Lookups, m.Mallocs, m.Frees, m.HeapAlloc, + m.HeapSys, m.HeapIdle, m.HeapInUse, m.HeapReleased, m.HeapObjects, m.PauseTotalNs, m.NumGC}}, + } +} + +// BuildDiagnostics capture basic build version information. +type BuildDiagnostics struct { + Version string + CommitHash string +} + +// AsRow returns the BuildDiagnostics object as an InfluxQL row. +func (b *BuildDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row { + return &influxql.Row{ + Name: measurement, + Columns: []string{"time", "version", "commitHash"}, + Tags: tags, + Values: [][]interface{}{[]interface{}{time.Now().UTC(), + b.Version, b.CommitHash}}, + } +} diff --git a/influxdb.go b/influxdb.go index c73d81721cd..f84d6db635d 100644 --- a/influxdb.go +++ b/influxdb.go @@ -10,6 +10,12 @@ import ( "github.com/influxdb/influxdb/client" ) +var startTime time.Time + +func init() { + startTime = time.Now().UTC() +} + var ( // ErrServerOpen is returned when opening an already open server. ErrServerOpen = errors.New("server already open") diff --git a/server.go b/server.go index dfa40900a27..98c431d5647 100644 --- a/server.go +++ b/server.go @@ -51,12 +51,6 @@ const ( retentionPolicyMinDuration = time.Hour ) -var startTime time.Time - -func init() { - startTime = time.Now().UTC() -} - // Server represents a collection of metadata and raw metric data. type Server struct { mu sync.RWMutex @@ -345,23 +339,26 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D return fmt.Errorf("statistics check interval must be non-zero") } - // Function for local use turns stats into a point. - pointFromStats := func(st *Stats, tags map[string]string) Point { - point := Point{ - Timestamp: time.Now(), - Name: st.Name(), - Tags: make(map[string]string), - Fields: make(map[string]interface{}), - } - // Specifically create a new map. - for k, v := range tags { - point.Tags[k] = v - } + // Function for local use turns stats into a slice of points + pointsFromStats := func(st *Stats, tags map[string]string) []Point { + points := make([]Point, 0) + now := time.Now() st.Walk(func(k string, v int64) { - point.Fields[k] = int(v) + point := Point{ + Timestamp: now, + Name: st.name + "_" + k, + Tags: make(map[string]string), + Fields: map[string]interface{}{"value": int(v)}, + } + // Specifically create a new map. + for k, v := range tags { + point.Tags[k] = v + } + points = append(points, point) }) - return point + + return points } go func() { @@ -370,20 +367,29 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D <-tick.C // Create the batch and tags - batch := make([]Point, 0) tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)} if h, err := os.Hostname(); err == nil { tags["host"] = h } - - // Server stats. - batch = append(batch, pointFromStats(s.stats, tags)) + batch := pointsFromStats(s.stats, tags) // Shard-level stats. + tags["shardID"] = strconv.FormatUint(s.id, 10) for _, sh := range s.shards { - point := pointFromStats(sh.stats, tags) - point.Tags["shardID"] = strconv.FormatUint(s.id, 10) - batch = append(batch, point) + batch = append(batch, pointsFromStats(sh.stats, tags)...) + } + + // Server diagnostics. + for _, row := range s.DiagnosticsAsRows() { + points, err := s.convertRowToPoints(row.Name, row) + if err != nil { + s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error()) + continue + } + for _, p := range points { + p.Tags = map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)} + } + batch = append(batch, points...) } s.WriteSeries(database, retention, batch) @@ -2714,137 +2720,7 @@ func (s *Server) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsS s.mu.RLock() defer s.mu.RUnlock() - rows := make([]*influxql.Row, 0) - now := time.Now().UTC().Format(time.RFC3339Nano) - - var m runtime.MemStats - runtime.ReadMemStats(&m) - - hostname, err := os.Hostname() - if err != nil { - hostname = "unknown" - } - - diags := []struct { - name string - fields map[string]interface{} - }{ - { - name: "build", - fields: map[string]interface{}{ - "version": s.Version, - "commitHash": s.CommitHash, - }, - }, - { - name: "server", - fields: map[string]interface{}{ - "id": s.id, - "path": s.path, - "authEnabled": s.authenticationEnabled, - "index": s.index, - "retentionAutoCreate": s.RetentionAutoCreate, - "numShards": len(s.shards), - }, - }, - { - name: "cq", - fields: map[string]interface{}{ - "lastRun": s.lastContinuousQueryRun, - }, - }, - { - name: "system", - fields: map[string]interface{}{ - "startTime": startTime, - "uptime": time.Since(startTime).String(), - "hostname": hostname, - "pid": os.Getpid(), - "os": runtime.GOOS, - "arch": runtime.GOARCH, - "numcpu": runtime.NumCPU(), - }, - }, - { - name: "memory", - fields: map[string]interface{}{ - "alloc": m.Alloc, - "totalAlloc": m.TotalAlloc, - "sys": m.Sys, - "lookups": m.Lookups, - "mallocs": m.Mallocs, - "frees": m.Frees, - "heapAlloc": m.HeapAlloc, - "heapSys": m.HeapSys, - "heapIdle": m.HeapIdle, - "heapInUse": m.HeapInuse, - "heapReleased": m.HeapReleased, - "heapObjects": m.HeapObjects, - "pauseTotalNs": m.PauseTotalNs, - "numGC": m.NumGC, - }, - }, - { - name: "go", - fields: map[string]interface{}{ - "goMaxProcs": runtime.GOMAXPROCS(0), - "numGoroutine": runtime.NumGoroutine(), - "version": runtime.Version(), - }, - }, - } - - for _, d := range diags { - row := &influxql.Row{Columns: []string{"time"}} - row.Name = d.name - - // Get sorted list of keys. - sortedKeys := make([]string, 0, len(d.fields)) - for k, _ := range d.fields { - sortedKeys = append(sortedKeys, k) - } - sort.Strings(sortedKeys) - - values := []interface{}{now} - for _, k := range sortedKeys { - row.Columns = append(row.Columns, k) - values = append(values, d.fields[k]) - } - row.Values = append(row.Values, values) - rows = append(rows, row) - } - - // Shard groups. - shardGroupsRow := &influxql.Row{Columns: []string{}} - shardGroupsRow.Name = "shardGroups" - shardGroupsRow.Columns = append(shardGroupsRow.Columns, "time", "database", "retentionPolicy", "id", - "startTime", "endTime", "duration", "numShards") - // Check all shard groups. - for _, db := range s.databases { - for _, rp := range db.policies { - for _, g := range rp.shardGroups { - shardGroupsRow.Values = append(shardGroupsRow.Values, []interface{}{now, db.name, rp.Name, - g.ID, g.StartTime, g.EndTime, g.Duration().String(), len(g.Shards)}) - } - } - } - rows = append(rows, shardGroupsRow) - - // Shards - shardsRow := &influxql.Row{Columns: []string{}} - shardsRow.Name = "shards" - shardsRow.Columns = append(shardsRow.Columns, "time", "id", "dataNodes", "index", "path") - for _, sh := range s.shards { - var nodes []string - for _, n := range sh.DataNodeIDs { - nodes = append(nodes, strconv.FormatUint(n, 10)) - shardsRow.Values = append(shardsRow.Values, []interface{}{now, sh.ID, strings.Join(nodes, ","), - sh.index, sh.store.Path()}) - } - } - rows = append(rows, shardsRow) - - return &Result{Series: rows} + return &Result{Series: s.DiagnosticsAsRows()} } // filterMeasurementsByExpr filters a list of measurements by a tags expression. @@ -3225,6 +3101,73 @@ func (s *Server) normalizeMeasurement(m *influxql.Measurement, defaultDatabase s return nm, nil } +// DiagnosticsAsRows returns diagnostic information about the server, as a slice of +// InfluxQL rows. +func (s *Server) DiagnosticsAsRows() []*influxql.Row { + s.mu.RLock() + defer s.mu.RUnlock() + now := time.Now().UTC() + + // Common rows. + gd := NewGoDiagnostics() + sd := NewSystemDiagnostics() + md := NewMemoryDiagnostics() + bd := BuildDiagnostics{Version: s.Version, CommitHash: s.CommitHash} + + // Common tagset. + tags := map[string]string{"serverID": strconv.FormatUint(s.id, 10)} + + // Server row. + serverRow := &influxql.Row{ + Name: "server_diag", + Columns: []string{"time", "startTime", "uptime", "id", + "path", "authEnabled", "index", "retentionAutoCreate", "numShards", "cqLastRun"}, + Tags: tags, + Values: [][]interface{}{[]interface{}{now, startTime.String(), time.Since(startTime).String(), strconv.FormatUint(s.id, 10), + s.path, s.authenticationEnabled, int(s.index), s.RetentionAutoCreate, len(s.shards), s.lastContinuousQueryRun.String()}}, + } + + // Shard groups. + shardGroupsRow := &influxql.Row{Columns: []string{}} + shardGroupsRow.Name = "shardGroups_diag" + shardGroupsRow.Columns = append(shardGroupsRow.Columns, "time", "database", "retentionPolicy", "id", + "startTime", "endTime", "duration", "numShards") + shardGroupsRow.Tags = tags + // Check all shard groups. + for _, db := range s.databases { + for _, rp := range db.policies { + for _, g := range rp.shardGroups { + shardGroupsRow.Values = append(shardGroupsRow.Values, []interface{}{now, db.name, rp.Name, + strconv.FormatUint(g.ID, 10), g.StartTime.String(), g.EndTime.String(), g.Duration().String(), len(g.Shards)}) + } + } + } + + // Shards + shardsRow := &influxql.Row{Columns: []string{}} + shardsRow.Name = "shards_diag" + shardsRow.Columns = append(shardsRow.Columns, "time", "id", "dataNodes", "index", "path") + shardsRow.Tags = tags + for _, sh := range s.shards { + var nodes []string + for _, n := range sh.DataNodeIDs { + nodes = append(nodes, strconv.FormatUint(n, 10)) + shardsRow.Values = append(shardsRow.Values, []interface{}{now, strconv.FormatUint(sh.ID, 10), strings.Join(nodes, ","), + strconv.FormatUint(sh.index, 10), sh.store.Path()}) + } + } + + return []*influxql.Row{ + gd.AsRow("server_go", tags), + sd.AsRow("server_system", tags), + md.AsRow("server_memory", tags), + bd.AsRow("server_build", tags), + serverRow, + shardGroupsRow, + shardsRow, + } +} + // processor runs in a separate goroutine and processes all incoming broker messages. func (s *Server) processor(conn MessagingConn, done chan struct{}) { for {