blockbuilder: Add initial set of metrics #8447

Merged (2 commits, Jun 21, 2024). The diff below shows the changes from all commits.
70 changes: 39 additions & 31 deletions pkg/blockbuilder/blockbuilder.go
@@ -36,6 +36,8 @@ type BlockBuilder struct {
kafkaClient *kgo.Client
bucket objstore.Bucket

metrics blockBuilderMetrics

// for testing
tsdbBuilder func() builder

@@ -54,6 +56,7 @@ func New(
logger: logger,
register: reg,
limits: limits,
metrics: newBlockBuilderMetrics(reg),
}

b.tsdbBuilder = func() builder {
@@ -133,6 +136,10 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
func (b *BlockBuilder) handlePartitionsAssigned(_ context.Context, _ *kgo.Client, assignment map[string][]int32) {
level.Info(b.logger).Log("msg", "partition assigned", "assignment", fmt.Sprintf("%+v", assignment))

for topic, parts := range assignment {
b.metrics.assignedPartitions.WithLabelValues(topic).Set(float64(len(parts)))
}

// Pause fetching for all assigned partitions. We manage the order and the pace of the consumption ourselves.
// TODO(codesome): how does this behave when there is a block building cycle in progress? Should we not pause the
// ones being consumed at the moment by this BB?
@@ -143,8 +150,6 @@ func (b *BlockBuilder) handlePartitionsAssigned(_ context.Context, _ *kgo.Client
b.assignmentMu.Unlock()
}

// TODO(codesome): question: is handlePartitionsAssigned also called by kafka client when partitions are revoked?
// TODO(codesome): how does this behave when there is a block building cycle in progress?
func (b *BlockBuilder) handlePartitionsLost(_ context.Context, _ *kgo.Client, lostAssignment map[string][]int32) {
level.Info(b.logger).Log("msg", "partition lost", "lost", fmt.Sprintf("%+v", lostAssignment))

@@ -212,6 +217,7 @@ func (b *BlockBuilder) running(ctx context.Context) error {
case cycleEnd := <-time.After(waitTime):
err := b.NextConsumeCycle(ctx, cycleEnd.Add(-time.Second))
if err != nil {
b.metrics.consumeCycleFailures.Inc()
level.Error(b.logger).Log("msg", "consume cycle failed", "cycle_end", cycleEnd, "err", err)
}

@@ -257,6 +263,10 @@ func (b *BlockBuilder) NextConsumeCycle(ctx context.Context, cycleEnd time.Time)
return fmt.Errorf("no partitions assigned in %+v, topic %s", assignment, b.cfg.Kafka.Topic)
}

defer func(t time.Time) {
b.metrics.consumeCycleDuration.Observe(time.Since(t).Seconds())
}(time.Now())

kadmClient := kadm.NewClient(b.kafkaClient)

lags, err := kadmClient.Lag(ctx, b.cfg.Kafka.ConsumerGroup)
@@ -301,6 +311,8 @@ func (b *BlockBuilder) NextConsumeCycle(ctx context.Context, cycleEnd time.Time)
level.Warn(b.logger).Log("msg", "error unmarshalling commit metadata", "err", err, "part", part, "offset", offset.At, "metadata", offset.Metadata)
}

b.metrics.consumerLag.WithLabelValues(pl.Topic, fmt.Sprintf("%d", pl.Partition)).Set(float64(pl.Lag))

parts[part] = partitionLag{
Partition: part,
Lag: pl.Lag,
@@ -317,8 +329,7 @@ func (b *BlockBuilder) NextConsumeCycle(ctx context.Context, cycleEnd time.Time)
}

// TODO(v): rebalancing can happen between the calls to consumePartition; if that happens, the instance may lose
// the ownership of some of its partitions
// TODO(v): test for this case
// the ownership of some of its partitions. Add a test for this case.

for _, pl := range parts {
// TODO(codesome): when we deploy it first, we will not have any kafka commit and we will
@@ -343,7 +354,6 @@ func (b *BlockBuilder) NextConsumeCycle(ctx context.Context, cycleEnd time.Time)

// Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed
// this partition's offset to the group.
// TODO(v): test for this case
b.kafkaClient.AllowRebalance()
continue
}
@@ -356,7 +366,7 @@ func (b *BlockBuilder) NextConsumeCycle(ctx context.Context, cycleEnd time.Time)
// in the next iteration.
lag, seenTillTs, lastBlockEnd, err = b.consumePartition(ctx, pl.Partition, lag, seenTillTs, lastBlockEnd, ce)
if err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part")
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", pl.Partition)
}
// If adding the ConsumeInterval takes it beyond the cycleEnd, we set it to the cycleEnd to not
// exit the loop without consuming until cycleEnd.
@@ -367,7 +377,6 @@ func (b *BlockBuilder) NextConsumeCycle(ctx context.Context, cycleEnd time.Time)

// Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed
// this partition's offset to the group.
// TODO(v): test for this case
b.kafkaClient.AllowRebalance()
}

@@ -399,7 +408,9 @@ func (b *BlockBuilder) consumePartition(
level.Info(b.logger).Log("msg", "consume partition", "part", part, "lag", lag, "cycle_end", cycleEnd)

defer func(t time.Time) {
level.Info(b.logger).Log("msg", "done consuming partition", "part", part, "dur", time.Since(t))
dur := time.Since(t)
b.metrics.processPartitionDuration.Observe(dur.Seconds())
level.Info(b.logger).Log("msg", "done consuming partition", "part", part, "dur", dur)
}(time.Now())

builder := b.tsdbBuilder()
@@ -431,10 +442,14 @@ func (b *BlockBuilder) consumePartition(
fetches.EachError(func(_ string, part int32, err error) {
if !errors.Is(err, context.DeadlineExceeded) {
level.Error(b.logger).Log("msg", "failed to fetch records", "part", part, "err", err)
b.metrics.fetchErrors.Inc()
}
})

if fetches.Empty() && lag <= 0 {
numRecs := fetches.NumRecords()
b.metrics.fetchRecordsTotal.Add(float64(numRecs))

if numRecs == 0 && lag <= 0 {
level.Warn(b.logger).Log("msg", "got empty fetches from broker", "part", part)
break
}
@@ -461,10 +476,9 @@ func (b *BlockBuilder) consumePartition(
recProcessedBefore := rec.Timestamp.UnixMilli() <= seenTillTs
allSamplesProcessed, err := builder.process(ctx, rec, lastBlockMax, blockMax, recProcessedBefore)
if err != nil {
// TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss?
level.Error(b.logger).Log("msg", "failed to process record", "part", part, "key", string(rec.Key), "err", err)
continue
// TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss?
// TODO(codesome): add metric
}
if !allSamplesProcessed && commitRec == nil {
// If block builder restarts, it will start consuming from the record after this from kafka.
@@ -477,7 +491,6 @@ func (b *BlockBuilder) consumePartition(
}

if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
// TODO(codesome): add metric
return lag, seenTillTs, lastBlockMax, err
}

Expand All @@ -493,7 +506,7 @@ func (b *BlockBuilder) consumePartition(
Epoch: firstRec.LeaderEpoch,
Offset: firstRec.Offset,
}
b.seekPartition(ctx, part, rec)
b.seekPartition(part, rec)
return lag, seenTillTs, lastBlockMax, nil
}

Expand All @@ -510,7 +523,7 @@ func (b *BlockBuilder) consumePartition(
Epoch: commitRec.LeaderEpoch,
Offset: commitRec.Offset + 1, // offset+1 means everything up (including) to commitRec was processed
}
b.seekPartition(ctx, part, rec)
b.seekPartition(part, rec)
}()

// We should take the max of "seen till" timestamp. If the partition was lagging
Expand All @@ -528,7 +541,7 @@ func (b *BlockBuilder) consumePartition(
return lag, seenTillTs, commitBlockMax, err
}

func (b *BlockBuilder) seekPartition(ctx context.Context, part int32, rec kgo.EpochOffset) {
func (b *BlockBuilder) seekPartition(part int32, rec kgo.EpochOffset) {
offsets := map[string]map[int32]kgo.EpochOffset{
b.cfg.Kafka.Topic: {
part: rec,
@@ -597,14 +610,14 @@ const (
kafkaCommitMetaV1 = 1
)

// commitRecTs: timestamp of the record which was comitted (and not the commit time).
// commitRecTs: timestamp of the record which was committed (and not the commit time).
// lastRecTs: timestamp of the last record processed (which will be >= commitRecTs).
// blockEnd: timestamp of the block end in this cycle.
func marshallCommitMeta(commitRecTs, lastRecTs, blockEnd int64) string {
return fmt.Sprintf("%d,%d,%d,%d", kafkaCommitMetaV1, commitRecTs, lastRecTs, blockEnd)
}

// commitRecTs: timestamp of the record which was comitted (and not the commit time).
// commitRecTs: timestamp of the record which was committed (and not the commit time).
// lastRecTs: timestamp of the last record processed (which will be >= commitRecTs).
// blockEnd: timestamp of the block end in this cycle.
func unmarshallCommitMeta(meta string) (commitRecTs, lastRecTs, blockEnd int64, err error) {
Expand Down Expand Up @@ -637,15 +650,14 @@ type Config struct {
BlocksStorageConfig mimir_tsdb.BlocksStorageConfig `yaml:"-"` // TODO(codesome): check how this is passed. Copied over from ingester.
}

// RegisterFlags registers the MultitenantCompactor flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.Kafka.RegisterFlags(f, logger)
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Kafka.RegisterFlags(f)

f.DurationVar(&cfg.ConsumeInterval, "consume-internal", time.Hour, "Interval between block consumption cycles.")
f.DurationVar(&cfg.ConsumeIntervalBuffer, "consume-internal-buffer", 5*time.Minute, "Extra buffer between subsequent block consumption cycles to avoid small blocks.")
}

func (cfg *Config) Validate(logger log.Logger) error {
func (cfg *Config) Validate() error {
if err := cfg.Kafka.Validate(); err != nil {
return err
}
Expand All @@ -659,18 +671,14 @@ func (cfg *Config) Validate(logger log.Logger) error {

// KafkaConfig holds the generic config for the Kafka backend.
type KafkaConfig struct {
Address string `yaml:"address"`
Topic string `yaml:"topic"`
ClientID string `yaml:"client_id"`
DialTimeout time.Duration `yaml:"dial_timeout"`

ConsumerGroup string `yaml:"consumer_group"`
ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"`
Address string `yaml:"address"`
Topic string `yaml:"topic"`
ClientID string `yaml:"client_id"`
DialTimeout time.Duration `yaml:"dial_timeout"`
ConsumerGroup string `yaml:"consumer_group"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("block-builder.kafka", f)
}

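The body of unmarshallCommitMeta is collapsed in this diff. For orientation, here is a minimal sketch of how the versioned "%d,%d,%d,%d" string written by marshallCommitMeta could be parsed (an illustration only, not necessarily the PR's implementation; the "Sketch" suffix in the name marks it as hypothetical):

```go
// Sketch only: parse the commit metadata produced by marshallCommitMeta,
// assuming the "version,commitRecTs,lastRecTs,blockEnd" layout shown above.
// Relies on the package's existing fmt import and the kafkaCommitMetaV1 constant.
func unmarshallCommitMetaSketch(meta string) (commitRecTs, lastRecTs, blockEnd int64, err error) {
	var version int
	_, err = fmt.Sscanf(meta, "%d,%d,%d,%d", &version, &commitRecTs, &lastRecTs, &blockEnd)
	if err != nil {
		return 0, 0, 0, fmt.Errorf("parsing commit metadata %q: %w", meta, err)
	}
	if version != kafkaCommitMetaV1 {
		return 0, 0, 0, fmt.Errorf("unsupported commit metadata version %d", version)
	}
	return commitRecTs, lastRecTs, blockEnd, nil
}
```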
58 changes: 58 additions & 0 deletions pkg/blockbuilder/blockbuilder_metrics.go
@@ -0,0 +1,58 @@
package blockbuilder

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type blockBuilderMetrics struct {
[Review comment, Member] Should we add a metric for the number of partitions handled by this block builder? Another way to determine it is to take the count of perPartitionLog in queries, so maybe it is not required.

consumeCycleDuration prometheus.Histogram
consumeCycleFailures prometheus.Counter
processPartitionDuration prometheus.Histogram
fetchRecordsTotal prometheus.Counter
fetchErrors prometheus.Counter
assignedPartitions *prometheus.GaugeVec
consumerLag *prometheus.GaugeVec
}

func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
var m blockBuilderMetrics

m.consumeCycleFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_blockbuilder_consume_cycle_failed_total",
Help: "Total number of failed consume cycles.",
})
m.consumeCycleDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_blockbuilder_consume_cycle_duration_seconds",
Help: "Time spent consuming a full cycle.",

NativeHistogramBucketFactor: 1.1,
})

m.processPartitionDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_blockbuilder_process_partition_duration_seconds",
Help: "Time spent processing one partition.",
NativeHistogramBucketFactor: 1.1,
})

m.fetchRecordsTotal = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_blockbuilder_fetch_records_total",
Help: "Total number of records received by the consumer.",
})
m.fetchErrors = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_blockbuilder_fetch_errors_total",
Help: "Total number of errors while fetching by the consumer.",
})

m.assignedPartitions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_blockbuilder_assigned_partitions",
Help: "The number of partitions currently assigned to this instance.",
}, []string{"topic"})

m.consumerLag = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_blockbuilder_consumer_lag",
Help: "The per-partition lag instance needs to work through each cycle.",
}, []string{"topic", "partition"})

return m
}
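Not part of this change, but as a quick sanity check, here is a hedged sketch of how these metrics could be exercised with client_golang's testutil helpers. The test name and the "ingest" topic label are made-up values for illustration:

```go
package blockbuilder

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Sketch only: exercise the metrics defined above against a fresh registry.
func TestBlockBuilderMetricsSketch(t *testing.T) {
	reg := prometheus.NewPedanticRegistry()
	m := newBlockBuilderMetrics(reg)

	// Simulate one failed consume cycle and an assignment of 4 partitions
	// on a hypothetical "ingest" topic.
	m.consumeCycleFailures.Inc()
	m.assignedPartitions.WithLabelValues("ingest").Set(4)

	if got := testutil.ToFloat64(m.consumeCycleFailures); got != 1 {
		t.Fatalf("expected 1 failed consume cycle, got %v", got)
	}
	if got := testutil.ToFloat64(m.assignedPartitions.WithLabelValues("ingest")); got != 4 {
		t.Fatalf("expected 4 assigned partitions, got %v", got)
	}
}
```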
17 changes: 13 additions & 4 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -101,11 +101,11 @@ func TestBlockBuilder(t *testing.T) {
createAndProduceSample := func(t *testing.T, sampleTs, kafkaRecTs time.Time) {
samples := floatSample(sampleTs.UnixMilli())
val := createWriteRequest(t, samples, nil)
produceRecords(t, ctx, writeClient, kafkaRecTs, userID, testTopic, 0, val)
produceRecords(ctx, t, writeClient, kafkaRecTs, userID, testTopic, 0, val)

hSamples := histogramSample(sampleTs.UnixMilli())
val = createWriteRequest(t, nil, hSamples)
produceRecords(t, ctx, writeClient, kafkaRecTs, userID, testTopic, 1, val)
produceRecords(ctx, t, writeClient, kafkaRecTs, userID, testTopic, 1, val)

kafkaSamples = append(kafkaSamples, samples...)
kafkaHSamples = append(kafkaHSamples, hSamples...)
@@ -407,7 +407,7 @@ func TestBlockBuilder_StartupWithExistingCommit(t *testing.T) {
sampleTs := kafkaTime.Add(-time.Minute)
samples := floatSample(sampleTs.UnixMilli())
val := createWriteRequest(t, samples, nil)
produceRecords(t, ctx, writeClient, kafkaTime, "1", testTopic, 0, val)
produceRecords(ctx, t, writeClient, kafkaTime, "1", testTopic, 0, val)
expSamples = append(expSamples, samples...)
}

@@ -473,7 +473,16 @@ func TestBlockBuilder_StartupWithExistingCommit(t *testing.T) {
)
}

func produceRecords(t *testing.T, ctx context.Context, writeClient *kgo.Client, ts time.Time, userID, topic string, part int32, val []byte) kgo.ProduceResults {
func produceRecords(
ctx context.Context,
t *testing.T,
writeClient *kgo.Client,
ts time.Time,
userID string,
topic string,
part int32,
val []byte,
) kgo.ProduceResults {
rec := &kgo.Record{
Timestamp: ts,
Key: []byte(userID),
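The rest of produceRecords is collapsed above. As a sketch of what the reordered helper plausibly does (assumptions: franz-go's synchronous ProduceSync, a client configured to honor the explicit Partition field, and testify's require plus the context/testing/time/kgo imports already present in this test file):

```go
// Sketch only: produce a single record synchronously and fail the test on error.
// Assumes writeClient was created with a partitioner that honors rec.Partition.
func produceRecordsSketch(
	ctx context.Context,
	t *testing.T,
	writeClient *kgo.Client,
	ts time.Time,
	userID string,
	topic string,
	part int32,
	val []byte,
) kgo.ProduceResults {
	rec := &kgo.Record{
		Timestamp: ts,
		Key:       []byte(userID),
		Value:     val,
		Topic:     topic,
		Partition: part,
	}
	results := writeClient.ProduceSync(ctx, rec)
	require.NoError(t, results.FirstErr())
	return results
}
```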
2 changes: 2 additions & 0 deletions pkg/blockbuilder/tsdb.go
@@ -65,6 +65,7 @@ func newTSDBBuilder(logger log.Logger, limits *validation.Overrides, blocksStora
// where the sample was not put in the TSDB because it was discarded or was already processed before.
// lastEnd: "end" time of the previous block building cycle.
// currEnd: end time of the block we are looking at right now.
// TODO(codesome): add metric
func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error) {
userID := string(rec.Key)

@@ -274,6 +275,7 @@ func (b *tsdbBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) {

// compactAndUpload compacts the blocks of all the TSDBs
// and uploads them.
// TODO(codesome): add metric
func (b *tsdbBuilder) compactAndUpload(ctx context.Context, blockUploaderForUser func(context.Context, string) blockUploader) error {
b.tsdbsMu.Lock()
defer b.tsdbsMu.Unlock()
2 changes: 1 addition & 1 deletion pkg/mimir/mimir.go
@@ -183,7 +183,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.Frontend.RegisterFlags(f, logger)
c.IngestStorage.RegisterFlags(f)
c.BlocksStorage.RegisterFlags(f)
c.BlockBuilder.RegisterFlags(f, logger)
c.BlockBuilder.RegisterFlags(f)
c.Compactor.RegisterFlags(f, logger)
c.StoreGateway.RegisterFlags(f, logger)
c.TenantFederation.RegisterFlags(f)