Skip to content

Commit

Permalink
Merge pull request #4506 from influxdb/enterprise_stats
Browse files Browse the repository at this point in the history
Enterprise registration as a service
  • Loading branch information
otoolep committed Oct 20, 2015
2 parents ddca29a + 3cfe4e8 commit 956efae
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 108 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
- [#4409](https://github.com/influxdb/influxdb/pull/4409): wire up INTO queries.
- [#4379](https://github.com/influxdb/influxdb/pull/4379): Auto-create database for UDP input.
- [#4375](https://github.com/influxdb/influxdb/pull/4375): Add Subscriptions so data can be 'forked' out of InfluxDB to another third party.
- [#4459](https://github.com/influxdb/influxdb/pull/4459): Register with Enterprise service if token available.
- [#4506](https://github.com/influxdb/influxdb/pull/4506): Register with Enterprise service and upload stats, if token is available.
- [#4501](https://github.com/influxdb/influxdb/pull/4501): Allow filtering SHOW MEASUREMENTS by regex.

### Bugfixes
Expand Down
20 changes: 8 additions & 12 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/registration"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/subscriber"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)

const DefaultEnterpriseURL = "https://enterprise.influxdata.com"

// Config represents the configuration format for the influxd binary.
type Config struct {
Meta *meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`
Meta *meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Registration registration.Config `toml:"registration"`
Precreator precreator.Config `toml:"shard-precreation"`

Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
Expand All @@ -54,19 +54,15 @@ type Config struct {

// Server reporting
ReportingDisabled bool `toml:"reporting-disabled"`

// Server registration
EnterpriseURL string `toml:"enterprise-url"`
EnterpriseToken string `toml:"enterprise-token"`
}

// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
c := &Config{}
c.EnterpriseURL = DefaultEnterpriseURL
c.Meta = meta.NewConfig()
c.Data = tsdb.NewConfig()
c.Cluster = cluster.NewConfig()
c.Registration = registration.NewConfig()
c.Precreator = precreator.NewConfig()

c.Admin = admin.NewConfig()
Expand Down
16 changes: 1 addition & 15 deletions cmd/influxd/run/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c run.Config
if _, err := toml.Decode(`
enterprise-token = "deadbeef"
[meta]
dir = "/tmp/meta"
Expand Down Expand Up @@ -57,9 +55,7 @@ enabled = true
}

// Validate configuration.
if c.EnterpriseToken != "deadbeef" {
t.Fatalf("unexpected Enterprise token: %s", c.EnterpriseToken)
} else if c.Meta.Dir != "/tmp/meta" {
if c.Meta.Dir != "/tmp/meta" {
t.Fatalf("unexpected meta dir: %s", c.Meta.Dir)
} else if c.Data.Dir != "/tmp/data" {
t.Fatalf("unexpected data dir: %s", c.Data.Dir)
Expand Down Expand Up @@ -91,8 +87,6 @@ func TestConfig_Parse_EnvOverride(t *testing.T) {
// Parse configuration.
var c run.Config
if _, err := toml.Decode(`
enterprise-token = "deadbeef"
[meta]
dir = "/tmp/meta"
Expand Down Expand Up @@ -131,10 +125,6 @@ enabled = true
t.Fatal(err)
}

if err := os.Setenv("INFLUXDB_ENTERPRISE_TOKEN", "wheresthebeef"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}

if err := os.Setenv("INFLUXDB_UDP_BIND_ADDRESS", ":1234"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}
Expand All @@ -147,10 +137,6 @@ enabled = true
t.Fatalf("failed to apply env overrides: %v", err)
}

if c.EnterpriseToken != "wheresthebeef" {
t.Fatalf("unexpected Enterprise token: %s", c.EnterpriseToken)
}

if c.UDPs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress)
}
Expand Down
81 changes: 17 additions & 64 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package run

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -26,6 +24,7 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/registration"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/snapshotter"
"github.com/influxdb/influxdb/services/subscriber"
Expand Down Expand Up @@ -76,8 +75,6 @@ type Server struct {

// Server reporting and registration
reportingDisabled bool
enterpriseURL string
enterpriseToken string

// Profiling
CPUProfile string
Expand All @@ -104,8 +101,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
Monitor: monitor.New(c.Monitor),

reportingDisabled: c.ReportingDisabled,
enterpriseURL: c.EnterpriseURL,
enterpriseToken: c.EnterpriseToken,
}

// Copy TSDB configuration.
Expand Down Expand Up @@ -162,6 +157,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Append services.
s.appendClusterService(c.Cluster)
s.appendPrecreatorService(c.Precreator)
s.appendRegistrationService(c.Registration)
s.appendSnapshotterService()
s.appendCopierService()
s.appendAdminService(c.Admin)
Expand Down Expand Up @@ -299,6 +295,21 @@ func (s *Server) appendPrecreatorService(c precreator.Config) error {
return nil
}

func (s *Server) appendRegistrationService(c registration.Config) error {
if !c.Enabled {
return nil
}
srv, err := registration.NewService(c, s.buildInfo.Version)
if err != nil {
return err
}

srv.MetaStore = s.MetaStore
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
return nil
}

func (s *Server) appendUDPService(c udp.Config) {
if !c.Enabled {
return
Expand Down Expand Up @@ -403,11 +414,6 @@ func (s *Server) Open() error {
go s.startServerReporting()
}

// Register server
if err := s.registerServer(); err != nil {
log.Printf("failed to register server: %s", err.Error())
}

return nil

}(); err != nil {
Expand Down Expand Up @@ -519,59 +525,6 @@ func (s *Server) reportServer() {
go client.Post("http://m.influxdb.com:8086/db/reporting/series?u=reporter&p=influxdb", "application/json", data)
}

// registerServer registers the server on start-up.
func (s *Server) registerServer() error {
if s.enterpriseToken == "" {
return nil
}

clusterID, err := s.MetaStore.ClusterID()
if err != nil {
log.Printf("failed to retrieve cluster ID for registration: %s", err.Error())
return err
}

hostname, err := os.Hostname()
if err != nil {
return err
}

j := map[string]interface{}{
"cluster_id": fmt.Sprintf("%d", clusterID),
"server_id": fmt.Sprintf("%d", s.MetaStore.NodeID()),
"host": hostname,
"product": "influxdb",
"version": s.buildInfo.Version,
}
b, err := json.Marshal(j)
if err != nil {
return err
}

url := fmt.Sprintf("%s/api/v1/servers?token=%s", s.enterpriseURL, s.enterpriseToken)
go func() {
client := http.Client{Timeout: time.Duration(5 * time.Second)}
resp, err := client.Post(url, "application/json", bytes.NewBuffer(b))
if err != nil {
log.Printf("failed to register server with %s: %s", s.enterpriseURL, err.Error())
return
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusCreated {
return
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("failed to read response from registration server: %s", err.Error())
return
}
log.Printf("failed to register server with %s: received code %s, body: %s", s.enterpriseURL, resp.Status, string(body))
}()
return nil
}

// monitorErrorChan reads an error channel and resends it through the server.
func (s *Server) monitorErrorChan(ch <-chan error) {
for {
Expand Down
11 changes: 8 additions & 3 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
# Change this option to true to disable reporting.
reporting-disabled = false

# Enterprise registration control
# enterprise-url = "https://enterprise.influxdata.com" # The Enterprise server URL
# enterprise-token = "" # Registration token for Enterprise server
###
### Enterprise registration control
###

[registration]
# enabled = true
# url = "https://enterprise.influxdata.com" # The Enterprise server URL
# token = "" # Registration token for Enterprise server

###
### [meta]
Expand Down
24 changes: 12 additions & 12 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,16 @@ func (m *Monitor) DeregisterDiagnosticsClient(name string) {

// Statistics returns the combined statistics for all expvar data. The given
// tags are added to each of the returned statistics.
func (m *Monitor) Statistics(tags map[string]string) ([]*statistic, error) {
statistics := make([]*statistic, 0)
func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
statistics := make([]*Statistic, 0)

expvar.Do(func(kv expvar.KeyValue) {
// Skip built-in expvar stats.
if kv.Key == "memstats" || kv.Key == "cmdline" {
return
}

statistic := &statistic{
statistic := &Statistic{
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
Expand Down Expand Up @@ -246,7 +246,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*statistic, error) {
})

// Add Go memstats.
statistic := &statistic{
statistic := &Statistic{
Name: "runtime",
Tags: make(map[string]string),
Values: make(map[string]interface{}),
Expand Down Expand Up @@ -388,24 +388,24 @@ func (m *Monitor) storeStatistics() {
}
}

// statistic represents the information returned by a single monitor client.
type statistic struct {
Name string
Tags map[string]string
Values map[string]interface{}
// Statistic represents the information returned by a single monitor client.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}

// newStatistic returns a new statistic object.
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic {
return &statistic{
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *Statistic {
return &Statistic{
Name: name,
Tags: tags,
Values: values,
}
}

// valueNames returns a sorted list of the value names, if any.
func (s *statistic) valueNames() []string {
func (s *Statistic) valueNames() []string {
a := make([]string, 0, len(s.Values))
for k, _ := range s.Values {
a = append(a, k)
Expand Down
2 changes: 1 addition & 1 deletion monitor/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// StatementExecutor translates InfluxQL queries to Monitor methods.
type StatementExecutor struct {
Monitor interface {
Statistics(map[string]string) ([]*statistic, error)
Statistics(map[string]string) ([]*Statistic, error)
Diagnostics() (map[string]*Diagnostic, error)
}
}
Expand Down
27 changes: 27 additions & 0 deletions services/registration/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package registration

import (
"time"

"github.com/influxdb/influxdb/toml"
)

const (
DefaultURL = "https://enterprise.influxdata.com"
DefaultStatsInterval = time.Minute
)

type Config struct {
Enabled bool `toml:"enabled"`
URL string `toml:"url"`
Token string `toml:"token"`
StatsInterval toml.Duration `toml:"stats-interval"`
}

func NewConfig() Config {
return Config{
Enabled: true,
URL: DefaultURL,
StatsInterval: toml.Duration(DefaultStatsInterval),
}
}
33 changes: 33 additions & 0 deletions services/registration/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package registration_test

import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/services/registration"
)

func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c registration.Config
if _, err := toml.Decode(`
enabled = true
url = "a.b.c"
token = "1234"
stats-interval = "1s"
`, &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.Enabled != true {
t.Fatalf("unexpected enabled state: %v", c.Enabled)
} else if c.URL != "a.b.c" {
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
} else if c.Token != "1234" {
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
} else if time.Duration(c.StatsInterval) != time.Second {
t.Fatalf("unexpected stats interval: %v", c.StatsInterval)
}
}
Loading

0 comments on commit 956efae

Please sign in to comment.