Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stats to httpd package #3996

Merged
merged 2 commits into from
Sep 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc
- [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821)
- [#3996](https://github.com/influxdb/influxdb/pull/3996): Add statistics to httpd package

### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
Expand Down
38 changes: 30 additions & 8 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ type Handler struct {
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
statMap *expvar.Map
}

// NewHandler returns a new instance of handler with routes.
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool) *Handler {
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
h := &Handler{
mux: pat.New(),
requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
loggingEnabled: loggingEnabled,
WriteTrace: writeTrace,
statMap: statMap,
}

h.SetRoutes([]route{
Expand Down Expand Up @@ -150,6 +152,8 @@ func (h *Handler) SetRoutes(routes []route) {

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.statMap.Add(statRequest, 1)

// FIXME(benbjohnson): Add pprof enabled flag.
if strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
Expand All @@ -170,6 +174,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statCQRequest, 1)

// If the continuous query service isn't configured, return 404.
if h.ContinuousQuerier == nil {
w.WriteHeader(http.StatusNotImplemented)
Expand Down Expand Up @@ -210,6 +216,8 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R

// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statQueryRequest, 1)

q := r.URL.Query()
pretty := q.Get("pretty") == "true"

Expand Down Expand Up @@ -288,9 +296,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.

// Write out result immediately if chunked.
if chunked {
w.Write(MarshalJSON(Response{
n, _ := w.Write(MarshalJSON(Response{
Results: []*influxql.Result{r},
}, pretty))
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}
Expand Down Expand Up @@ -327,11 +336,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.

// If it's not chunked we buffered everything in memory, so write it out
if !chunked {
w.Write(MarshalJSON(resp, pretty))
n, _ := w.Write(MarshalJSON(resp, pretty))
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
}
}

func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statWriteRequest, 1)

// Handle gzip decoding of the body
body := r.Body
Expand All @@ -353,6 +364,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
h.statMap.Add(statWriteRequestBytesReceived, int64(len(b)))
if h.WriteTrace {
h.Logger.Printf("write body received by handler: %s", string(b))
}
Expand Down Expand Up @@ -415,13 +427,16 @@ func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []
RetentionPolicy: bp.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
}); influxdb.IsClientError(err) {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
} else if err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}); err != nil {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
if influxdb.IsClientError(err) {
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
} else {
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}
return
}
h.statMap.Add(statPointsWrittenOK, int64(len(points)))

w.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -512,13 +527,16 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
ConsistencyLevel: consistency,
Points: points,
}); influxdb.IsClientError(err) {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
} else if err != nil {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
return
}

h.statMap.Add(statPointsWrittenOK, int64(len(points)))
w.WriteHeader(http.StatusNoContent)
}

Expand All @@ -529,6 +547,7 @@ func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {

// servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
h.statMap.Add(statPingRequest, 1)
w.WriteHeader(http.StatusNoContent)
}

Expand Down Expand Up @@ -667,16 +686,19 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
if requireAuthentication && len(uis) > 0 {
username, password, err := parseCredentials(r)
if err != nil {
h.statMap.Add(statAuthFail, 1)
httpError(w, err.Error(), false, http.StatusUnauthorized)
return
}
if username == "" {
h.statMap.Add(statAuthFail, 1)
httpError(w, "username required", false, http.StatusUnauthorized)
return
}

user, err = h.MetaStore.Authenticate(username, password)
if err != nil {
h.statMap.Add(statAuthFail, 1)
httpError(w, err.Error(), false, http.StatusUnauthorized)
return
}
Expand Down
4 changes: 3 additions & 1 deletion services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
Expand Down Expand Up @@ -365,8 +366,9 @@ type Handler struct {

// NewHandler returns a new instance of Handler.
func NewHandler(requireAuthentication bool) *Handler {
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
h := &Handler{
Handler: httpd.NewHandler(requireAuthentication, true, false),
Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
}
h.Handler.MetaStore = &h.MetaStore
h.Handler.QueryExecutor = &h.QueryExecutor
Expand Down
27 changes: 26 additions & 1 deletion services/httpd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@ package httpd

import (
"crypto/tls"
"expvar"
"fmt"
"log"
"net"
"net/http"
"os"
"strings"

"github.com/influxdb/influxdb"
)

// statistics gathered by the httpd package.
const (
statRequest = "req" // Number of HTTP requests served
statCQRequest = "cq_req" // Number of CQ-execute requests served
statQueryRequest = "query_req" // Number of query requests served
statWriteRequest = "write_req" // Number of write requests serverd
statPingRequest = "ping_req" // Number of ping requests served
statWriteRequestBytesReceived = "write_req_bytes" // Sum of all bytes in write requests
statQueryRequestBytesTransmitted = "query_resp_bytes" // Sum of all bytes returned in query reponses
statPointsWrittenOK = "points_written_ok" // Number of points written OK
statPointsWrittenFail = "points_written_fail" // Number of points that failed to be written
statAuthFail = "auth_fail" // Number of authentication failures
)

// Service manages the listener and handler for an HTTP endpoint.
Expand All @@ -20,11 +37,18 @@ type Service struct {

Handler *Handler

Logger *log.Logger
Logger *log.Logger
statMap *expvar.Map
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"httpd", c.BindAddress}, ":")
tags := map[string]string{"bind": c.BindAddress}
statMap := influxdb.NewStatistics(key, "httpd", tags)

s := &Service{
addr: c.BindAddress,
https: c.HttpsEnabled,
Expand All @@ -34,6 +58,7 @@ func NewService(c Config) *Service {
c.AuthEnabled,
c.LogEnabled,
c.WriteTracing,
statMap,
),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
}
Expand Down