diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go
index cf7b13dce6a..744ecc4214c 100644
--- a/modules/blockbuilder/blockbuilder.go
+++ b/modules/blockbuilder/blockbuilder.go
@@ -17,6 +17,8 @@ import (
 	"github.com/grafana/tempo/tempodb"
 	"github.com/grafana/tempo/tempodb/encoding"
 	"github.com/grafana/tempo/tempodb/wal"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
@@ -27,6 +29,35 @@ const (
 	ConsumerGroup = "block-builder"
 )
 
+var (
+	metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "tempo",
+		Subsystem: "block_builder",
+		Name:      "partition_lag",
+		Help:      "Lag of a partition.",
+	}, []string{"partition"})
+	metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
+		Namespace:                   "tempo",
+		Subsystem:                   "block_builder",
+		Name:                        "consume_cycle_duration_seconds",
+		Help:                        "Time spent consuming a full cycle.",
+		NativeHistogramBucketFactor: 1.1,
+	})
+	metricProcessPartitionSectionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace:                   "tempo",
+		Subsystem:                   "block_builder",
+		Name:                        "process_partition_section_duration_seconds",
+		Help:                        "Time spent processing one partition section.",
+		NativeHistogramBucketFactor: 1.1,
+	}, []string{"partition"})
+	metricFetchErrors = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "tempo",
+		Subsystem: "block_builder",
+		Name:      "fetch_errors_total",
+		Help:      "Total number of errors while fetching by the consumer.",
+	}, []string{"partition"})
+)
+
 type BlockBuilder struct {
 	services.Service
 
@@ -142,6 +173,7 @@ func (b *BlockBuilder) stopping(err error) error {
 func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time) error {
 	level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", cycleEndTime)
+	defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())
 
 	groupLag, err := getGroupLag(
 		ctx,
@@ -166,11 +198,13 @@ func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time)
 			return fmt.Errorf("lag for partition %d not found", partition)
 		}
 
-		level.Info(b.logger).Log(
+		level.Debug(b.logger).Log(
 			"msg", "partition lag",
 			"partition", partition,
 			"lag", fmt.Sprintf("%+v", partitionLag),
-		) // TODO - Debug
+		)
+
+		metricPartitionLag.WithLabelValues(fmt.Sprintf("%d", partition)).Set(float64(partitionLag.Lag))
 
 		if partitionLag.Lag <= 0 {
 			level.Info(b.logger).Log(
@@ -227,6 +261,10 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
 		"lag", lag.Lag,
 	)
 
+	defer func(t time.Time) {
+		metricProcessPartitionSectionDuration.WithLabelValues(fmt.Sprintf("%d", partition)).Observe(time.Since(t).Seconds())
+	}(time.Now())
+
 	// TODO - Review what endTimestamp is used here
 	writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)
 
@@ -260,6 +298,7 @@ consumerLoop:
 		fetches.EachError(func(_ string, _ int32, err error) {
 			if !errors.Is(err, context.Canceled) {
 				level.Error(b.logger).Log("msg", "failed to fetch records", "err", err)
+				metricFetchErrors.WithLabelValues(fmt.Sprintf("%d", partition)).Inc()
 			}
 		})
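Note on the timing pattern above (illustration only, not part of the patch): in `defer func(t time.Time) { ... }(time.Now())`, the `time.Now()` argument is evaluated when the `defer` statement executes, so the deferred closure observes the full function duration on every return path, including early error returns. A minimal runnable sketch, with the hypothetical `exampleCycleDuration` standing in for `metricConsumeCycleDuration`:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// exampleCycleDuration is a hypothetical stand-in for metricConsumeCycleDuration.
var exampleCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
	Name:                        "example_consume_cycle_duration_seconds",
	Help:                        "Time spent consuming a full cycle.",
	NativeHistogramBucketFactor: 1.1, // native-histogram resolution, as in the patch
})

func consumeCycle() {
	// time.Now() is evaluated here, at the defer statement, so the deferred
	// closure measures the elapsed time from function entry to any return.
	defer func(t time.Time) { exampleCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

	time.Sleep(10 * time.Millisecond) // stand-in for the real work
}

func main() {
	consumeCycle()
}

Observing seconds as a float keeps the metric consistent with the `_seconds` naming convention, and `NativeHistogramBucketFactor: 1.1` lets Prometheus servers with native histograms enabled record buckets whose boundaries grow by roughly 10% per step.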
diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go
index b49bf56acc2..89603fbe96c 100644
--- a/modules/blockbuilder/tenant_store.go
+++ b/modules/blockbuilder/tenant_store.go
@@ -24,8 +24,7 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
 		Namespace: "tempo",
 		Subsystem: "block_builder",
 		Name:      "flushed_blocks",
-	},
-	[]string{"tenant_id"},
+	}, []string{"tenant_id"},
 )
 
 // TODO - This needs locking
diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go
index 812ab1a88a4..10de5b7d287 100644
--- a/modules/distributor/distributor.go
+++ b/modules/distributor/distributor.go
@@ -116,6 +116,30 @@ var (
 		Name: "distributor_metrics_generator_clients",
 		Help: "The current number of metrics-generator clients.",
 	})
+	metricKafkaRecordsPerRequest = promauto.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "tempo",
+		Subsystem: "distributor",
+		Name:      "kafka_records_per_request",
+		Help:      "The number of records in each kafka request",
+	})
+	metricKafkaWriteLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "tempo",
+		Subsystem: "distributor",
+		Name:      "kafka_write_latency_seconds",
+		Help:      "The latency of writing to kafka",
+	})
+	metricKafkaWriteBytesTotal = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: "tempo",
+		Subsystem: "distributor",
+		Name:      "kafka_write_bytes_total",
+		Help:      "The total number of bytes written to kafka",
+	})
+	metricKafkaAppends = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "tempo",
+		Subsystem: "distributor",
+		Name:      "kafka_appends_total",
+		Help:      "The total number of appends sent to kafka",
+	}, []string{"partition", "status"})
 
 	statBytesReceived = usagestats.NewCounter("distributor_bytes_received")
 	statSpansReceived = usagestats.NewCounter("distributor_spans_received")
@@ -597,18 +621,20 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin
 		return err
 	}
 
+	startTime := time.Now()
+
 	records, err := ingest.Encode(int32(partitionID), userID, req, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
 	if err != nil {
 		return fmt.Errorf("failed to encode PushSpansRequest: %w", err)
 	}
 
-	// d.kafkaRecordsPerRequest.Observe(float64(len(records)))
+	metricKafkaRecordsPerRequest.Observe(float64(len(records)))
 
 	produceResults := d.kafkaProducer.ProduceSync(ctx, records)
 
 	if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 {
-		// d.kafkaWriteLatency.Observe(time.Since(startTime).Seconds())
-		// d.kafkaWriteBytesTotal.Add(float64(sizeBytes))
+		metricKafkaWriteLatency.Observe(time.Since(startTime).Seconds())
+		metricKafkaWriteBytesTotal.Add(float64(sizeBytes))
 		_ = level.Debug(d.logger).Log("msg", "kafka write success stats", "count", count, "size_bytes", sizeBytes)
 	}
 
@@ -616,10 +642,10 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin
 	for _, result := range produceResults {
 		if result.Err != nil {
 			_ = level.Error(d.logger).Log("msg", "failed to write to kafka", "err", result.Err)
-			// d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
+			metricKafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
 			finalErr = result.Err
 		} else {
-			// d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
+			metricKafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
 		}
 	}
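`successfulProduceRecordsStats` is referenced above but its body is outside this diff. A minimal sketch of what such an aggregation over franz-go produce results could look like, assuming it counts records produced without error and sums their value bytes (the `successStats` name is hypothetical, not the Tempo implementation):

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// successStats is a hypothetical sketch of successfulProduceRecordsStats:
// it counts records produced without error and sums the bytes of their
// values, yielding the count/sizeBytes pair observed by the metrics above.
func successStats(results kgo.ProduceResults) (count, sizeBytes int) {
	for _, res := range results {
		if res.Err == nil && res.Record != nil {
			count++
			sizeBytes += len(res.Record.Value)
		}
	}
	return count, sizeBytes
}

func main() {
	results := kgo.ProduceResults{
		{Record: &kgo.Record{Value: []byte("span-batch")}},
		{Err: fmt.Errorf("broker unavailable")},
	}
	count, sizeBytes := successStats(results)
	fmt.Println(count, sizeBytes) // 1 10
}

Note that `metricKafkaAppends` is incremented once per record result in the loop above, so a single `ProduceSync` call can contribute to both the "success" and "fail" series of the same partition.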