Skip to content

Commit 80aec25

Browse files
feat: Improve pattern ingester tracing (#14707)
1 parent c1fde26 commit 80aec25

File tree

5 files changed

+95
-5
lines changed

5 files changed

+95
-5
lines changed

pkg/loki/modules.go

+1
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) {
682682
t.Cfg.Pattern,
683683
t.Overrides,
684684
t.PatternRingClient,
685+
t.tenantConfigs,
685686
t.Cfg.MetricsNamespace,
686687
prometheus.DefaultRegisterer,
687688
logger,

pkg/pattern/aggregation/push.go

+35-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"io"
99
"net/http"
1010
"net/url"
11+
"strings"
1112
"sync"
1213
"time"
1314

1415
"github.com/go-kit/log"
1516
"github.com/go-kit/log/level"
1617
"github.com/golang/snappy"
18+
"github.com/opentracing/opentracing-go"
1719
"github.com/prometheus/common/config"
1820
"github.com/prometheus/common/model"
1921
"github.com/prometheus/prometheus/model/labels"
@@ -160,7 +162,13 @@ func (p *Push) Stop() {
160162
}
161163

162164
// buildPayload creates the snappy compressed protobuf to send to Loki
163-
func (p *Push) buildPayload() ([]byte, error) {
165+
func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {
166+
sp, _ := opentracing.StartSpanFromContext(
167+
ctx,
168+
"patternIngester.aggregation.Push.buildPayload",
169+
)
170+
defer sp.Finish()
171+
164172
entries := p.entries.reset()
165173

166174
entriesByStream := make(map[string][]logproto.Entry)
@@ -179,6 +187,14 @@ func (p *Push) buildPayload() ([]byte, error) {
179187
}
180188

181189
streams := make([]logproto.Stream, 0, len(entriesByStream))
190+
191+
// limit the number of services to log to 1000
192+
serviceLimit := len(entriesByStream)
193+
if serviceLimit > 1000 {
194+
serviceLimit = 1000
195+
}
196+
197+
services := make([]string, 0, serviceLimit)
182198
for s, entries := range entriesByStream {
183199
lbls, err := syntax.ParseLabels(s)
184200
if err != nil {
@@ -190,6 +206,10 @@ func (p *Push) buildPayload() ([]byte, error) {
190206
Entries: entries,
191207
Hash: lbls.Hash(),
192208
})
209+
210+
if len(services) < serviceLimit {
211+
services = append(services, lbls.Get(push.AggregatedMetricLabel))
212+
}
193213
}
194214

195215
req := &logproto.PushRequest{
@@ -202,6 +222,14 @@ func (p *Push) buildPayload() ([]byte, error) {
202222

203223
payload = snappy.Encode(nil, payload)
204224

225+
sp.LogKV(
226+
"event", "build aggregated metrics payload",
227+
"num_service", len(entriesByStream),
228+
"first_1k_services", strings.Join(services, ","),
229+
"num_streams", len(streams),
230+
"num_entries", len(entries),
231+
)
232+
205233
return payload, nil
206234
}
207235

@@ -221,7 +249,7 @@ func (p *Push) run(pushPeriod time.Duration) {
221249
cancel()
222250
return
223251
case <-pushTicker.C:
224-
payload, err := p.buildPayload()
252+
payload, err := p.buildPayload(ctx)
225253
if err != nil {
226254
level.Error(p.logger).Log("msg", "failed to build payload", "err", err)
227255
continue
@@ -265,9 +293,14 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
265293
err error
266294
resp *http.Response
267295
)
296+
268297
// Set a timeout for the request
269298
ctx, cancel := context.WithTimeout(ctx, p.httpClient.Timeout)
270299
defer cancel()
300+
301+
sp, ctx := opentracing.StartSpanFromContext(ctx, "patternIngester.aggregation.Push.send")
302+
defer sp.Finish()
303+
271304
req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload))
272305
if err != nil {
273306
return -1, fmt.Errorf("failed to create push request: %w", err)

pkg/pattern/instance.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/grafana/dskit/httpgrpc"
1313
"github.com/grafana/dskit/multierror"
1414
"github.com/grafana/dskit/ring"
15+
"github.com/opentracing/opentracing-go"
1516
"github.com/prometheus/common/model"
1617
"github.com/prometheus/prometheus/model/labels"
1718

@@ -95,7 +96,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
9596
for _, reqStream := range req.Streams {
9697
// All streams are observed for metrics
9798
// TODO(twhitney): this would be better as a queue that drops in response to backpressure
98-
i.Observe(reqStream.Labels, reqStream.Entries)
99+
i.Observe(ctx, reqStream.Labels, reqStream.Entries)
99100

100101
// But only owned streams are processed for patterns
101102
ownedStream, err := i.isOwnedStream(i.ingesterID, reqStream.Labels)
@@ -252,10 +253,22 @@ func (i *instance) removeStream(s *stream) {
252253
}
253254
}
254255

255-
func (i *instance) Observe(stream string, entries []logproto.Entry) {
256+
func (i *instance) Observe(ctx context.Context, stream string, entries []logproto.Entry) {
256257
i.aggMetricsLock.Lock()
257258
defer i.aggMetricsLock.Unlock()
258259

260+
sp, _ := opentracing.StartSpanFromContext(
261+
ctx,
262+
"patternIngester.Observe",
263+
)
264+
defer sp.Finish()
265+
266+
sp.LogKV(
267+
"event", "observe stream for metrics",
268+
"stream", stream,
269+
"entries", len(entries),
270+
)
271+
259272
for _, entry := range entries {
260273
lvl := constants.LogLevelUnknown
261274
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)

pkg/pattern/tee_service.go

+42-1
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@ import (
2121
"github.com/grafana/loki/v3/pkg/loghttp/push"
2222
"github.com/grafana/loki/v3/pkg/logproto"
2323
"github.com/grafana/loki/v3/pkg/logql/syntax"
24+
"github.com/grafana/loki/v3/pkg/runtime"
25+
"github.com/grafana/loki/v3/pkg/util/spanlogger"
2426

2527
ring_client "github.com/grafana/dskit/ring/client"
2628
)
2729

2830
type TeeService struct {
2931
cfg Config
3032
limits Limits
33+
tenantCfgs *runtime.TenantConfigs
3134
logger log.Logger
3235
ringClient RingClient
3336
wg *sync.WaitGroup
@@ -51,6 +54,7 @@ func NewTeeService(
5154
cfg Config,
5255
limits Limits,
5356
ringClient RingClient,
57+
tenantCfgs *runtime.TenantConfigs,
5458
metricsNamespace string,
5559
registerer prometheus.Registerer,
5660
logger log.Logger,
@@ -86,6 +90,7 @@ func NewTeeService(
8690
),
8791
cfg: cfg,
8892
limits: limits,
93+
tenantCfgs: tenantCfgs,
8994
ringClient: ringClient,
9095

9196
wg: &sync.WaitGroup{},
@@ -293,10 +298,11 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
293298
// are gathered by this request
294299
_ = instrument.CollectedRequest(
295300
ctx,
296-
"FlushTeedLogsToPatternIngested",
301+
"FlushTeedLogsToPatternIngester",
297302
ts.sendDuration,
298303
instrument.ErrorCode,
299304
func(ctx context.Context) error {
305+
sp := spanlogger.FromContext(ctx)
300306
client, err := ts.ringClient.GetClientFor(clientRequest.ingesterAddr)
301307
if err != nil {
302308
return err
@@ -313,6 +319,41 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
313319
// Success here means the stream will be processed for both metrics and patterns
314320
ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "success").Inc()
315321
ts.ingesterMetricAppends.WithLabelValues("success").Inc()
322+
323+
// limit logged labels to 1000
324+
labelsLimit := len(req.Streams)
325+
if labelsLimit > 1000 {
326+
labelsLimit = 1000
327+
}
328+
329+
labels := make([]string, 0, labelsLimit)
330+
for _, stream := range req.Streams {
331+
if len(labels) >= 1000 {
332+
break
333+
}
334+
335+
labels = append(labels, stream.Labels)
336+
}
337+
338+
sp.LogKV(
339+
"event", "forwarded push request to pattern ingester",
340+
"num_streams", len(req.Streams),
341+
"first_1k_labels", strings.Join(labels, ", "),
342+
"tenant", clientRequest.tenant,
343+
)
344+
345+
// this is basically the same as logging push request streams,
346+
// so put it behind the same flag
347+
if ts.tenantCfgs.LogPushRequestStreams(clientRequest.tenant) {
348+
level.Debug(ts.logger).
349+
Log(
350+
"msg", "forwarded push request to pattern ingester",
351+
"num_streams", len(req.Streams),
352+
"first_1k_labels", strings.Join(labels, ", "),
353+
"tenant", clientRequest.tenant,
354+
)
355+
}
356+
316357
return nil
317358
}
318359

pkg/pattern/tee_service_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/grafana/loki/v3/pkg/distributor"
1717
"github.com/grafana/loki/v3/pkg/logproto"
18+
"github.com/grafana/loki/v3/pkg/runtime"
1819

1920
"github.com/grafana/loki/pkg/push"
2021
)
@@ -51,6 +52,7 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) {
5152
metricAggregationEnabled: true,
5253
},
5354
ringClient,
55+
runtime.DefaultTenantConfigs(),
5456
"test",
5557
nil,
5658
log.NewNopLogger(),

0 commit comments

Comments
 (0)