Skip to content

Commit

Permalink
Merge pull request #3996 from influxdb/httpd_stats2
Browse files Browse the repository at this point in the history
Add stats to httpd package
  • Loading branch information
otoolep committed Sep 4, 2015
2 parents 267f8e6 + e513b68 commit a500eb6
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc
- [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821)
- [#3996](https://github.com/influxdb/influxdb/pull/3996): Add statistics to httpd package

### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
Expand Down
38 changes: 30 additions & 8 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ type Handler struct {
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
statMap *expvar.Map
}

// NewHandler returns a new instance of handler with routes.
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool) *Handler {
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
h := &Handler{
mux: pat.New(),
requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
loggingEnabled: loggingEnabled,
WriteTrace: writeTrace,
statMap: statMap,
}

h.SetRoutes([]route{
Expand Down Expand Up @@ -150,6 +152,8 @@ func (h *Handler) SetRoutes(routes []route) {

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.statMap.Add(statRequest, 1)

// FIXME(benbjohnson): Add pprof enabled flag.
if strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
Expand All @@ -170,6 +174,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statCQRequest, 1)

// If the continuous query service isn't configured, return 404.
if h.ContinuousQuerier == nil {
w.WriteHeader(http.StatusNotImplemented)
Expand Down Expand Up @@ -210,6 +216,8 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R

// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statQueryRequest, 1)

q := r.URL.Query()
pretty := q.Get("pretty") == "true"

Expand Down Expand Up @@ -288,9 +296,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.

// Write out result immediately if chunked.
if chunked {
w.Write(MarshalJSON(Response{
n, _ := w.Write(MarshalJSON(Response{
Results: []*influxql.Result{r},
}, pretty))
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}
Expand Down Expand Up @@ -327,11 +336,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.

// If it's not chunked we buffered everything in memory, so write it out
if !chunked {
w.Write(MarshalJSON(resp, pretty))
n, _ := w.Write(MarshalJSON(resp, pretty))
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
}
}

func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statWriteRequest, 1)

// Handle gzip decoding of the body
body := r.Body
Expand All @@ -353,6 +364,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
h.statMap.Add(statWriteRequestBytesReceived, int64(len(b)))
if h.WriteTrace {
h.Logger.Printf("write body received by handler: %s", string(b))
}
Expand Down Expand Up @@ -415,13 +427,16 @@ func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []
RetentionPolicy: bp.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
}); influxdb.IsClientError(err) {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
} else if err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}); err != nil {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
if influxdb.IsClientError(err) {
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
} else {
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}
return
}
h.statMap.Add(statPointsWrittenOK, int64(len(points)))

w.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -512,13 +527,16 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
ConsistencyLevel: consistency,
Points: points,
}); influxdb.IsClientError(err) {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
} else if err != nil {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
return
}

h.statMap.Add(statPointsWrittenOK, int64(len(points)))
w.WriteHeader(http.StatusNoContent)
}

Expand All @@ -529,6 +547,7 @@ func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {

// servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
h.statMap.Add(statPingRequest, 1)
w.WriteHeader(http.StatusNoContent)
}

Expand Down Expand Up @@ -667,16 +686,19 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
if requireAuthentication && len(uis) > 0 {
username, password, err := parseCredentials(r)
if err != nil {
h.statMap.Add(statAuthFail, 1)
httpError(w, err.Error(), false, http.StatusUnauthorized)
return
}
if username == "" {
h.statMap.Add(statAuthFail, 1)
httpError(w, "username required", false, http.StatusUnauthorized)
return
}

user, err = h.MetaStore.Authenticate(username, password)
if err != nil {
h.statMap.Add(statAuthFail, 1)
httpError(w, err.Error(), false, http.StatusUnauthorized)
return
}
Expand Down
4 changes: 3 additions & 1 deletion services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
Expand Down Expand Up @@ -365,8 +366,9 @@ type Handler struct {

// NewHandler returns a new instance of Handler.
func NewHandler(requireAuthentication bool) *Handler {
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
h := &Handler{
Handler: httpd.NewHandler(requireAuthentication, true, false),
Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
}
h.Handler.MetaStore = &h.MetaStore
h.Handler.QueryExecutor = &h.QueryExecutor
Expand Down
27 changes: 26 additions & 1 deletion services/httpd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@ package httpd

import (
"crypto/tls"
"expvar"
"fmt"
"log"
"net"
"net/http"
"os"
"strings"

"github.com/influxdb/influxdb"
)

// statistics gathered by the httpd package.
const (
statRequest = "req" // Number of HTTP requests served
statCQRequest = "cq_req" // Number of CQ-execute requests served
statQueryRequest = "query_req" // Number of query requests served
statWriteRequest = "write_req" // Number of write requests serverd
statPingRequest = "ping_req" // Number of ping requests served
statWriteRequestBytesReceived = "write_req_bytes" // Sum of all bytes in write requests
statQueryRequestBytesTransmitted = "query_resp_bytes" // Sum of all bytes returned in query reponses
statPointsWrittenOK = "points_written_ok" // Number of points written OK
statPointsWrittenFail = "points_written_fail" // Number of points that failed to be written
statAuthFail = "auth_fail" // Number of authentication failures
)

// Service manages the listener and handler for an HTTP endpoint.
Expand All @@ -20,11 +37,18 @@ type Service struct {

Handler *Handler

Logger *log.Logger
Logger *log.Logger
statMap *expvar.Map
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"httpd", c.BindAddress}, ":")
tags := map[string]string{"bind": c.BindAddress}
statMap := influxdb.NewStatistics(key, "httpd", tags)

s := &Service{
addr: c.BindAddress,
https: c.HttpsEnabled,
Expand All @@ -34,6 +58,7 @@ func NewService(c Config) *Service {
c.AuthEnabled,
c.LogEnabled,
c.WriteTracing,
statMap,
),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
}
Expand Down

0 comments on commit a500eb6

Please sign in to comment.