block-builder: Add an option to consume a partition completely till cycle end offset with no partially consumed region (#10506)

* Add an option to not leave any partially consumed region for block builder

Signed-off-by: Ganesh Vernekar <[email protected]>

* Add TestNoPartiallyConsumedRegions

Signed-off-by: Ganesh Vernekar <[email protected]>

* Fix imports

Signed-off-by: Ganesh Vernekar <[email protected]>

---------

Signed-off-by: Ganesh Vernekar <[email protected]>
codesome authored Jan 24, 2025
1 parent ad6778e commit 5ff8293
Showing 5 changed files with 145 additions and 106 deletions.
9 changes: 8 additions & 1 deletion pkg/blockbuilder/blockbuilder.go
@@ -74,6 +74,11 @@ func newWithSchedulerClient(
 	limits *validation.Overrides,
 	schedulerClient schedulerpb.SchedulerClient,
 ) (*BlockBuilder, error) {
+	if cfg.NoPartiallyConsumedRegion {
+		// We should not have a large buffer if we are putting all the records into a block.
+		cfg.ConsumeIntervalBuffer = 5 * time.Minute
+	}
+
 	b := &BlockBuilder{
 		cfg:    cfg,
 		logger: logger,
@@ -557,7 +562,9 @@ consumerLoop:
 		}
 
 		recordAlreadyProcessed := rec.Offset <= state.LastSeenOffset
-		allSamplesProcessed, err := builder.Process(ctx, rec, state.LastBlockEnd.UnixMilli(), blockEnd.UnixMilli(), recordAlreadyProcessed)
+		allSamplesProcessed, err := builder.Process(
+			ctx, rec, state.LastBlockEnd.UnixMilli(), blockEnd.UnixMilli(),
+			recordAlreadyProcessed, b.cfg.NoPartiallyConsumedRegion)
 		if err != nil {
 			// All "non-terminal" errors are handled by the TSDBBuilder.
 			return state, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err)
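A minimal sketch of the constructor guard above, restated as a standalone helper (the helper itself is illustrative, not part of Mimir): with NoPartiallyConsumedRegion set, every fetched record is written into the block, so a wide consume-interval buffer would drag far-future samples in; clamping it to 5 minutes keeps blocks tight against the cycle end.

package blockbuilder // sketch only

import "time"

// applyNoPartialRegionDefaults restates the guard added to newWithSchedulerClient:
// every consumed record goes into the block, so the buffer is kept small.
func applyNoPartialRegionDefaults(cfg Config) Config {
	if cfg.NoPartiallyConsumedRegion {
		cfg.ConsumeIntervalBuffer = 5 * time.Minute
	}
	return cfg
}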
202 changes: 111 additions & 91 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -166,14 +166,9 @@ func TestBlockBuilder_StartWithExistingCommit(t *testing.T) {
 	// Because there is a commit, on startup, block-builder must consume samples only after the commit.
 	expSamples := producedSamples[1+(len(producedSamples)/2):]
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
-	compareQuery(t,
-		db,
-		expSamples,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		expSamples, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -257,14 +252,9 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) {
 	// Because there is a commit, on startup, block-builder must consume samples only after the commit.
 	expSamples := producedSamples[1+(len(producedSamples)/2):]
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
-	compareQuery(t,
-		db,
-		expSamples,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		expSamples, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -318,15 +308,10 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit(t *testing.T) {
 		cortex_blockbuilder_consumer_lag_records{partition="1"} 0
 	`), "cortex_blockbuilder_consumer_lag_records"))
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
 	// There should be no samples in the tsdb.
-	compareQuery(t,
-		db,
-		nil,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		nil, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -394,15 +379,9 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) {
 		scheduler.completeJobCalls,
 	)
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
-	// There should be no samples in the tsdb.
-	compareQuery(t,
-		db,
-		nil,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		nil, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -470,14 +449,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection(t *testing.T) {
 		cortex_blockbuilder_consumer_lag_records{partition="1"} 1
 	`), "cortex_blockbuilder_consumer_lag_records"))
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
-	compareQuery(t,
-		db,
-		producedSamples,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		producedSamples, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -581,14 +555,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testing.T) {
 		scheduler.completeJobCalls,
 	)
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
-	compareQuery(t,
-		db,
-		producedSamples,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		producedSamples, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -641,15 +610,9 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) {
 	require.Eventually(t, func() bool { return kafkaCommits.Load() > 1 }, 5*time.Second, 100*time.Millisecond, "expected kafka commits")
 
 	for _, tenant := range tenants {
-		bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
-		db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-		require.NoError(t, err)
-		t.Cleanup(func() { require.NoError(t, db.Close()) })
-
-		compareQuery(t,
-			db,
-			producedPerTenantSamples[tenant],
-			nil,
+		compareQueryWithDir(t,
+			path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant),
+			producedPerTenantSamples[tenant], nil,
 			labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 		)
 	}
@@ -723,15 +686,9 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) {
 	)
 
 	for _, tenant := range tenants {
-		bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
-		db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-		require.NoError(t, err)
-		t.Cleanup(func() { require.NoError(t, db.Close()) })
-
-		compareQuery(t,
-			db,
-			producedPerTenantSamples[tenant],
-			nil,
+		compareQueryWithDir(t,
+			path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant),
+			producedPerTenantSamples[tenant], nil,
 			labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 		)
 	}
@@ -817,16 +774,9 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
 	runTest := func(name string, end time.Time, expSamples []mimirpb.Sample) {
 		t.Run(name, func(t *testing.T) {
 			require.NoError(t, bb.nextConsumeCycle(ctx, end))
-
-			bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID)
-			db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-			require.NoError(t, err)
-			t.Cleanup(func() { require.NoError(t, db.Close()) })
-
-			compareQuery(t,
-				db,
-				expSamples,
-				nil,
+			compareQueryWithDir(t,
+				path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID),
+				expSamples, nil,
 				labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 			)
 		})
@@ -889,14 +839,9 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) {
 	// We expect at least several cycles because of how the pushed records were structured.
 	require.Eventually(t, func() bool { return kafkaCommits.Load() >= 1 }, 50*time.Second, 100*time.Millisecond, "expected kafka commits")
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
-	require.NoError(t, err)
-	t.Cleanup(func() { require.NoError(t, db.Close()) })
-	compareQuery(t,
-		db,
-		producedSamples,
-		nil,
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		producedSamples, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
@@ -1180,16 +1125,91 @@ func TestPullMode(t *testing.T) {
 		scheduler.completeJobCalls,
 	)
 
-	bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
-	db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		expSamples, nil,
 		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 	)
 }
 
+func TestNoPartiallyConsumedRegions(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancelCause(context.Background())
+	t.Cleanup(func() { cancel(errors.New("test done")) })
+
+	kafkaCluster, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic)
+	kafkaClient := mustKafkaClient(t, kafkaAddr)
+	kafkaClient.AddConsumeTopics(testTopic)
+
+	cfg, overrides := blockBuilderConfig(t, kafkaAddr)
+	cfg.NoPartiallyConsumedRegion = true
+
+	// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
+	kafkaCommits := atomic.NewInt32(0)
+	kafkaCluster.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Add(1)
+		return nil, nil, false
+	})
+
+	bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides)
+	require.NoError(t, err)
+
+	// NoPartiallyConsumedRegion changes the buffer to 5 mins.
+	require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer)
+
+	require.NoError(t, bb.starting(ctx))
+	t.Cleanup(func() {
+		require.NoError(t, bb.stoppingStandaloneMode(nil))
+	})
+
+	// Produce some records.
+	cycleEnd := time.Now().Truncate(cfg.ConsumeInterval).Add(10 * time.Minute)
+	var producedSamples []mimirpb.Sample
+	kafkaRecTime := cycleEnd.Truncate(cfg.ConsumeInterval).Add(-2 * time.Hour)
+	for kafkaRecTime.Before(cycleEnd) {
+		samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
+		producedSamples = append(producedSamples, samples...)
+
+		kafkaRecTime = kafkaRecTime.Add(time.Minute)
+	}
+	require.NotEmpty(t, producedSamples)
+
+	// Since no partially consumed region is left behind, everything up to the 5th-last minute is consumed.
+	// Because the last 5 minutes are excluded, only 4 samples end up missing.
+	err = bb.nextConsumeCycle(ctx, cycleEnd.Add(-5*time.Minute))
+	require.NoError(t, err)
+	expSamples := producedSamples[:len(producedSamples)-4]
+
+	db, err := tsdb.Open(path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), promslog.NewNopLogger(), nil, nil, nil)
+	require.NoError(t, err)
+	compareQuery(t,
+		db, expSamples, nil,
+		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
+	)
+	require.NoError(t, db.Close())
+
+	lag, err := bb.getLagForPartition(ctx, 0)
+	require.NoError(t, err)
+	state := PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis)
+	require.Equal(t, len(producedSamples)-4, int(state.Commit.At)) // Commit point is where to start next.
+	require.Equal(t, len(producedSamples)-5, int(state.LastSeenOffset))
+
+	// Consuming until the end gets all the samples into the block.
+	err = bb.nextConsumeCycle(ctx, cycleEnd)
+	require.NoError(t, err)
+
+	compareQueryWithDir(t,
+		path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
+		producedSamples, nil,
+		labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
+	)
+
+	lag, err = bb.getLagForPartition(ctx, 0)
+	require.NoError(t, err)
+	state = PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis)
+	require.Equal(t, len(producedSamples), int(state.Commit.At)) // Commit point is where to start next.
+	require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset))
+}
 
 func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) {
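To follow the offset arithmetic asserted in TestNoPartiallyConsumedRegions: assuming one record per minute carrying one sample each (as the produce loop suggests), a consume cycle ending at cycleEnd-5m still includes the record stamped exactly at cycleEnd-5m, leaving only the four records at cycleEnd-4m through cycleEnd-1m unconsumed. A hypothetical helper (not test code) capturing that relationship:

// offsetsAfterPartialCycle is illustrative only: with the last 5 minutes
// excluded, 4 records remain, so the commit offset (the next record to
// consume) is produced-4 and the last processed offset is one less.
func offsetsAfterPartialCycle(produced int) (commitAt, lastSeen int) {
	commitAt = produced - 4
	lastSeen = commitAt - 1
	return commitAt, lastSeen
}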
4 changes: 3 additions & 1 deletion pkg/blockbuilder/config.go
@@ -31,6 +31,8 @@ type Config struct {
 
 	ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"`
 
+	NoPartiallyConsumedRegion bool `yaml:"no_partially_consumed_region" category:"experimental"`
+
 	// Config parameters defined outside the block-builder config and are injected dynamically.
 	Kafka         ingest.KafkaConfig       `yaml:"-"`
 	BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
@@ -58,7 +60,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	f.DurationVar(&cfg.ConsumeIntervalBuffer, "block-builder.consume-interval-buffer", 15*time.Minute, "Extra buffer between subsequent consumption cycles. To avoid small blocks the block-builder consumes until the last hour boundary of the consumption interval, plus the buffer.")
 	f.DurationVar(&cfg.LookbackOnNoCommit, "block-builder.lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.")
 	f.IntVar(&cfg.ApplyMaxGlobalSeriesPerUserBelow, "block-builder.apply-max-global-series-per-user-below", 0, "Apply the global series limit per partition if the global series limit for the user is <= this given value. 0 means limits are disabled. If a user's limit is more than the given value, then the limits are not applied as well.")
-
+	f.BoolVar(&cfg.NoPartiallyConsumedRegion, "block-builder.no-partially-consumed-region", false, "Get rid of the 'last seen' logic and instead consume everything between two offsets to build the block.")
 	cfg.SchedulerConfig.GRPCClientConfig.RegisterFlags(f)
 }
 
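A hedged usage sketch for the new option: the flag name and the RegisterFlags signature come from this diff, while the FlagSet wiring and the import paths are assumptions for illustration.

package main

import (
	"flag"

	"github.com/go-kit/log"

	"github.com/grafana/mimir/pkg/blockbuilder" // assumed import path
)

func main() {
	var cfg blockbuilder.Config
	fs := flag.NewFlagSet("block-builder", flag.ContinueOnError)
	cfg.RegisterFlags(fs, log.NewNopLogger())
	// Enable the experimental option; the YAML equivalent is
	// no_partially_consumed_region: true.
	_ = fs.Parse([]string{"-block-builder.no-partially-consumed-region=true"})
	_ = cfg.NoPartiallyConsumedRegion // now true
}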
10 changes: 5 additions & 5 deletions pkg/blockbuilder/tsdb.go
@@ -78,7 +78,7 @@ func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_ts
 // lastBlockMax: max time of the block in the previous block building cycle.
 // blockMax: max time of the block in the current block building cycle. This blockMax is exclusive of the last sample by design in TSDB.
 // recordAlreadyProcessed: true if the record was processed in the previous cycle. (It gets processed again if some samples did not fit in the previous cycle.)
-func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) (_ bool, err error) {
+func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed, processEverything bool) (_ bool, err error) {
 	userID := string(rec.Key)
 
 	req := mimirpb.PreallocWriteRequest{
@@ -133,12 +133,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
 		ref, copiedLabels := app.GetRef(nonCopiedLabels, hash)
 
 		for _, s := range ts.Samples {
-			if s.TimestampMs >= blockMax {
+			if !processEverything && s.TimestampMs >= blockMax {
 				// We will process this sample in the next cycle.
 				allSamplesProcessed = false
 				continue
 			}
-			if recordAlreadyProcessed && s.TimestampMs < lastBlockMax {
+			if !processEverything && recordAlreadyProcessed && s.TimestampMs < lastBlockMax {
 				// This sample was already processed in the previous cycle.
 				continue
 			}
@@ -170,12 +170,12 @@ }
 		}
 
 		for _, h := range ts.Histograms {
-			if h.Timestamp >= blockMax {
+			if !processEverything && h.Timestamp >= blockMax {
 				// We will process this sample in the next cycle.
 				allSamplesProcessed = false
 				continue
 			}
-			if recordAlreadyProcessed && h.Timestamp < lastBlockMax {
+			if !processEverything && recordAlreadyProcessed && h.Timestamp < lastBlockMax {
 				// This sample was already processed in the previous cycle.
 				continue
 			}
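The four conditional changes above collapse into a single per-sample predicate, applied identically to float samples and histograms. A condensed sketch (names taken from the diff; not the exact Mimir code):

// keepSample returns whether a sample belongs in the current block. With
// processEverything=true both boundary checks are bypassed, so every sample
// in the record is written and no partially consumed region remains.
func keepSample(ts, lastBlockMax, blockMax int64, recordAlreadyProcessed, processEverything bool) bool {
	if processEverything {
		return true
	}
	if ts >= blockMax {
		// Deferred to the next cycle; Process also clears allSamplesProcessed here.
		return false
	}
	if recordAlreadyProcessed && ts < lastBlockMax {
		// Already written by a previous cycle.
		return false
	}
	return true
}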