Skip to content

Commit

Permalink
[rhythm] Multiple fixes to block-builder consumption (grafana#4413)
Browse files Browse the repository at this point in the history
* Multiple fixes to cycle consumption

* fmt

* happy now?

* ups
  • Loading branch information
mapno committed Jan 10, 2025
1 parent 9084814 commit cab05ca
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 15 deletions.
51 changes: 44 additions & 7 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ func (b *BlockBuilder) running(ctx context.Context) error {
continue
}

cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeCycleDuration)
waitTime = time.Until(cycleEndTime)
cycleEndTime, waitTime = nextCycleEnd(cycleEndTime, b.cfg.ConsumeCycleDuration)
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -230,14 +229,42 @@ func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time)
}

func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, partitionLag kadm.GroupMemberLag, cycleEndTime time.Time) error {
level.Info(b.logger).Log("msg", "consuming partition", "partition", partition)
level.Info(b.logger).Log(
"msg", "consuming partition",
"partition", partition,
"cycle_end", cycleEndTime,
)

sectionEndTime := cycleEndTime
commitRecTs := time.UnixMilli(max(partitionLag.Commit.At, b.fallbackOffsetMillis))
if sectionEndTime.Sub(commitRecTs) > time.Duration(1.5*float64(b.cfg.ConsumeCycleDuration)) {

lastCommitTs, err := unmarshallCommitMeta(partitionLag.Commit.Metadata)
if err != nil {
return fmt.Errorf("failed to unmarshal commit metadata: %w", err)
}
if lastCommitTs == 0 {
lastCommitTs = b.fallbackOffsetMillis // No commit yet, use fallback offset.
}
commitRecTs := time.UnixMilli(lastCommitTs)

// We need to align the commit record timestamp to the section end time so we don't consume the same section again.
commitSectionEndTime := alignToSectionEndTime(commitRecTs, b.cfg.ConsumeCycleDuration)
if sectionEndTime.Sub(commitSectionEndTime) > time.Duration(1.5*float64(b.cfg.ConsumeCycleDuration)) {
// We're lagging behind or there is no commit, we need to consume in smaller sections.
sectionEndTime, _ = nextCycleEnd(commitRecTs, b.cfg.ConsumeCycleDuration)
// We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEndTime,
// i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEndTime)
// where T is the CommitRecordTimestamp, the timestamp of the record, whose offset we committed previously.
sectionEndTime, _ = nextCycleEnd(commitSectionEndTime, b.cfg.ConsumeCycleDuration)

level.Debug(b.logger).Log(
"msg", "lagging behind, consuming in sections",
"partition", partition,
"section_end", sectionEndTime,
"commit_rec_ts", commitRecTs,
"commit_section_end", commitSectionEndTime,
"cycle_end", cycleEndTime,
)
}

// Continue consuming in sections until we're caught up.
for !sectionEndTime.After(cycleEndTime) {
newCommitAt, err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag)
Expand Down Expand Up @@ -309,7 +336,12 @@ consumerLoop:
for recIter := fetches.RecordIter(); !recIter.Done(); {
rec := recIter.Next()
recOffset = rec.Offset
level.Info(b.logger).Log("msg", "processing record", "partition", rec.Partition, "offset", rec.Offset, "timestamp", rec.Timestamp)
level.Debug(b.logger).Log(
"msg", "processing record",
"partition", rec.Partition,
"offset", rec.Offset,
"timestamp", rec.Timestamp,
)

if firstRec == nil {
firstRec = rec
Expand Down Expand Up @@ -351,6 +383,7 @@ consumerLoop:
Partition: lastRec.Partition,
At: lastRec.Offset + 1, // offset+1 means everything up to (including) the offset was processed
LeaderEpoch: lastRec.LeaderEpoch,
Metadata: marshallCommitMeta(lastRec.Timestamp.UnixMilli()),
}
return commit.At, b.commitState(ctx, commit)
}
Expand Down Expand Up @@ -506,3 +539,7 @@ func nextCycleEnd(t time.Time, interval time.Duration) (time.Time, time.Duration
}
return cycleEnd, waitTime
}

// alignToSectionEndTime returns the end of the interval-sized section that
// contains t: t is rounded down to a multiple of interval (as time.Truncate
// defines it, i.e. relative to the zero time) and one interval is added.
// A t that already sits on a boundary therefore maps to the next boundary.
func alignToSectionEndTime(t time.Time, interval time.Duration) time.Time {
	sectionStart := t.Truncate(interval)
	return sectionStart.Add(interval)
}
40 changes: 40 additions & 0 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,46 @@ func TestCycleEndAtStartup(t *testing.T) {
}
}

