[Rhythm] Block-builder consumption loop #4480

Merged · 19 commits · Jan 8, 2025
Changes from 11 commits
242 changes: 204 additions & 38 deletions modules/blockbuilder/blockbuilder.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"strconv"
"time"

"github.com/go-kit/log"
@@ -27,6 +27,7 @@ import (
const (
blockBuilderServiceName = "block-builder"
ConsumerGroup = "block-builder"
pollTimeout = 2 * time.Second
)

var (
@@ -36,6 +37,12 @@ var (
Name: "partition_lag",
Help: "Lag of a partition.",
}, []string{"partition"})
metricPartitionLagSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "partition_lag_s",
Help: "Lag of a partition in seconds.",
}, []string{"partition"})
metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "tempo",
Subsystem: "block_builder",
@@ -67,6 +74,7 @@ type BlockBuilder struct {
fallbackOffsetMillis int64

kafkaClient *kgo.Client
kadm *kadm.Client
decoder *ingest.Decoder
partitionRing ring.PartitionRingReader

@@ -142,10 +150,14 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
return fmt.Errorf("failed to ping kafka: %w", err)
}

b.kadm = kadm.NewClient(b.kafkaClient)

go b.metricLag(ctx)

return nil
}

func (b *BlockBuilder) running(ctx context.Context) error {
func (b *BlockBuilder) runningOld(ctx context.Context) error {
// Initial polling and delay
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration)
waitTime := 2 * time.Second
@@ -167,6 +179,195 @@ func (b *BlockBuilder) running(ctx context.Context) error {
}
}

func (b *BlockBuilder) running(ctx context.Context) error {
// Initial delay
waitTime := 0 * time.Second
for {
select {
case <-time.After(waitTime):
err := b.consume(ctx)
if err != nil {
level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
}

// Real delay on subsequent
waitTime = b.cfg.ConsumeCycleDuration
case <-ctx.Done():
return nil
}
}
}

func (b *BlockBuilder) consume(ctx context.Context) error {
var (
end = time.Now()
partitions = b.getAssignedActivePartitions()
)

level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions)
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

for _, partition := range partitions {
// Consume partition while data remains.
// TODO - round-robin one consumption per partition instead to equalize catch-up time.
for {
more, err := b.consumePartition2(ctx, partition, end)
if err != nil {
return err
}

if !more {
break
}
}
}

return nil
}
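
The TODO above mentions a round-robin alternative. A purely hypothetical sketch of that variant (not part of this PR), using only methods already shown in this diff, could look like:

    // consumeRoundRobin is a hypothetical variant of consume that takes one
    // section from every assigned partition per pass, so partitions that are
    // far behind catch up at roughly the same rate.
    func (b *BlockBuilder) consumeRoundRobin(ctx context.Context) error {
        end := time.Now()
        remaining := b.getAssignedActivePartitions()

        for len(remaining) > 0 {
            next := remaining[:0]
            for _, partition := range remaining {
                more, err := b.consumePartition2(ctx, partition, end)
                if err != nil {
                    return err
                }
                if more {
                    // Still behind; revisit this partition on the next pass.
                    next = append(next, partition)
                }
            }
            remaining = next
        }
        return nil
    }

Each pass consumes at most one section per partition, so a single far-behind partition no longer delays the others for the whole cycle.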

func (b *BlockBuilder) consumePartition2(ctx context.Context, partition int32, overallEnd time.Time) (more bool, err error) {
defer func(t time.Time) {
metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds())
}(time.Now())

var (
dur = b.cfg.ConsumeCycleDuration
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
startOffset kgo.Offset
init bool
writer *writer
lastRec *kgo.Record
end time.Time
)

commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic)
if err != nil {
return false, err
}

lastCommit, ok := commits.Lookup(topic, partition)
A contributor suggested a more descriptive name for the lookup result:

    lastCommit, exists := commits.Lookup(topic, partition)
    if exists && lastCommit.At >= 0 {
        startOffset = startOffset.At(lastCommit.At)
    } else {
        startOffset = kgo.NewOffset().AtStart()
    }

https://pkg.go.dev/github.com/twmb/franz-go/pkg/[email protected]#OffsetResponses.Lookup

The author replied: "I think ok is more idiomatic, and the lib is reading from a map internally anyway."
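
For reference, a minimal self-contained sketch of the offset-resolution pattern under discussion, using the same kadm/kgo calls as the diff (the helper name and error handling are assumptions, not part of the PR):

    // resolveStartOffset consolidates the pattern above: use the group's
    // committed offset when one exists, otherwise start from the beginning
    // of the partition.
    func resolveStartOffset(ctx context.Context, adm *kadm.Client, group, topic string, partition int32) (kgo.Offset, error) {
        commits, err := adm.FetchOffsetsForTopics(ctx, group, topic)
        if err != nil {
            return kgo.NewOffset(), err
        }

        // Lookup returns false when the group has no commit for this topic/partition.
        lastCommit, ok := commits.Lookup(topic, partition)
        if ok && lastCommit.At >= 0 {
            return kgo.NewOffset().At(lastCommit.At), nil
        }
        return kgo.NewOffset().AtStart(), nil
    }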

if ok && lastCommit.At >= 0 {
startOffset = startOffset.At(lastCommit.At)
} else {
startOffset = kgo.NewOffset().AtStart()
}

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
// In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes
// from one partition at a time. I.e. when this partition is consumed, we start consuming the next one.
b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{
topic: {
partition: startOffset,
},
})
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

outer:
for {
fetches := func() kgo.Fetches {
ctx2, cancel := context.WithTimeout(ctx, pollTimeout)
defer cancel()
return b.kafkaClient.PollFetches(ctx2)
}()
err = fetches.Err()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// No more data
break
}
metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc()
return false, err
}

if fetches.Empty() {
break
}

for iter := fetches.RecordIter(); !iter.Done(); {
rec := iter.Next()

// Initialize on first record
if !init {
end = rec.Timestamp.Add(dur) // When block will be cut
metricPartitionLagSeconds.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds())
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
init = true
}

if rec.Timestamp.After(end) {
// Cut this block but continue only if we have at least another full cycle
if overallEnd.Sub(rec.Timestamp) >= dur {
more = true
}
break outer
}

if rec.Timestamp.After(overallEnd) {
break outer
}

err := b.pushTraces(rec.Key, rec.Value, writer)
if err != nil {
A contributor asked: "What will happen if something is wrong with the WAL? I guess it will enter a loop."
return false, err
}

lastRec = rec
}
}

if lastRec == nil {
// Received no data
return false, nil
}

err = writer.flush(ctx, b.writer)
if err != nil {
return false, err
}

// TODO - Retry commit
resp, err := b.kadm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec))
if err != nil {
return false, err
}
if err := resp.Error(); err != nil {
return false, err
}

return more, nil
}
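
On the WAL question raised above: as written, an error from pushTraces aborts the cycle before any commit, and the running loop retries the same section after ConsumeCycleDuration. A purely hypothetical sketch of bounding retries around the push (not part of this PR; the helper name, attempt count, and backoff policy are assumptions):

    // pushWithRetry retries a push a bounded number of times with exponential
    // backoff, returning the last error once the attempts are exhausted.
    func pushWithRetry(push func() error, attempts int, backoff time.Duration) error {
        var err error
        for i := 0; i < attempts; i++ {
            if err = push(); err == nil {
                return nil
            }
            time.Sleep(backoff)
            backoff *= 2
        }
        return fmt.Errorf("push failed after %d attempts: %w", attempts, err)
    }

A call site could wrap b.pushTraces(rec.Key, rec.Value, writer) in such a helper so transient WAL errors are retried in place while persistent failures still surface and stop the cycle.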

func (b *BlockBuilder) metricLag(ctx context.Context) {
var (
waitTime = time.Second * 15
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
)

for {
select {
case <-time.After(waitTime):
metricPartitionLag.Reset()

lag, err := getGroupLag(ctx, b.kadm, topic, group)
if err != nil {
level.Error(b.logger).Log("msg", "metric lag failed:", "err", err)
continue
}
for _, p := range b.getAssignedActivePartitions() {
l, ok := lag.Lookup(topic, p)
if ok {
metricPartitionLag.WithLabelValues(strconv.Itoa(int(p))).Set(float64(l.Lag))
}
}
case <-ctx.Done():
return
}
}
}

func (b *BlockBuilder) stopping(err error) error {
if b.kafkaClient != nil {
b.kafkaClient.Close()
@@ -183,7 +384,6 @@ func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time)
kadm.NewClient(b.kafkaClient),
b.cfg.IngestStorageConfig.Kafka.Topic,
b.cfg.IngestStorageConfig.Kafka.ConsumerGroup,
b.fallbackOffsetMillis,
)
if err != nil {
return fmt.Errorf("failed to get group lag: %w", err)
@@ -433,7 +633,7 @@ func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
@@ -453,40 +653,6 @@ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin
return nil, err
}

resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) {
if fallbackOffsetMillis < 0 {
return nil, fmt.Errorf("cannot resolve fallback offset for value %v", fallbackOffsetMillis)
}
return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic)
})
// If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at.
for topic, pt := range startOffsets.Offsets() {
for partition, startOffset := range pt {
if _, ok := offsets.Lookup(topic, partition); ok {
continue
}
fallbackOffsets, err := resolveFallbackOffsets()
if err != nil {
return nil, fmt.Errorf("resolve fallback offsets: %w", err)
}
o, ok := fallbackOffsets.Lookup(topic, partition)
if !ok {
return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic)
}
if o.Offset < startOffset.At {
// Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition).
// This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream.
continue
}
offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{
Topic: o.Topic,
Partition: o.Partition,
At: o.Offset,
LeaderEpoch: o.LeaderEpoch,
}})
}
}

descrGroup := kadm.DescribedGroup{
// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
// because we don't use group consumption.
15 changes: 3 additions & 12 deletions modules/blockbuilder/blockbuilder_test.go
@@ -38,14 +38,10 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
k, _ := testkafka.CreateCluster(t, 1, "test-topic")
t.Cleanup(k.Close)

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Add(1)
return nil, nil, false
})
address := k.ListenAddrs()[0]

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address)
@@ -59,14 +55,9 @@
client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
sendReq(t, ctx, client)

// Wait for record to be consumed and committed.
require.Eventually(t, func() bool {
return kafkaCommits.Load() > 0
}, time.Minute, time.Second)

// Wait for the block to be flushed.
require.Eventually(t, func() bool {
return len(store.BlockMetas(util.FakeTenantID)) == 1
return len(store.BlockMetas(util.FakeTenantID)) == 1 && store.BlockMetas(util.FakeTenantID)[0].TotalObjects == 1
}, time.Minute, time.Second)
}

2 changes: 1 addition & 1 deletion pkg/ingest/testkafka/cluster.go
@@ -109,7 +109,7 @@ func addSupportForConsumerGroups(t testing.TB, cluster *kfake.Cluster, topicName
// This mimics the real Kafka behaviour.
var partitionsResp []kmsg.OffsetFetchResponseGroupTopicPartition
if partitionID == allPartitions {
for i := int32(1); i < numPartitions+1; i++ {
for i := int32(0); i < numPartitions; i++ {
if committedOffsets[consumerGroup][i] >= 0 {
partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
Partition: i,