diff --git a/CHANGELOG.md b/CHANGELOG.md index 452e0663dcf..a774dbcc1a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ With this release InfluxDB is moving to Go 1.5. - [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service - [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc - [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821) +- [#3996](https://github.com/influxdb/influxdb/pull/3996): Add statistics to httpd package ### Bugfixes - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. diff --git a/services/httpd/handler.go b/services/httpd/handler.go index efc650656b5..3e6042b638d 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -74,16 +74,18 @@ type Handler struct { Logger *log.Logger loggingEnabled bool // Log every HTTP access. WriteTrace bool // Detailed logging of write path + statMap *expvar.Map } // NewHandler returns a new instance of handler with routes. -func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool) *Handler { +func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler { h := &Handler{ mux: pat.New(), requireAuthentication: requireAuthentication, Logger: log.New(os.Stderr, "[http] ", log.LstdFlags), loggingEnabled: loggingEnabled, WriteTrace: writeTrace, + statMap: statMap, } h.SetRoutes([]route{ @@ -150,6 +152,8 @@ func (h *Handler) SetRoutes(routes []route) { // ServeHTTP responds to HTTP request to the handler. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.statMap.Add(statRequest, 1) + // FIXME(benbjohnson): Add pprof enabled flag. if strings.HasPrefix(r.URL.Path, "/debug/pprof") { switch r.URL.Path { @@ -170,6 +174,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { + h.statMap.Add(statCQRequest, 1) + // If the continuous query service isn't configured, return 404. if h.ContinuousQuerier == nil { w.WriteHeader(http.StatusNotImplemented) @@ -210,6 +216,8 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R // serveQuery parses an incoming query and, if valid, executes the query. func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { + h.statMap.Add(statQueryRequest, 1) + q := r.URL.Query() pretty := q.Get("pretty") == "true" @@ -288,9 +296,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Write out result immediately if chunked. if chunked { - w.Write(MarshalJSON(Response{ + n, _ := w.Write(MarshalJSON(Response{ Results: []*influxql.Result{r}, }, pretty)) + h.statMap.Add(statQueryRequestBytesTransmitted, int64(n)) w.(http.Flusher).Flush() continue } @@ -327,11 +336,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // If it's not chunked we buffered everything in memory, so write it out if !chunked { - w.Write(MarshalJSON(resp, pretty)) + n, _ := w.Write(MarshalJSON(resp, pretty)) + h.statMap.Add(statQueryRequestBytesTransmitted, int64(n)) } } func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { + h.statMap.Add(statWriteRequest, 1) // Handle gzip decoding of the body body := r.Body @@ -353,6 +364,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } + h.statMap.Add(statWriteRequestBytesReceived, int64(len(b))) if h.WriteTrace { h.Logger.Printf("write body received by handler: %s", string(b)) } @@ -415,13 +427,16 @@ func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body [] RetentionPolicy: bp.RetentionPolicy, ConsistencyLevel: cluster.ConsistencyLevelOne, Points: points, - }); influxdb.IsClientError(err) { - resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) - return - } else if err != nil { - resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError) + }); err != nil { + h.statMap.Add(statPointsWrittenFail, int64(len(points))) + if influxdb.IsClientError(err) { + h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) + } else { + h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError) + } return } + h.statMap.Add(statPointsWrittenOK, int64(len(points))) w.WriteHeader(http.StatusNoContent) } @@ -512,13 +527,16 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body [] ConsistencyLevel: consistency, Points: points, }); influxdb.IsClientError(err) { + h.statMap.Add(statPointsWrittenFail, int64(len(points))) h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } else if err != nil { + h.statMap.Add(statPointsWrittenFail, int64(len(points))) h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError) return } + h.statMap.Add(statPointsWrittenOK, int64(len(points))) w.WriteHeader(http.StatusNoContent) } @@ -529,6 +547,7 @@ func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) { // servePing returns a simple response to let the client know the server is running. func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { + h.statMap.Add(statPingRequest, 1) w.WriteHeader(http.StatusNoContent) } @@ -667,16 +686,19 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo) if requireAuthentication && len(uis) > 0 { username, password, err := parseCredentials(r) if err != nil { + h.statMap.Add(statAuthFail, 1) httpError(w, err.Error(), false, http.StatusUnauthorized) return } if username == "" { + h.statMap.Add(statAuthFail, 1) httpError(w, "username required", false, http.StatusUnauthorized) return } user, err = h.MetaStore.Authenticate(username, password) if err != nil { + h.statMap.Add(statAuthFail, 1) httpError(w, err.Error(), false, http.StatusUnauthorized) return } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 228b3bf7718..c945d12231d 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" @@ -365,8 +366,9 @@ type Handler struct { // NewHandler returns a new instance of Handler. func NewHandler(requireAuthentication bool) *Handler { + statMap := influxdb.NewStatistics("httpd", "httpd", nil) h := &Handler{ - Handler: httpd.NewHandler(requireAuthentication, true, false), + Handler: httpd.NewHandler(requireAuthentication, true, false, statMap), } h.Handler.MetaStore = &h.MetaStore h.Handler.QueryExecutor = &h.QueryExecutor diff --git a/services/httpd/service.go b/services/httpd/service.go index 7ada11e7674..6e0db0bfcb1 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -2,12 +2,29 @@ package httpd import ( "crypto/tls" + "expvar" "fmt" "log" "net" "net/http" "os" "strings" + + "github.com/influxdb/influxdb" +) + +// statistics gathered by the httpd package. +const ( + statRequest = "req" // Number of HTTP requests served + statCQRequest = "cq_req" // Number of CQ-execute requests served + statQueryRequest = "query_req" // Number of query requests served + statWriteRequest = "write_req" // Number of write requests serverd + statPingRequest = "ping_req" // Number of ping requests served + statWriteRequestBytesReceived = "write_req_bytes" // Sum of all bytes in write requests + statQueryRequestBytesTransmitted = "query_resp_bytes" // Sum of all bytes returned in query reponses + statPointsWrittenOK = "points_written_ok" // Number of points written OK + statPointsWrittenFail = "points_written_fail" // Number of points that failed to be written + statAuthFail = "auth_fail" // Number of authentication failures ) // Service manages the listener and handler for an HTTP endpoint. @@ -20,11 +37,18 @@ type Service struct { Handler *Handler - Logger *log.Logger + Logger *log.Logger + statMap *expvar.Map } // NewService returns a new instance of Service. func NewService(c Config) *Service { + // Configure expvar monitoring. It's OK to do this even if the service fails to open and + // should be done before any data could arrive for the service. + key := strings.Join([]string{"httpd", c.BindAddress}, ":") + tags := map[string]string{"bind": c.BindAddress} + statMap := influxdb.NewStatistics(key, "httpd", tags) + s := &Service{ addr: c.BindAddress, https: c.HttpsEnabled, @@ -34,6 +58,7 @@ func NewService(c Config) *Service { c.AuthEnabled, c.LogEnabled, c.WriteTracing, + statMap, ), Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags), }