block-builder: Add an option to consume a partition completely till the cycle-end offset, with no partially consumed region #10506

Merged: 3 commits, Jan 24, 2025
9 changes: 8 additions & 1 deletion pkg/blockbuilder/blockbuilder.go
@@ -74,6 +74,11 @@ func newWithSchedulerClient(
limits *validation.Overrides,
schedulerClient schedulerpb.SchedulerClient,
) (*BlockBuilder, error) {
if cfg.NoPartiallyConsumedRegion {
// We should not have a large buffer if we are putting all the records into a block.
cfg.ConsumeIntervalBuffer = 5 * time.Minute
}

b := &BlockBuilder{
cfg: cfg,
logger: logger,
@@ -557,7 +562,9 @@ consumerLoop:
}

recordAlreadyProcessed := rec.Offset <= state.LastSeenOffset
allSamplesProcessed, err := builder.Process(ctx, rec, state.LastBlockEnd.UnixMilli(), blockEnd.UnixMilli(), recordAlreadyProcessed)
allSamplesProcessed, err := builder.Process(
ctx, rec, state.LastBlockEnd.UnixMilli(), blockEnd.UnixMilli(),
recordAlreadyProcessed, b.cfg.NoPartiallyConsumedRegion)
if err != nil {
// All "non-terminal" errors are handled by the TSDBBuilder.
return state, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err)
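For readers skimming the diff: apart from passing the new flag through to TSDBBuilder.Process, the only behavioural change in this file is the buffer override above. A minimal, self-contained sketch of that rule, using an illustrative helper name that is not part of the PR:

package main

import (
	"fmt"
	"time"
)

// effectiveConsumeBuffer mirrors the override in newWithSchedulerClient: when the
// no-partially-consumed-region mode is enabled, a fixed 5-minute buffer replaces the
// configured one, because every record up to the cycle end goes into the block anyway
// and a large buffer is not needed.
func effectiveConsumeBuffer(configured time.Duration, noPartiallyConsumedRegion bool) time.Duration {
	if noPartiallyConsumedRegion {
		return 5 * time.Minute
	}
	return configured
}

func main() {
	fmt.Println(effectiveConsumeBuffer(15*time.Minute, false)) // 15m0s: flag off, keep the configured buffer
	fmt.Println(effectiveConsumeBuffer(15*time.Minute, true))  // 5m0s: flag on, force the small buffer
}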
202 changes: 111 additions & 91 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -166,14 +166,9 @@ func TestBlockBuilder_StartWithExistingCommit(t *testing.T) {
// Because there is a commit, on startup, block-builder must consume samples only after the commit.
expSamples := producedSamples[1+(len(producedSamples)/2):]

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
expSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -257,14 +252,9 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) {
// Because there is a commit, on startup, block-builder must consume samples only after the commit.
expSamples := producedSamples[1+(len(producedSamples)/2):]

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
expSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -318,15 +308,10 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit(t *testing.T) {
cortex_blockbuilder_consumer_lag_records{partition="1"} 0
`), "cortex_blockbuilder_consumer_lag_records"))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
// There should be no samples in the tsdb.
compareQuery(t,
db,
nil,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -394,15 +379,9 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) {
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
// There should be no samples in the tsdb.
compareQuery(t,
db,
nil,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -470,14 +449,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection(t *testing.T) {
cortex_blockbuilder_consumer_lag_records{partition="1"} 1
`), "cortex_blockbuilder_consumer_lag_records"))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
producedSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -581,14 +555,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testi
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
producedSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -641,15 +610,9 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) {
require.Eventually(t, func() bool { return kafkaCommits.Load() > 1 }, 5*time.Second, 100*time.Millisecond, "expected kafka commits")

for _, tenant := range tenants {
bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t,
db,
producedPerTenantSamples[tenant],
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant),
producedPerTenantSamples[tenant], nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -723,15 +686,9 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) {
)

for _, tenant := range tenants {
bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t,
db,
producedPerTenantSamples[tenant],
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant),
producedPerTenantSamples[tenant], nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
@@ -817,16 +774,9 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
runTest := func(name string, end time.Time, expSamples []mimirpb.Sample) {
t.Run(name, func(t *testing.T) {
require.NoError(t, bb.nextConsumeCycle(ctx, end))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t,
db,
expSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
})
@@ -889,14 +839,9 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) {
// We expect at least several cycles because of how the pushed records were structured.
require.Eventually(t, func() bool { return kafkaCommits.Load() >= 1 }, 50*time.Second, 100*time.Millisecond, "expected kafka commits")

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
producedSamples,
nil,
Review comment on lines -892 to -899 (Contributor): Cool, you had mentioned this, but I didn't quite see the blatant duplication. 👍
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
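The reviewer's note above refers to the repeated tsdb.Open plus compareQuery boilerplate that this PR collapses into a compareQueryWithDir test helper across the file. The helper's body is not shown in this excerpt; a plausible sketch, assuming it opens the TSDB under the given directory, delegates to the existing compareQuery, and closes the DB when the test finishes (the exact signature in the PR may differ, and the histogram parameter here simply mirrors the nil passed at the call sites):

// compareQueryWithDir is an assumed reconstruction of the new test helper used throughout
// this file: open the per-tenant TSDB directory, run the existing compareQuery assertions
// against it, and close the DB via t.Cleanup.
func compareQueryWithDir(t *testing.T, dbDir string, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) {
	t.Helper()

	db, err := tsdb.Open(dbDir, promslog.NewNopLogger(), nil, nil, nil)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, db.Close()) })

	compareQuery(t, db, expSamples, expHistograms, matchers...)
}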
@@ -1180,16 +1125,91 @@ func TestPullMode(t *testing.T) {
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}

func TestNoPartiallyConsumedRegions(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

kafkaCluster, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic)
kafkaClient := mustKafkaClient(t, kafkaAddr)
kafkaClient.AddConsumeTopics(testTopic)

cfg, overrides := blockBuilderConfig(t, kafkaAddr)
cfg.NoPartiallyConsumedRegion = true

// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
kafkaCommits := atomic.NewInt32(0)
kafkaCluster.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Add(1)
return nil, nil, false
})

bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides)
require.NoError(t, err)

// NoPartiallyConsumedRegion changes the buffer to 5 mins.
require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer)

require.NoError(t, bb.starting(ctx))
t.Cleanup(func() {
require.NoError(t, bb.stoppingStandaloneMode(nil))
})

// Producing some records
cycleEnd := time.Now().Truncate(cfg.ConsumeInterval).Add(10 * time.Minute)
var producedSamples []mimirpb.Sample
kafkaRecTime := cycleEnd.Truncate(cfg.ConsumeInterval).Add(-2 * time.Hour)
for kafkaRecTime.Before(cycleEnd) {
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(time.Minute)
}
require.NotEmpty(t, producedSamples)

// Since there is no partially consumed region, all samples up to 5 minutes before the cycle end are consumed.
// Because those 5 minutes are excluded, only 4 samples will be missing.
err = bb.nextConsumeCycle(ctx, cycleEnd.Add(-5*time.Minute))
require.NoError(t, err)
expSamples := producedSamples[:len(producedSamples)-4]

db, err := tsdb.Open(path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
expSamples,
nil,
db, expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
require.NoError(t, db.Close())

lag, err := bb.getLagForPartition(ctx, 0)
require.NoError(t, err)
state := PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis)
require.Equal(t, len(producedSamples)-4, int(state.Commit.At)) // Commit point is where to start next.
require.Equal(t, len(producedSamples)-5, int(state.LastSeenOffset))

// Consuming until end gets all the samples in the block.
err = bb.nextConsumeCycle(ctx, cycleEnd)
require.NoError(t, err)

compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)

lag, err = bb.getLagForPartition(ctx, 0)
require.NoError(t, err)
state = PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis)
require.Equal(t, len(producedSamples), int(state.Commit.At)) // Commit point is where to start next.
require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset))
}

func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) {
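The offset assertions in TestNoPartiallyConsumedRegions above follow a simple invariant: after fully consuming records 0..N-1, the commit stores the next offset to read (N) while the last seen offset stays at N-1. A minimal, self-contained illustration; the helper and the record count are illustrative, not taken from the test:

package main

import "fmt"

// offsetsAfterConsuming captures the bookkeeping asserted in the test: consuming records
// [0, consumed) fully leaves the commit at `consumed` (where the next cycle starts) and
// the last seen offset at consumed-1 (the last record actually read).
func offsetsAfterConsuming(consumed int) (commitAt, lastSeenOffset int) {
	return consumed, consumed - 1
}

func main() {
	produced := 100 // stand-in for the number of produced records

	// First cycle stops 5 minutes before the cycle end, leaving 4 records unread.
	commitAt, lastSeen := offsetsAfterConsuming(produced - 4)
	fmt.Println(commitAt, lastSeen) // produced-4, produced-5

	// Second cycle consumes everything up to the cycle end.
	commitAt, lastSeen = offsetsAfterConsuming(produced)
	fmt.Println(commitAt, lastSeen) // produced, produced-1
}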
4 changes: 3 additions & 1 deletion pkg/blockbuilder/config.go
@@ -31,6 +31,8 @@ type Config struct {

ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"`

NoPartiallyConsumedRegion bool `yaml:"no_partially_consumed_region" category:"experimental"`

// Config parameters defined outside the block-builder config and are injected dynamically.
Kafka ingest.KafkaConfig `yaml:"-"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
@@ -58,7 +60,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.ConsumeIntervalBuffer, "block-builder.consume-interval-buffer", 15*time.Minute, "Extra buffer between subsequent consumption cycles. To avoid small blocks the block-builder consumes until the last hour boundary of the consumption interval, plus the buffer.")
f.DurationVar(&cfg.LookbackOnNoCommit, "block-builder.lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.")
f.IntVar(&cfg.ApplyMaxGlobalSeriesPerUserBelow, "block-builder.apply-max-global-series-per-user-below", 0, "Apply the global series limit per partition if the global series limit for the user is <= this given value. 0 means limits are disabled. If a user's limit is more than the given value, then the limits are not applied as well.")

f.BoolVar(&cfg.NoPartiallyConsumedRegion, "block-builder.no-partially-consumed-region", false, "Get rid of the 'last seen' logic and instead consume everything between two offsets to build the block.")
cfg.SchedulerConfig.GRPCClientConfig.RegisterFlags(f)
}

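For completeness, a self-contained sketch of what the new option looks like to an operator. The flag name and default are taken from the diff; the program around it is illustrative:

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirrors the registration added to Config.RegisterFlags: a boolean flag, default false,
	// exposed in YAML as the experimental no_partially_consumed_region setting.
	noPartialRegion := flag.Bool("block-builder.no-partially-consumed-region", false,
		"Consume everything between two offsets to build the block instead of tracking a 'last seen' offset.")
	flag.Parse()

	fmt.Println("no-partially-consumed-region enabled:", *noPartialRegion)
}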
10 changes: 5 additions & 5 deletions pkg/blockbuilder/tsdb.go
@@ -78,7 +78,7 @@ func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_ts
// lastBlockMax: max time of the block in the previous block building cycle.
// blockMax: max time of the block in the current block building cycle. This blockMax is exclusive of the last sample by design in TSDB.
// recordAlreadyProcessed: true if the record was processed in the previous cycle. (It gets processed again if some samples did not fit in the previous cycle.)
func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) (_ bool, err error) {
func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed, processEverything bool) (_ bool, err error) {
userID := string(rec.Key)

req := mimirpb.PreallocWriteRequest{
@@ -133,12 +133,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
ref, copiedLabels := app.GetRef(nonCopiedLabels, hash)

for _, s := range ts.Samples {
if s.TimestampMs >= blockMax {
if !processEverything && s.TimestampMs >= blockMax {
// We will process this sample in the next cycle.
allSamplesProcessed = false
continue
}
if recordAlreadyProcessed && s.TimestampMs < lastBlockMax {
if !processEverything && recordAlreadyProcessed && s.TimestampMs < lastBlockMax {
Review comment (@seizethedave, Contributor, Jan 23, 2025): As I understand it, the first time we restart block-builders with this flag enabled we will produce a small amount of duplicate records (records that had recordAlreadyProcessed from the previous cycle, but we're no longer skipping past those). I do not see an avenue where we'd skip any records on the first run. Am I right?

Reply (Member Author): Yes, migrating from the old way to the new way will create some duplicate samples in blocks, which is harmless since the compactor will merge them together.

// This sample was already processed in the previous cycle.
continue
}
@@ -170,12 +170,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
}

for _, h := range ts.Histograms {
if h.Timestamp >= blockMax {
if !processEverything && h.Timestamp >= blockMax {
// We will process this sample in the next cycle.
allSamplesProcessed = false
continue
}
if recordAlreadyProcessed && h.Timestamp < lastBlockMax {
if !processEverything && recordAlreadyProcessed && h.Timestamp < lastBlockMax {
// This sample was already processed in the previous cycle.
continue
}
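Taken together, the four edited conditions reduce to one per-sample rule, which also answers the review thread above: with processEverything set, the already-processed check is bypassed, so the first cycle after enabling the flag can write a few duplicate samples into a block, and the compactor later merges them away. A self-contained sketch of that rule; the helper name and result strings are illustrative, not part of the PR:

package main

import "fmt"

// sampleDecision mirrors, in simplified form, the checks TSDBBuilder.Process applies to each
// sample (and histogram) timestamp tsMs:
//   - in no-partially-consumed-region mode (processEverything), every sample is appended;
//   - otherwise samples at or beyond blockMax are deferred to the next cycle, and samples
//     below lastBlockMax in an already-processed record are skipped as duplicates.
func sampleDecision(tsMs, lastBlockMax, blockMax int64, recordAlreadyProcessed, processEverything bool) string {
	if !processEverything && tsMs >= blockMax {
		return "defer to next cycle"
	}
	if !processEverything && recordAlreadyProcessed && tsMs < lastBlockMax {
		return "skip (already in previous block)"
	}
	return "append to current block"
}

func main() {
	const lastBlockMax, blockMax = 1000, 2000

	fmt.Println(sampleDecision(2500, lastBlockMax, blockMax, false, false)) // defer to next cycle
	fmt.Println(sampleDecision(500, lastBlockMax, blockMax, true, false))   // skip (already in previous block)
	fmt.Println(sampleDecision(500, lastBlockMax, blockMax, true, true))    // append to current block (a possible duplicate that the compactor merges later)
	fmt.Println(sampleDecision(2500, lastBlockMax, blockMax, false, true))  // append to current block
}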