Skip to content

Commit

Permalink
Add TestNoPartiallyConsumedRegions
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome committed Jan 23, 2025
1 parent 74414cc commit 4167f08
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 93 deletions.
206 changes: 113 additions & 93 deletions pkg/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/tsdb"
"os"
"path"
"strings"
Expand All @@ -17,9 +19,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
Expand Down Expand Up @@ -166,14 +166,9 @@ func TestBlockBuilder_StartWithExistingCommit(t *testing.T) {
// Because there is a commit, on startup, block-builder must consume samples only after the commit.
expSamples := producedSamples[1+(len(producedSamples)/2):]

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
expSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -257,14 +252,9 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) {
// Because there is a commit, on startup, block-builder must consume samples only after the commit.
expSamples := producedSamples[1+(len(producedSamples)/2):]

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
expSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -318,15 +308,10 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit(t *testing.T) {
cortex_blockbuilder_consumer_lag_records{partition="1"} 0
`), "cortex_blockbuilder_consumer_lag_records"))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
// There should be no samples in the tsdb.
compareQuery(t,
db,
nil,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -394,15 +379,9 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) {
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
// There should be no samples in the tsdb.
compareQuery(t,
db,
nil,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -470,14 +449,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection(t *testing.T) {
cortex_blockbuilder_consumer_lag_records{partition="1"} 1
`), "cortex_blockbuilder_consumer_lag_records"))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
producedSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -581,14 +555,9 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testi
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
producedSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -641,15 +610,9 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) {
require.Eventually(t, func() bool { return kafkaCommits.Load() > 1 }, 5*time.Second, 100*time.Millisecond, "expected kafka commits")

for _, tenant := range tenants {
bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t,
db,
producedPerTenantSamples[tenant],
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant),
producedPerTenantSamples[tenant], nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -723,15 +686,9 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) {
)

for _, tenant := range tenants {
bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t,
db,
producedPerTenantSamples[tenant],
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant),
producedPerTenantSamples[tenant], nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -817,16 +774,9 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
runTest := func(name string, end time.Time, expSamples []mimirpb.Sample) {
t.Run(name, func(t *testing.T) {
require.NoError(t, bb.nextConsumeCycle(ctx, end))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t,
db,
expSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
})
Expand Down Expand Up @@ -889,14 +839,9 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) {
// We expect at least several cycles because of how the pushed records were structured.
require.Eventually(t, func() bool { return kafkaCommits.Load() >= 1 }, 50*time.Second, 100*time.Millisecond, "expected kafka commits")

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
producedSamples,
nil,
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}
Expand Down Expand Up @@ -1180,16 +1125,91 @@ func TestPullMode(t *testing.T) {
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
}

func TestNoPartiallyConsumedRegions(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

kafkaCluster, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic)
kafkaClient := mustKafkaClient(t, kafkaAddr)
kafkaClient.AddConsumeTopics(testTopic)

cfg, overrides := blockBuilderConfig(t, kafkaAddr)
cfg.NoPartiallyConsumedRegion = true

// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
kafkaCommits := atomic.NewInt32(0)
kafkaCluster.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Add(1)
return nil, nil, false
})

bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides)
require.NoError(t, err)

// NoPartiallyConsumedRegion changes the buffer to 5 mins.
require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer)

require.NoError(t, bb.starting(ctx))
t.Cleanup(func() {
require.NoError(t, bb.stoppingStandaloneMode(nil))
})

// Producing some records
cycleEnd := time.Now().Truncate(cfg.ConsumeInterval).Add(10 * time.Minute)
var producedSamples []mimirpb.Sample
kafkaRecTime := cycleEnd.Truncate(cfg.ConsumeInterval).Add(-2 * time.Hour)
for kafkaRecTime.Before(cycleEnd) {
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(time.Minute)
}
require.NotEmpty(t, producedSamples)

// Since there is no partial region, all the samples until the 5th minute is consumed.
// Since we are excluding 5 mins, there will be only 4 samples missing.
err = bb.nextConsumeCycle(ctx, cycleEnd.Add(-5*time.Minute))
require.NoError(t, err)
expSamples := producedSamples[:len(producedSamples)-4]

db, err := tsdb.Open(path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"), promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
compareQuery(t,
db,
expSamples,
nil,
db, expSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
require.NoError(t, db.Close())

lag, err := bb.getLagForPartition(ctx, 0)
require.NoError(t, err)
state := PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis)
require.Equal(t, len(producedSamples)-4, int(state.Commit.At)) // Commit point is where to start next.
require.Equal(t, len(producedSamples)-5, int(state.LastSeenOffset))

// Consuming until end gets all the samples in the block.
err = bb.nextConsumeCycle(ctx, cycleEnd)
require.NoError(t, err)

compareQueryWithDir(t,
path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1"),
producedSamples, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)

lag, err = bb.getLagForPartition(ctx, 0)
require.NoError(t, err)
state = PartitionStateFromLag(bb.logger, lag, bb.fallbackOffsetMillis)
require.Equal(t, len(producedSamples), int(state.Commit.At)) // Commit point is where to start next.
require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset))
}

func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/blockbuilder/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) {
require.ErrorIs(t, err, errUploadFailed)
}

func compareQueryWithDir(t *testing.T, bucketDir string, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) *tsdb.DB {
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

compareQuery(t, db, expSamples, expHistograms, matchers...)

return db
}

func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) {
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
Expand Down

0 comments on commit 4167f08

Please sign in to comment.