// TestNextCycleEnd checks the cycle end and wait duration that nextCycleEnd
// reports for a timestamp exactly on an interval boundary, half-way through an
// interval, and one nanosecond past a boundary.
func TestNextCycleEnd(t *testing.T) {
	type testCase struct {
		name         string
		t            time.Time
		interval     time.Duration
		expectedTime time.Time
		expectedWait time.Duration
	}

	cases := []testCase{
		{
			name:         "ExactInterval",
			t:            time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC),
			interval:     time.Hour,
			expectedTime: time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC),
			expectedWait: time.Hour,
		},
		{
			name:         "PastInterval",
			t:            time.Date(2023, 10, 1, 12, 30, 0, 0, time.UTC),
			interval:     time.Hour,
			expectedTime: time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC),
			expectedWait: 30 * time.Minute,
		},
		{
			name:         "FutureInterval",
			t:            time.Date(2023, 10, 1, 12, 0, 0, 1, time.UTC),
			interval:     time.Hour,
			expectedTime: time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC),
			// One nanosecond short of a full hour.
			expectedWait: time.Hour - time.Nanosecond,
		},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			gotEnd, gotWait := nextCycleEnd(c.t, c.interval)
			require.Equal(t, c.expectedTime, gotEnd)
			require.Equal(t, c.expectedWait, gotWait)
		})
	}
}

func blockbuilderConfig(t *testing.T, address string) Config {
cfg := Config{}
flagext.DefaultValues(&cfg)
Expand Down
38 changes: 38 additions & 0 deletions modules/blockbuilder/commit_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package blockbuilder

import (
	"fmt"
	"strconv"
	"strings"
)

const (
// kafkaCommitMetaV1 is the version tag written as the first comma-separated
// field of the commit metadata string, and the only version the parser accepts.
kafkaCommitMetaV1 = 1
)

// marshallCommitMeta builds the commit metadata string "<version>,<commitRecTs>".
// commitRecTs is the timestamp of the record that was committed, not the time
// at which the commit happened.
func marshallCommitMeta(commitRecTs int64) string {
	version := fmt.Sprint(kafkaCommitMetaV1)
	timestamp := fmt.Sprint(commitRecTs)
	return version + "," + timestamp
}

// unmarshallCommitMeta parses a commit metadata string of the form
// "<version>,<commitRecTs>" as produced by marshallCommitMeta.
//
// commitRecTs is the timestamp of the record that was committed (and not the
// commit time). An empty string yields (0, nil) so callers can apply their own
// fallback when no metadata was stored.
//
// Parsing is strict: the version is checked before the payload is looked at,
// and the payload must be exactly one base-10 integer. Trailing characters,
// embedded whitespace or extra fields are rejected instead of being silently
// ignored, so corrupted metadata can never be mistaken for a valid timestamp.
func unmarshallCommitMeta(s string) (commitRecTs int64, err error) {
	if s == "" {
		return 0, nil
	}

	versionStr, payload, found := strings.Cut(s, ",")
	if !found {
		return 0, fmt.Errorf("invalid commit metadata format: missing version separator in %q", s)
	}

	version, err := strconv.Atoi(versionStr)
	if err != nil {
		return 0, fmt.Errorf("invalid commit metadata format: parse meta version: %w", err)
	}
	// Check the version before touching the payload: other versions may use a
	// different payload layout and should be reported as unsupported.
	if version != kafkaCommitMetaV1 {
		return 0, fmt.Errorf("unsupported commit meta version %d", version)
	}

	commitRecTs, err = strconv.ParseInt(payload, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid commit metadata format: %w", err)
	}
	return commitRecTs, nil
}
47 changes: 47 additions & 0 deletions modules/blockbuilder/commit_meta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package blockbuilder

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestMarshallCommitMeta verifies the string produced by marshallCommitMeta
// for a regular and a zero record timestamp.
func TestMarshallCommitMeta(t *testing.T) {
	type testCase struct {
		name         string
		commitRecTs  int64
		expectedMeta string
	}

	cases := []testCase{
		{name: "ValidTimestamp", commitRecTs: 1627846261, expectedMeta: "1,1627846261"},
		{name: "ZeroTimestamp", commitRecTs: 0, expectedMeta: "1,0"},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			got := marshallCommitMeta(c.commitRecTs)
			assert.Equal(t, c.expectedMeta, got, "expected: %s, got: %s", c.expectedMeta, got)
		})
	}
}

// TestUnmarshallCommitMeta verifies that unmarshallCommitMeta accepts a valid
// v1 string and the empty string, and rejects a non-numeric payload and an
// unknown version.
func TestUnmarshallCommitMeta(t *testing.T) {
	type testCase struct {
		name          string
		meta          string
		expectedTs    int64
		expectedError bool
	}

	cases := []testCase{
		{name: "ValidMeta", meta: "1,1627846261", expectedTs: 1627846261, expectedError: false},
		{name: "InvalidMetaFormat", meta: "1,invalid", expectedTs: 0, expectedError: true},
		{name: "UnsupportedVersion", meta: "2,1627846261", expectedTs: 0, expectedError: true},
		{name: "EmptyMeta", meta: "", expectedTs: 0, expectedError: false},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			gotTs, gotErr := unmarshallCommitMeta(c.meta)
			failed := gotErr != nil
			assert.Equal(t, c.expectedError, failed, "expected error: %v, got: %v", c.expectedError, gotErr)
			assert.Equal(t, c.expectedTs, gotTs, "expected: %d, got: %d", c.expectedTs, gotTs)
		})
	}
}
64 changes: 64 additions & 0 deletions modules/blockbuilder/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package blockbuilder

