[rhythm] Add more metrics #4395

Merged
merged 1 commit on Nov 28, 2024
43 changes: 41 additions & 2 deletions modules/blockbuilder/blockbuilder.go
@@ -17,6 +17,8 @@ import (
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
@@ -27,6 +29,35 @@ const (
ConsumerGroup = "block-builder"
)

var (
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "partition_lag",
Help: "Lag of a partition.",
}, []string{"partition"})
metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "consume_cycle_duration_seconds",
Help: "Time spent consuming a full cycle.",
NativeHistogramBucketFactor: 1.1,
Member

Do we want to limit the max buckets by setting NativeHistogramMaxBucketNumber to something high, so there is a bound? Will this work for folks who are not using Prometheus v2.40+? (A hedged sketch of this option follows this hunk.)

Member Author

We've migrated all histograms in Tempo to native histograms. This shouldn't make a difference.

})
metricProcessPartitionSectionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "process_partition_section_duration_seconds",
Help: "Time spent processing one partition section.",
Member

Same for this histogram.

NativeHistogramBucketFactor: 1.1,
}, []string{"partition"})
metricFetchErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "fetch_errors_total",
Help: "Total number of errors while fetching by the consumer.",
}, []string{"partition"})
)

type BlockBuilder struct {
services.Service

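The review thread above asks about bounding native-histogram growth. NativeHistogramMaxBucketNumber and NativeHistogramMinResetDuration are the client_golang HistogramOpts fields that provide such a bound; this PR does not set them. A minimal sketch with illustrative values only (the 100-bucket cap and 1h reset window are assumptions, not Tempo's settings):

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Sketch only: a bounded native histogram along the lines of the review suggestion.
var exampleConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
	Namespace:                       "tempo",
	Subsystem:                       "block_builder",
	Name:                            "consume_cycle_duration_seconds",
	Help:                            "Time spent consuming a full cycle.",
	NativeHistogramBucketFactor:     1.1,           // ~10% bucket growth per step
	NativeHistogramMaxBucketNumber:  100,           // illustrative hard cap on bucket count
	NativeHistogramMinResetDuration: 1 * time.Hour, // allow a reset once the cap is hit
})

func main() {
	// Record one observation so the sketch does something when run.
	exampleConsumeCycleDuration.Observe(1.5)
}
```
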
@@ -142,6 +173,7 @@ func (b *BlockBuilder) stopping(err error) error {

func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time) error {
level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", cycleEndTime)
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

groupLag, err := getGroupLag(
ctx,
@@ -166,11 +198,13 @@ func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time)
return fmt.Errorf("lag for partition %d not found", partition)
}

level.Info(b.logger).Log(
level.Debug(b.logger).Log(
"msg", "partition lag",
"partition", partition,
"lag", fmt.Sprintf("%+v", partitionLag),
) // TODO - Debug
)

metricPartitionLag.WithLabelValues(fmt.Sprintf("%d", partition)).Set(float64(partitionLag.Lag))

if partitionLag.Lag <= 0 {
level.Info(b.logger).Log(
@@ -227,6 +261,10 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
"lag", lag.Lag,
)

defer func(t time.Time) {
metricProcessPartitionSectionDuration.WithLabelValues(fmt.Sprintf("%d", partition)).Observe(time.Since(t).Seconds())
}(time.Now())

// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)

@@ -260,6 +298,7 @@ consumerLoop:
fetches.EachError(func(_ string, _ int32, err error) {
if !errors.Is(err, context.Canceled) {
level.Error(b.logger).Log("msg", "failed to fetch records", "err", err)
metricFetchErrors.WithLabelValues(fmt.Sprintf("%d", partition)).Inc()
}
})

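A note on the instrumentation pattern used in consumeCycle and consumePartitionSection above: time.Now() is passed as an argument to the deferred closure, so the start time is captured when the defer statement is evaluated, while Observe runs when the function returns. A standalone sketch of the same idiom (the metric name and doWork function are illustrative, not part of this PR):

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Illustrative histogram, not one added by this PR.
var opDuration = promauto.NewHistogram(prometheus.HistogramOpts{
	Name: "example_operation_duration_seconds",
	Help: "Time spent in doWork.",
})

func doWork() {
	// time.Now() is evaluated here, when the defer statement runs;
	// the closure body executes when doWork returns.
	defer func(start time.Time) { opDuration.Observe(time.Since(start).Seconds()) }(time.Now())

	time.Sleep(50 * time.Millisecond) // stand-in for real work
}

func main() {
	doWork()
}
```
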
3 changes: 1 addition & 2 deletions modules/blockbuilder/tenant_store.go
@@ -24,8 +24,7 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
Namespace: "tempo",
Subsystem: "block_builder",
Name: "flushed_blocks",
},
[]string{"tenant_id"},
}, []string{"tenant_id"},
)

// TODO - This needs locking
36 changes: 31 additions & 5 deletions modules/distributor/distributor.go
@@ -116,6 +116,30 @@ var (
Name: "distributor_metrics_generator_clients",
Help: "The current number of metrics-generator clients.",
})
metricKafkaRecordsPerRequest = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "tempo",
Subsystem: "distributor",
Name: "kafka_records_per_request",
Help: "The number of records in each kafka request",
})
metricKafkaWriteLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "tempo",
Subsystem: "distributor",
Name: "kafka_write_latency_seconds",
Member

Nit: do we want to use the word Kafka here, or should we use a more generic word like queue, distributed_queue, or durable_queue?

Member Author

IMO it's too ambitious to call it anything else. It's Kafka everywhere, not any durable queue.

Help: "The latency of writing to kafka",
})
metricKafkaWriteBytesTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Subsystem: "distributor",
Name: "kafka_write_bytes_total",
Help: "The total number of bytes written to kafka",
})
metricKafkaAppends = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Subsystem: "distributor",
Name: "kafka_appends_total",
Help: "The total number of appends sent to kafka",
}, []string{"partition", "status"})

statBytesReceived = usagestats.NewCounter("distributor_bytes_received")
statSpansReceived = usagestats.NewCounter("distributor_spans_received")
@@ -597,29 +621,31 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin
return err
}

startTime := time.Now()

records, err := ingest.Encode(int32(partitionID), userID, req, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
if err != nil {
return fmt.Errorf("failed to encode PushSpansRequest: %w", err)
}

// d.kafkaRecordsPerRequest.Observe(float64(len(records)))
metricKafkaRecordsPerRequest.Observe(float64(len(records)))

produceResults := d.kafkaProducer.ProduceSync(ctx, records)

if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 {
// d.kafkaWriteLatency.Observe(time.Since(startTime).Seconds())
// d.kafkaWriteBytesTotal.Add(float64(sizeBytes))
metricKafkaWriteLatency.Observe(time.Since(startTime).Seconds())
metricKafkaWriteBytesTotal.Add(float64(sizeBytes))
_ = level.Debug(d.logger).Log("msg", "kafka write success stats", "count", count, "size_bytes", sizeBytes)
}

var finalErr error
for _, result := range produceResults {
if result.Err != nil {
_ = level.Error(d.logger).Log("msg", "failed to write to kafka", "err", result.Err)
// d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
metricKafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
finalErr = result.Err
} else {
// d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
metricKafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
}
}

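The helper successfulProduceRecordsStats called above is not part of this diff. Assuming the producer returns franz-go's kgo.ProduceResults, a helper in that spirit could look roughly like this (a sketch, not Tempo's actual implementation):

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Sketch of a helper like successfulProduceRecordsStats: count the records that
// were produced without error and sum the bytes they carried.
func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) {
	for _, res := range results {
		if res.Err == nil && res.Record != nil {
			count++
			sizeBytes += len(res.Record.Value)
		}
	}
	return count, sizeBytes
}

func main() {
	results := kgo.ProduceResults{
		{Record: &kgo.Record{Value: []byte("span batch")}},
	}
	count, size := successfulProduceRecordsStats(results)
	fmt.Println(count, size)
}
```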