diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go index b51e5788e6..34b1071195 100644 --- a/pkg/blockbuilder/blockbuilder.go +++ b/pkg/blockbuilder/blockbuilder.go @@ -74,6 +74,11 @@ func newWithSchedulerClient( limits *validation.Overrides, schedulerClient schedulerpb.SchedulerClient, ) (*BlockBuilder, error) { + if cfg.NoPartiallyConsumedRegion { + // We should not have a large buffer if we are putting all the records into a block. + cfg.ConsumeIntervalBuffer = 5 * time.Minute + } + b := &BlockBuilder{ cfg: cfg, logger: logger, @@ -557,7 +562,9 @@ consumerLoop: } recordAlreadyProcessed := rec.Offset <= state.LastSeenOffset - allSamplesProcessed, err := builder.Process(ctx, rec, state.LastBlockEnd.UnixMilli(), blockEnd.UnixMilli(), recordAlreadyProcessed) + allSamplesProcessed, err := builder.Process( + ctx, rec, state.LastBlockEnd.UnixMilli(), blockEnd.UnixMilli(), + recordAlreadyProcessed, b.cfg.NoPartiallyConsumedRegion) if err != nil { // All "non-terminal" errors are handled by the TSDBBuilder. return state, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err) diff --git a/pkg/blockbuilder/blockbuilder_test.go b/pkg/blockbuilder/blockbuilder_test.go index f9ae4bbeb6..f2026fd2f5 100644 --- a/pkg/blockbuilder/blockbuilder_test.go +++ b/pkg/blockbuilder/blockbuilder_test.go @@ -166,14 +166,9 @@ func TestBlockBuilder_StartWithExistingCommit(t *testing.T) { // Because there is a commit, on startup, block-builder must consume samples only after the commit. 
expSamples := producedSamples[1+(len(producedSamples)/2):] - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - compareQuery(t, - db, - expSamples, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + expSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -257,14 +252,9 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) { // Because there is a commit, on startup, block-builder must consume samples only after the commit. expSamples := producedSamples[1+(len(producedSamples)/2):] - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - compareQuery(t, - db, - expSamples, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + expSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -318,15 +308,10 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit(t *testing.T) { cortex_blockbuilder_consumer_lag_records{partition="1"} 0 `), "cortex_blockbuilder_consumer_lag_records")) - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) // There should be no samples in the tsdb. 
- compareQuery(t, - db, - nil, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + nil, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -394,15 +379,9 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) { scheduler.completeJobCalls, ) - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - // There should be no samples in the tsdb. - compareQuery(t, - db, - nil, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + nil, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -470,14 +449,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection(t *testing.T) { cortex_blockbuilder_consumer_lag_records{partition="1"} 1 `), "cortex_blockbuilder_consumer_lag_records")) - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - compareQuery(t, - db, - producedSamples, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + producedSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -581,14 +555,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testi scheduler.completeJobCalls, ) - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - compareQuery(t, - db, - producedSamples, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + producedSamples, nil, 
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -641,15 +610,9 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) { require.Eventually(t, func() bool { return kafkaCommits.Load() > 1 }, 5*time.Second, 100*time.Millisecond, "expected kafka commits") for _, tenant := range tenants { - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant) - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - - compareQuery(t, - db, - producedPerTenantSamples[tenant], - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant), + producedPerTenantSamples[tenant], nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -723,15 +686,9 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) { ) for _, tenant := range tenants { - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant) - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - - compareQuery(t, - db, - producedPerTenantSamples[tenant], - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant), + producedPerTenantSamples[tenant], nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -817,16 +774,9 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) { runTest := func(name string, end time.Time, expSamples []mimirpb.Sample) { t.Run(name, func(t *testing.T) { require.NoError(t, bb.nextConsumeCycle(ctx, end)) - - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID) - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - - compareQuery(t, - db, - expSamples, - nil, + compareQueryWithDir(t, + 
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID), + expSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) }) @@ -889,14 +839,9 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) { // We expect at least several cycles because of how the pushed records were structured. require.Eventually(t, func() bool { return kafkaCommits.Load() >= 1 }, 50*time.Second, 100*time.Millisecond, "expected kafka commits") - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) - compareQuery(t, - db, - producedSamples, - nil, + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + producedSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) } @@ -1180,16 +1125,91 @@ func TestPullMode(t *testing.T) { scheduler.completeJobCalls, ) - bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1") - db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + expSamples, nil, + labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), + ) +} + +func TestNoPartiallyConsumedRegions(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(errors.New("test done")) }) + + kafkaCluster, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic) + kafkaClient := mustKafkaClient(t, kafkaAddr) + kafkaClient.AddConsumeTopics(testTopic) + + cfg, overrides := blockBuilderConfig(t, kafkaAddr) + cfg.NoPartiallyConsumedRegion = true + + // Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle. 
+ kafkaCommits := atomic.NewInt32(0) + kafkaCluster.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) { + kafkaCommits.Add(1) + return nil, nil, false + }) + + bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides) + require.NoError(t, err) + + // NoPartiallyConsumedRegion changes the buffer to 5 mins. + require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer) + + require.NoError(t, bb.starting(ctx)) + t.Cleanup(func() { + require.NoError(t, bb.stoppingStandaloneMode(nil)) + }) + + // Producing some records + cycleEnd := time.Now().Truncate(cfg.ConsumeInterval).Add(10 * time.Minute) + var producedSamples []mimirpb.Sample + kafkaRecTime := cycleEnd.Truncate(cfg.ConsumeInterval).Add(-2 * time.Hour) + for kafkaRecTime.Before(cycleEnd) { + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + producedSamples = append(producedSamples, samples...) + + kafkaRecTime = kafkaRecTime.Add(time.Minute) + } + require.NotEmpty(t, producedSamples) + + // Since there is no partial region, all the samples until the 5th minute are consumed. + // Since we are excluding 5 mins, there will be only 4 samples missing. 
+ err = bb.nextConsumeCycle(ctx, cycleEnd.Add(-5*time.Minute)) + require.NoError(t, err) + expSamples := producedSamples[:len(producedSamples)-4] + + db, err := tsdb.Open(path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), promslog.NewNopLogger(), nil, nil, nil) require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, db.Close()) }) compareQuery(t, - db, - expSamples, - nil, + db, expSamples, nil, + labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), + ) + require.NoError(t, db.Close()) + + lag, err := bb.getLagForPartition(ctx, 0) + require.NoError(t, err) + state := PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis) + require.Equal(t, len(producedSamples)-4, int(state.Commit.At)) // Commit point is where to start next. + require.Equal(t, len(producedSamples)-5, int(state.LastSeenOffset)) + + // Consuming until end gets all the samples in the block. + err = bb.nextConsumeCycle(ctx, cycleEnd) + require.NoError(t, err) + + compareQueryWithDir(t, + path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), + producedSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"), ) + + lag, err = bb.getLagForPartition(ctx, 0) + require.NoError(t, err) + state = PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis) + require.Equal(t, len(producedSamples), int(state.Commit.At)) // Commit point is where to start next. 
+ require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset)) } func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) { diff --git a/pkg/blockbuilder/config.go b/pkg/blockbuilder/config.go index f2e61d57fc..11e73e6c35 100644 --- a/pkg/blockbuilder/config.go +++ b/pkg/blockbuilder/config.go @@ -31,6 +31,8 @@ type Config struct { ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"` + NoPartiallyConsumedRegion bool `yaml:"no_partially_consumed_region" category:"experimental"` + // Config parameters defined outside the block-builder config and are injected dynamically. Kafka ingest.KafkaConfig `yaml:"-"` BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"` @@ -58,7 +60,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.DurationVar(&cfg.ConsumeIntervalBuffer, "block-builder.consume-interval-buffer", 15*time.Minute, "Extra buffer between subsequent consumption cycles. To avoid small blocks the block-builder consumes until the last hour boundary of the consumption interval, plus the buffer.") f.DurationVar(&cfg.LookbackOnNoCommit, "block-builder.lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.") f.IntVar(&cfg.ApplyMaxGlobalSeriesPerUserBelow, "block-builder.apply-max-global-series-per-user-below", 0, "Apply the global series limit per partition if the global series limit for the user is <= this given value. 0 means limits are disabled. 
If a user's limit is more than the given value, then the limits are not applied as well.") - + f.BoolVar(&cfg.NoPartiallyConsumedRegion, "block-builder.no-partially-consumed-region", false, "Get rid of the 'last seen' logic and instead consume everything between two offsets to build the block.") cfg.SchedulerConfig.GRPCClientConfig.RegisterFlags(f) } diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index acecdc54b2..c0bbbf8406 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -78,7 +78,7 @@ func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_ts // lastBlockMax: max time of the block in the previous block building cycle. // blockMax: max time of the block in the current block building cycle. This blockMax is exclusive of the last sample by design in TSDB. // recordAlreadyProcessed: true if the record was processed in the previous cycle. (It gets processed again if some samples did not fit in the previous cycle.) -func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) (_ bool, err error) { +func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed, processEverything bool) (_ bool, err error) { userID := string(rec.Key) req := mimirpb.PreallocWriteRequest{ @@ -133,12 +133,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax ref, copiedLabels := app.GetRef(nonCopiedLabels, hash) for _, s := range ts.Samples { - if s.TimestampMs >= blockMax { + if !processEverything && s.TimestampMs >= blockMax { // We will process this sample in the next cycle. allSamplesProcessed = false continue } - if recordAlreadyProcessed && s.TimestampMs < lastBlockMax { + if !processEverything && recordAlreadyProcessed && s.TimestampMs < lastBlockMax { // This sample was already processed in the previous cycle. 
continue } @@ -170,12 +170,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax } for _, h := range ts.Histograms { - if h.Timestamp >= blockMax { + if !processEverything && h.Timestamp >= blockMax { // We will process this sample in the next cycle. allSamplesProcessed = false continue } - if recordAlreadyProcessed && h.Timestamp < lastBlockMax { + if !processEverything && recordAlreadyProcessed && h.Timestamp < lastBlockMax { // This sample was already processed in the previous cycle. continue } diff --git a/pkg/blockbuilder/tsdb_test.go b/pkg/blockbuilder/tsdb_test.go index ff36eb1830..870daa90c1 100644 --- a/pkg/blockbuilder/tsdb_test.go +++ b/pkg/blockbuilder/tsdb_test.go @@ -99,13 +99,13 @@ func TestTSDBBuilder(t *testing.T) { } addFloatSample := func(builder *TSDBBuilder, ts int64, val float64, lastEnd, currEnd int64, recordProcessedBefore, wantAccepted bool) { rec := createRequest(userID, floatSample(ts, val), nil, wantAccepted) - allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, recordProcessedBefore) + allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, recordProcessedBefore, false) require.NoError(t, err) require.Equal(t, wantAccepted, allProcessed) } addHistogramSample := func(builder *TSDBBuilder, ts int64, lastEnd, currEnd int64, recordProcessedBefore, wantAccepted bool) { rec := createRequest(userID, nil, histogramSample(ts), wantAccepted) - allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, recordProcessedBefore) + allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, recordProcessedBefore, false) require.NoError(t, err) require.Equal(t, wantAccepted, allProcessed) } @@ -226,7 +226,7 @@ func TestTSDBBuilder(t *testing.T) { // This one goes into the block. 
samples := floatSample(lastEnd+20, 1) rec := createRequest(userID, samples, nil, false) - allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false) + allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false, false) require.NoError(t, err) require.True(t, allProcessed) expOOOSamples := append([]mimirpb.Sample(nil), samples...) @@ -234,7 +234,7 @@ func TestTSDBBuilder(t *testing.T) { // This one doesn't go into the block because of "ErrOutOfOrderSample" (soft error) samples = floatSample(lastEnd-20, 1) rec = createRequest(userID, samples, nil, false) - allProcessed, err = builder.Process(context.Background(), rec, lastEnd, currEnd, false) + allProcessed, err = builder.Process(context.Background(), rec, lastEnd, currEnd, false, false) require.NoError(t, err) require.True(t, allProcessed) @@ -305,6 +305,16 @@ func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) { require.ErrorIs(t, err, errUploadFailed) } +func compareQueryWithDir(t *testing.T, bucketDir string, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) *tsdb.DB { + db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, db.Close()) }) + + compareQuery(t, db, expSamples, expHistograms, matchers...) 
+ + return db +} + func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) { querier, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) @@ -389,7 +399,7 @@ func TestProcessingEmptyRequest(t *testing.T) { data, err := req.Marshal() require.NoError(t, err) rec.Value = data - allProcessed, err := builder.Process(context.Background(), &rec, lastEnd, currEnd, false) + allProcessed, err := builder.Process(context.Background(), &rec, lastEnd, currEnd, false, false) require.NoError(t, err) require.True(t, allProcessed) @@ -398,7 +408,7 @@ func TestProcessingEmptyRequest(t *testing.T) { data, err = req.Marshal() require.NoError(t, err) rec.Value = data - allProcessed, err = builder.Process(context.Background(), &rec, lastEnd, currEnd, false) + allProcessed, err = builder.Process(context.Background(), &rec, lastEnd, currEnd, false, false) require.NoError(t, err) require.True(t, allProcessed) @@ -459,7 +469,7 @@ func TestTSDBBuilderLimits(t *testing.T) { for seriesID := 1; seriesID <= 100; seriesID++ { for userID := range limits { rec := createRequest(userID, seriesID) - allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false) + allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false, false) require.NoError(t, err) require.Equal(t, true, allProcessed) } @@ -515,7 +525,7 @@ func TestTSDBBuilderNativeHistogramEnabledError(t *testing.T) { Key: []byte(userID), Value: createWriteRequest(t, strconv.Itoa(seriesID), nil, histogramSample(ts)), } - allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false) + allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false, false) require.NoError(t, err) require.Equal(t, true, allProcessed) }