import (
"testing"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
)

// TestConfig_validate runs Config.Validate against one fully populated
// configuration that is expected to pass and one with a zero
// IndexDownsampleBytes that is expected to be rejected.
func TestConfig_validate(t *testing.T) {
	validCfg := Config{
		BlockConfig: BlockConfig{
			BlockCfg: common.BlockConfig{
				Version:              encoding.LatestEncoding().Version(),
				IndexDownsampleBytes: 1,
				IndexPageSizeBytes:   1,
				BloomFP:              0.1,
				BloomShardSizeBytes:  1,
				DedicatedColumns: backend.DedicatedColumns{
					{Scope: backend.DedicatedColumnScopeResource, Name: "foo", Type: backend.DedicatedColumnTypeString},
				},
			},
		},
		WAL: wal.Config{
			Version: encoding.LatestEncoding().Version(),
		},
	}

	invalidCfg := Config{
		BlockConfig: BlockConfig{
			BlockCfg: common.BlockConfig{
				Version:              vparquet4.VersionString,
				IndexDownsampleBytes: 0,
			},
		},
		WAL: wal.Config{
			Version: v2.VersionString,
		},
	}

	type testCase struct {
		name        string
		cfg         Config
		expectedErr bool
	}

	cases := []testCase{
		{name: "ValidConfig", cfg: validCfg, expectedErr: false},
		{name: "InvalidBlockConfig", cfg: invalidCfg, expectedErr: true},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			validateErr := c.cfg.Validate()
			failed := validateErr != nil
			assert.Equal(t, c.expectedErr, failed, "unexpected error: %v", validateErr)
		})
	}
}
19 changes: 17 additions & 2 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockbuilder
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -50,11 +51,11 @@ func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, b
}

func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
level.Info(p.logger).Log(
level.Debug(p.logger).Log(
"msg", "pushing bytes",
"tenant", tenant,
"num_traces", len(req.Traces),
"id", util.TraceIDToHexString(req.Ids[0]),
"id", idsToString(req.Ids),
)

i, err := p.instanceForTenant(tenant)
Expand Down Expand Up @@ -121,3 +122,17 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {

return i, nil
}

// idsToString renders the given trace IDs as a bracketed, comma-separated list
// of their hex representations, e.g. "[<hex>, <hex>]". An empty or nil input
// yields "[]".
func idsToString(ids [][]byte) string {
	hexIDs := make([]string, 0, len(ids))
	for _, id := range ids {
		hexIDs = append(hexIDs, util.TraceIDToHexString(id))
	}
	return "[" + strings.Join(hexIDs, ", ") + "]"
}
6 changes: 4 additions & 2 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
Namespace: "tempo",
Subsystem: "block_builder",
Name: "flushed_blocks",
}, []string{"tenant_id"},
}, []string{"tenant"},
)

// TODO - This needs locking
Expand All @@ -49,7 +49,7 @@ type tenantStore struct {
func newTenantStore(tenantID string, partitionID, endTimestamp int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
s := &tenantStore{
tenantID: tenantID,
idGenerator: util.NewDeterministicIDGenerator(partitionID, endTimestamp),
idGenerator: util.NewDeterministicIDGenerator(tenantID, partitionID, endTimestamp),
cfg: cfg,
logger: logger,
overrides: o,
Expand Down Expand Up @@ -156,6 +156,8 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
}

func (s *tenantStore) buildWriteableBlock(ctx context.Context, b common.WALBlock) (tempodb.WriteableBlock, error) {
level.Debug(s.logger).Log("msg", "building writeable block", "block_id", b.BlockMeta().BlockID.String())

iter, err := b.Iterator()
if err != nil {
return nil, err
Expand Down
8 changes: 7 additions & 1 deletion modules/blockbuilder/util/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type DeterministicIDGenerator struct {
seq *atomic.Int64
}

func NewDeterministicIDGenerator(seeds ...int64) *DeterministicIDGenerator {
func NewDeterministicIDGenerator(tenantID string, seeds ...int64) *DeterministicIDGenerator {
seeds = append(seeds, int64(binary.LittleEndian.Uint64(stringToBytes(tenantID))))
return &DeterministicIDGenerator{
seeds: seeds,
seq: atomic.NewInt64(0),
Expand All @@ -44,6 +45,11 @@ func newDeterministicID(seeds []int64) uuid.UUID {
return uuid.NewHash(hash, ns, b, 5)
}

// stringToBytes returns a freshly allocated byte slice holding the bytes of s.
// TODO - Try to avoid allocs here
func stringToBytes(s string) []byte {
	out := make([]byte, len(s))
	copy(out, s)
	return out
}

func int64ToBytes(seeds ...int64) []byte {
l := len(seeds)
bytes := make([]byte, l*8)
Expand Down
Loading

0 comments on commit cab05ca

Please sign in to comment.