From 8f87f7bfbe397c22a97fef70ab135d10e6068b2f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 15 Jan 2021 12:21:36 +0100 Subject: [PATCH 1/3] Logs PushRequest data. This allows finding information about the received size and total entries per tenant. Example of a log from my dev testing: ``` level=debug ts=2021-01-15T11:16:21.735663076Z caller=http.go:67 org_id=3927 traceID=11c4774c6ec4bbf4 msg="push request parsed" path=/loki/api/v1/push content-type=application/x-protobuf body-size="11 kB" streams=5 entries=298 streamLabelsSize="1.9 kB" entriesSize="45 kB" totalSize="47 kB" ``` Of course this means we can use LogQL on this. Signed-off-by: Cyril Tovena --- pkg/distributor/http.go | 43 ++++++++++++++++++++++++++++++++++++----- pkg/util/reader.go | 31 +++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 pkg/util/reader.go diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index b25e3961bf0f2..54d5296d0f8e7 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -4,6 +4,8 @@ import ( "math" "net/http" + "github.com/dustin/go-humanize" + "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/util" @@ -12,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/unmarshal" unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy" + lokiutil "github.com/grafana/loki/pkg/util" ) var contentType = http.CanonicalHeaderKey("Content-Type") @@ -20,7 +23,6 @@ const applicationJSON = "application/json" // PushHandler reads a snappy-compressed proto from the HTTP body. 
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { - req, err := ParseRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -44,14 +46,45 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { var req logproto.PushRequest - switch r.Header.Get(contentType) { + logger := util.WithContext(r.Context(), util.Logger) + body := lokiutil.NewSizeReader(r.Body) + contentType := r.Header.Get(contentType) + + defer func() { + var ( + entriesSize int64 + streamLabelsSize int64 + totalEntries int64 + ) + + for _, s := range req.Streams { + streamLabelsSize += int64(len(s.Labels)) + for _, e := range s.Entries { + totalEntries++ + entriesSize += int64(len(e.Line)) + } + } + level.Debug(logger).Log( + "msg", "push request parsed", + "path", r.URL.Path, + "content-type", contentType, + "body-size", humanize.Bytes(uint64(body.Size())), + "streams", len(req.Streams), + "entries", totalEntries, + "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), + "entriesSize", humanize.Bytes(uint64(entriesSize)), + "totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)), + ) + }() + + switch contentType { case applicationJSON: var err error if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { - err = unmarshal.DecodePushRequest(r.Body, &req) + err = unmarshal.DecodePushRequest(body, &req) } else { - err = unmarshal_legacy.DecodePushRequest(r.Body, &req) + err = unmarshal_legacy.DecodePushRequest(body, &req) } if err != nil { @@ -59,7 +92,7 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { } default: - if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { + if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { return nil, err } } diff --git a/pkg/util/reader.go 
b/pkg/util/reader.go new file mode 100644 index 0000000000000..47a5764143bd7 --- /dev/null +++ b/pkg/util/reader.go @@ -0,0 +1,31 @@ +package util + +import ( + "io" +) + +type sizeReader struct { + size int64 + r io.Reader +} + +type SizeReader interface { + io.Reader + Size() int64 +} + +// NewSizeReader returns an io.Reader that will have the number of bytes +// read from r available. +func NewSizeReader(r io.Reader) SizeReader { + return &sizeReader{r: r} +} + +func (v *sizeReader) Read(p []byte) (int, error) { + n, err := v.r.Read(p) + v.size += int64(n) + return n, err +} + +func (v *sizeReader) Size() int64 { + return v.size +} From 28b7eddebd5cc14dce2bb7ee5d2f736d2c66b37c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 15 Jan 2021 14:55:43 +0100 Subject: [PATCH 2/3] Move metrics to avoid request traversal twice. Signed-off-by: Cyril Tovena --- pkg/distributor/distributor.go | 23 ---------------------- pkg/distributor/http.go | 35 ++++++++++++++++++++++++++++++---- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index bc73889e5ce61..43006cd939dca 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -42,17 +42,6 @@ var ( Help: "The total number of failed batch appends sent to ingesters.", }, []string{"ingester"}) - bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki", - Name: "distributor_bytes_received_total", - Help: "The total number of uncompressed bytes received per tenant", - }, []string{"tenant"}) - linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki", - Name: "distributor_lines_received_total", - Help: "The total number of lines received per tenant", - }, []string{"tenant"}) - maxLabelCacheSize = 100000 ) @@ -193,18 +182,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return nil, err } - // Track metrics. 
- bytesCount := 0 - lineCount := 0 - for _, stream := range req.Streams { - for _, entry := range stream.Entries { - bytesCount += len(entry.Line) - lineCount++ - } - } - bytesIngested.WithLabelValues(userID).Add(float64(bytesCount)) - linesIngested.WithLabelValues(userID).Add(float64(lineCount)) - // First we flatten out the request into a list of samples. // We use the heuristic of 1 sample per TS to size the array. // We also work out the hash value at the same time. diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 54d5296d0f8e7..3551b9ce1d478 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -6,7 +6,10 @@ import ( "github.com/dustin/go-humanize" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/util" @@ -17,7 +20,20 @@ import ( lokiutil "github.com/grafana/loki/pkg/util" ) -var contentType = http.CanonicalHeaderKey("Content-Type") +var ( + contentType = http.CanonicalHeaderKey("Content-Type") + + bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "distributor_bytes_received_total", + Help: "The total number of uncompressed bytes received per tenant", + }, []string{"tenant"}) + linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "distributor_lines_received_total", + Help: "The total number of lines received per tenant", + }, []string{"tenant"}) +) const applicationJSON = "application/json" @@ -44,11 +60,15 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { } func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { - var req logproto.PushRequest + userID, err := user.ExtractOrgID(r.Context()) + if err != nil { + return nil, err + } logger := util.WithContext(r.Context(), util.Logger) body := 
lokiutil.NewSizeReader(r.Body) contentType := r.Header.Get(contentType) + var req logproto.PushRequest defer func() { var ( @@ -64,11 +84,18 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { entriesSize += int64(len(e.Line)) } } + + // incrementing tenant metrics. + if totalEntries != 0 { + bytesIngested.WithLabelValues(userID).Add(float64(entriesSize)) + linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) + } + level.Debug(logger).Log( "msg", "push request parsed", "path", r.URL.Path, - "content-type", contentType, - "body-size", humanize.Bytes(uint64(body.Size())), + "contentType", contentType, + "bodySize", humanize.Bytes(uint64(body.Size())), "streams", len(req.Streams), "entries", totalEntries, "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), From cd5f4155cde862cd8c9834c6c2c63e17dce38fc2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 15 Jan 2021 15:16:04 +0100 Subject: [PATCH 3/3] Fixes tests. Signed-off-by: Cyril Tovena --- pkg/distributor/http.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 3551b9ce1d478..8935bd657134b 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -60,11 +60,7 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { } func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { - userID, err := user.ExtractOrgID(r.Context()) - if err != nil { - return nil, err - } - + userID, _ := user.ExtractOrgID(r.Context()) logger := util.WithContext(r.Context(), util.Logger) body := lokiutil.NewSizeReader(r.Body) contentType := r.Header.Get(contentType) @@ -85,8 +81,8 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { } } - // incrementing tenant metrics. - if totalEntries != 0 { + // incrementing tenant metrics if we have a tenant. 
+ if totalEntries != 0 && userID != "" { bytesIngested.WithLabelValues(userID).Add(float64(entriesSize)) linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) }