diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index 3f68ab5206303..165f530f1709c 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -32,6 +32,9 @@ type Metrics struct { buildCompleted *prometheus.CounterVec buildTime *prometheus.HistogramVec + blocksDeleted prometheus.Counter + metasDeleted prometheus.Counter + tenantsDiscovered prometheus.Counter } @@ -107,6 +110,19 @@ func NewMetrics( Buckets: prometheus.DefBuckets, }, []string{"status"}), + blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "blocks_deleted_total", + Help: "Number of blocks deleted", + }), + metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "metas_deleted_total", + Help: "Number of metas deleted", + }), + tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 287a859745f5a..106716d47b584 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -3,6 +3,7 @@ package planner import ( "context" "fmt" + "math" "sort" "sync" "time" @@ -156,6 +157,17 @@ func (p *Planner) running(ctx context.Context) error { } } +type tenantTableTaskResults struct { + tasksToWait int + originalMetas []bloomshipper.Meta + resultsCh chan *protos.TaskResult +} + +type tenantTable struct { + table config.DayTable + tenant string +} + func (p *Planner) runOne(ctx context.Context) error { var ( start = time.Now() @@ -171,39 +183,79 @@ func (p *Planner) runOne(ctx context.Context) error { tables := p.tables(time.Now()) level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays()) - work, err := p.loadWork(ctx, tables) + work, err := p.loadTenantWork(ctx, tables) if err != nil { return fmt.Errorf("error loading work: %w", err) } + // For deletion, we need to aggregate the results for each table and tenant tuple + // We cannot delete the returned tombstoned metas as soon as a task finishes since + // other tasks may still be using the now tombstoned metas + tasksResultForTenantTable := make(map[tenantTable]tenantTableTaskResults) var totalTasks int - for _, w := range work { - logger := log.With(p.logger, "tenant", w.tenant, "table", w.table.Addr(), "ownership", w.ownershipRange.String()) - gaps, err := p.findGapsForBounds(ctx, w.tenant, w.table, w.ownershipRange) - if err != nil { - level.Error(logger).Log("msg", "error finding gaps", "err", err) - continue - } + for table, tenants := range work { + for tenant, ownershipRanges := range tenants { + logger := log.With(p.logger, "tenant", tenant, "table", table.Addr()) + tt := tenantTable{ + tenant: tenant, + table: table, + } - now := time.Now() - for _, gap := range gaps { - totalTasks++ + tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, ownershipRanges) + if err != nil { + level.Error(logger).Log("msg", "error computing tasks", "err", err) + continue + } - task := NewTask( - ctx, now, - protos.NewTask(w.table, w.tenant, w.ownershipRange, gap.tsdb, gap.gaps), - ) + var tenantTableEnqueuedTasks int + resultsCh := make(chan *protos.TaskResult, len(tasks)) - if err := p.enqueueTask(task); err != nil { - level.Error(logger).Log("msg", "error enqueuing task", "err", err) - continue + now := time.Now() + for _, task := range tasks { + 
queueTask := NewQueueTask(ctx, now, task, resultsCh) + if err := p.enqueueTask(queueTask); err != nil { + level.Error(logger).Log("msg", "error enqueuing task", "err", err) + continue + } + + totalTasks++ + tenantTableEnqueuedTasks++ + } + + tasksResultForTenantTable[tt] = tenantTableTaskResults{ + tasksToWait: tenantTableEnqueuedTasks, + originalMetas: existingMetas, + resultsCh: resultsCh, } + + level.Debug(logger).Log("msg", "enqueued tasks", "tasks", tenantTableEnqueuedTasks) } } level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks) + // Create a goroutine to process the results for each table tenant tuple + // TODO(salvacorts): This may end up creating too many goroutines. + // Create a pool of workers to process table-tenant tuples. + var wg sync.WaitGroup + for tt, results := range tasksResultForTenantTable { + wg.Add(1) + go func(table config.DayTable, tenant string, results tenantTableTaskResults) { + defer wg.Done() + + if err := p.processTenantTaskResults( + ctx, table, tenant, + results.originalMetas, results.tasksToWait, results.resultsCh, + ); err != nil { + level.Error(p.logger).Log("msg", "failed to process tenant task results", "err", err) + } + }(tt.table, tt.tenant, results) + } + + level.Debug(p.logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks, "tenantTables", len(tasksResultForTenantTable)) + wg.Wait() + status = statusSuccess level.Info(p.logger).Log( "msg", "bloom build iteration completed", @@ -212,6 +264,180 @@ func (p *Planner) runOne(ctx context.Context) error { return nil } +// computeTasks computes the tasks for a given table and tenant and ownership range. +// It returns the tasks to be executed and the metas that are existing relevant for the ownership range. +func (p *Planner) computeTasks( + ctx context.Context, + table config.DayTable, + tenant string, + ownershipRanges []v1.FingerprintBounds, +) ([]*protos.Task, []bloomshipper.Meta, error) { + var tasks []*protos.Task + logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant) + + // Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms + metas, err := p.bloomStore.FetchMetas( + ctx, + bloomshipper.MetaSearchParams{ + TenantID: tenant, + Interval: bloomshipper.NewInterval(table.Bounds()), + Keyspace: v1.NewBounds(0, math.MaxUint64), + }, + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to get metas: %w", err) + } + + for _, ownershipRange := range ownershipRanges { + logger := log.With(logger, "ownership", ownershipRange.String()) + + // Filter only the metas that overlap in the ownership range + metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange) + level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metasInBounds)) + + // Find gaps in the TSDBs for this tenant/table + gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger) + if err != nil { + level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err) + continue + } + + for _, gap := range gaps { + tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) + } + } + + return tasks, metas, nil +} + +func (p *Planner) processTenantTaskResults( + ctx context.Context, + table config.DayTable, + tenant string, + originalMetas []bloomshipper.Meta, + totalTasks int, + resultsCh <-chan *protos.TaskResult, +) error { + logger := log.With(p.logger, table, table.Addr(), "tenant", tenant) + level.Debug(logger).Log("msg", "waiting for all 
tasks to be completed", "tasks", totalTasks) + + newMetas := make([]bloomshipper.Meta, 0, totalTasks) + for i := 0; i < totalTasks; i++ { + select { + case <-ctx.Done(): + if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { + level.Error(logger).Log("msg", "planner context done with error", "err", err) + return err + } + + // No error or context canceled, just return + level.Debug(logger).Log("msg", "context done while waiting for task results") + return nil + case result := <-resultsCh: + if result == nil { + level.Error(logger).Log("msg", "received nil task result") + continue + } + if result.Error != nil { + level.Error(logger).Log( + "msg", "task failed", + "err", result.Error, + "task", result.TaskID, + ) + continue + } + + newMetas = append(newMetas, result.CreatedMetas...) + } + } + + level.Debug(logger).Log( + "msg", "all tasks completed", + "tasks", totalTasks, + "originalMetas", len(originalMetas), + "newMetas", len(newMetas), + ) + + if len(newMetas) == 0 { + // No new metas were created, nothing to delete + // Note: this would only happen if all tasks failed + return nil + } + + combined := append(originalMetas, newMetas...) + outdated, err := outdatedMetas(combined) + if err != nil { + return fmt.Errorf("failed to find outdated metas: %w", err) + } + level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) + + if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil { + return fmt.Errorf("failed to delete outdated metas: %w", err) + } + + return nil +} + +func (p *Planner) deleteOutdatedMetasAndBlocks( + ctx context.Context, + table config.DayTable, + tenant string, + metas []bloomshipper.Meta, +) error { + logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant) + + client, err := p.bloomStore.Client(table.ModelTime()) + if err != nil { + level.Error(logger).Log("msg", "failed to get client", "err", err) + return errors.Wrap(err, "failed to get client") + } + + var ( + deletedMetas int + deletedBlocks int + ) + defer func() { + p.metrics.metasDeleted.Add(float64(deletedMetas)) + p.metrics.blocksDeleted.Add(float64(deletedBlocks)) + }() + + for _, meta := range metas { + for _, block := range meta.Blocks { + if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil { + if client.IsObjectNotFoundErr(err) { + level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String()) + } else { + level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String()) + return errors.Wrap(err, "failed to delete block") + } + } + + deletedBlocks++ + level.Debug(logger).Log("msg", "removed outdated block", "block", block.String()) + } + + err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}) + if err != nil { + if client.IsObjectNotFoundErr(err) { + level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String()) + } else { + level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String()) + return errors.Wrap(err, "failed to delete meta") + } + } + deletedMetas++ + level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String()) + } + + level.Debug(logger).Log( + "msg", "deleted outdated metas and blocks", + "metas", deletedMetas, + "blocks", deletedBlocks, + ) + + return nil +} + func (p *Planner) tables(ts time.Time) *dayRangeIterator { // adjust the minimum by one to make it inclusive, which is more 
intuitive // for a configuration variable @@ -228,21 +454,15 @@ func (p *Planner) tables(ts time.Time) *dayRangeIterator { return newDayRangeIterator(fromDay, throughDay, p.schemaCfg) } -type tenantTableRange struct { - tenant string - table config.DayTable - ownershipRange v1.FingerprintBounds +type work map[config.DayTable]map[string][]v1.FingerprintBounds - // TODO: Add tracking - //finished bool - //queueTime, startTime, endTime time.Time -} - -func (p *Planner) loadWork( +// loadTenantWork loads the work for each tenant and table tuple. +// work is the list of fingerprint ranges that need to be indexed in bloom filters. +func (p *Planner) loadTenantWork( ctx context.Context, tables *dayRangeIterator, -) ([]tenantTableRange, error) { - var work []tenantTableRange +) (work, error) { + tenantTableWork := make(map[config.DayTable]map[string][]v1.FingerprintBounds, tables.TotalDays()) for tables.Next() && tables.Err() == nil && ctx.Err() == nil { table := tables.At() @@ -254,6 +474,11 @@ func (p *Planner) loadWork( } level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Len()) + // If this is the first this we see this table, initialize the map + if tenantTableWork[table] == nil { + tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Len()) + } + for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil { p.metrics.tenantsDiscovered.Inc() tenant := tenants.At() @@ -265,13 +490,7 @@ func (p *Planner) loadWork( splitFactor := p.limits.BloomSplitSeriesKeyspaceBy(tenant) bounds := SplitFingerprintKeyspaceByFactor(splitFactor) - for _, bounds := range bounds { - work = append(work, tenantTableRange{ - tenant: tenant, - table: table, - ownershipRange: bounds, - }) - } + tenantTableWork[table][tenant] = bounds level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor) } @@ -286,7 +505,7 @@ func (p *Planner) loadWork( return nil, fmt.Errorf("error iterating tables: %w", err) } - return work, ctx.Err() + return tenantTableWork, ctx.Err() } func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*v1.SliceIter[string], error) { @@ -298,47 +517,6 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*v1.Slice return v1.NewSliceIter(tenants), nil } -/* -Planning works as follows, split across many functions for clarity: - 1. Fetch all meta.jsons for the given tenant and table which overlap the ownership range of this compactor. - 2. Load current TSDBs for this tenant/table. - 3. For each live TSDB (there should be only 1, but this works with multiple), find any gaps - (fingerprint ranges) which are not up-to-date, determined by checking other meta.json files and comparing - the TSDBs they were generated from as well as their ownership ranges. 
-*/ -func (p *Planner) findGapsForBounds( - ctx context.Context, - tenant string, - table config.DayTable, - ownershipRange v1.FingerprintBounds, -) ([]blockPlan, error) { - logger := log.With(p.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String()) - - // Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms - metas, err := p.bloomStore.FetchMetas( - ctx, - bloomshipper.MetaSearchParams{ - TenantID: tenant, - Interval: bloomshipper.NewInterval(table.Bounds()), - Keyspace: ownershipRange, - }, - ) - if err != nil { - level.Error(logger).Log("msg", "failed to get metas", "err", err) - return nil, fmt.Errorf("failed to get metas: %w", err) - } - - level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas)) - - // Find gaps in the TSDBs for this tenant/table - gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger) - if err != nil { - return nil, fmt.Errorf("failed to find outdated gaps: %w", err) - } - - return gaps, nil -} - // blockPlan is a plan for all the work needed to build a meta.json // It includes: // - the tsdb (source of truth) which contains all the series+chunks @@ -507,11 +685,11 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan return plans, nil } -func (p *Planner) addPendingTask(task *Task) { +func (p *Planner) addPendingTask(task *QueueTask) { p.pendingTasks.Store(task.ID, task) } -func (p *Planner) removePendingTask(task *Task) { +func (p *Planner) removePendingTask(task *QueueTask) { p.pendingTasks.Delete(task.ID) } @@ -523,7 +701,7 @@ func (p *Planner) totalPendingTasks() (total int) { return total } -func (p *Planner) enqueueTask(task *Task) error { +func (p *Planner) enqueueTask(task *QueueTask) error { p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now()) return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() { task.timesEnqueued++ @@ -570,7 +748,8 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID) } - task := item.(*Task) + task := item.(*QueueTask) + logger := log.With(logger, "task", task.ID) queueTime := time.Since(task.queueTime) p.metrics.queueDuration.Observe(queueTime.Seconds()) @@ -582,7 +761,8 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer continue } - if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil { + result, err := p.forwardTaskToBuilder(builder, builderID, task) + if err != nil { maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant) if maxRetries > 0 && task.timesEnqueued >= maxRetries { p.metrics.tasksFailed.Inc() @@ -593,6 +773,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer "maxRetries", maxRetries, "err", err, ) + task.resultsChannel <- &protos.TaskResult{ + TaskID: task.ID, + Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err), + } continue } @@ -601,13 +785,31 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer p.metrics.taskLost.Inc() p.removePendingTask(task) level.Error(logger).Log("msg", "error re-enqueuing task. 
this task will be lost", "err", err) + task.resultsChannel <- &protos.TaskResult{ + TaskID: task.ID, + Error: fmt.Errorf("error re-enqueuing task: %w", err), + } continue } p.metrics.tasksRequeued.Inc() - level.Error(logger).Log("msg", "error forwarding task to builder, Task requeued", "err", err) + level.Error(logger).Log( + "msg", "error forwarding task to builder, Task requeued", + "retries", task.timesEnqueued, + "err", err, + ) + continue } + level.Debug(logger).Log( + "msg", "task completed", + "duration", time.Since(task.queueTime).Seconds(), + "retries", task.timesEnqueued, + ) + p.removePendingTask(task) + + // Send the result back to the task. The channel is buffered, so this should not block. + task.resultsChannel <- result } return errPlannerIsNotRunning @@ -616,16 +818,14 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer func (p *Planner) forwardTaskToBuilder( builder protos.PlannerForBuilder_BuilderLoopServer, builderID string, - task *Task, -) error { - defer p.removePendingTask(task) - + task *QueueTask, +) (*protos.TaskResult, error) { msg := &protos.PlannerToBuilder{ Task: task.ToProtoTask(), } if err := builder.Send(msg); err != nil { - return fmt.Errorf("error sending task to builder (%s): %w", builderID, err) + return nil, fmt.Errorf("error sending task to builder (%s): %w", builderID, err) } // Launch a goroutine to wait for the response from the builder so we can @@ -651,12 +851,14 @@ func (p *Planner) forwardTaskToBuilder( select { case result := <-resultsCh: - // TODO: Return metas forward via channel - return result.Error + // Note: Errors from the result are not returned here since we don't retry tasks + // that return with an error. I.e. we won't retry errors forwarded from the builder. + // TODO(salvacorts): Filter and return errors that can be retried. 
+ return result, nil case err := <-errCh: - return err + return nil, err case <-timeout: - return fmt.Errorf("timeout waiting for response from builder (%s)", builderID) + return nil, fmt.Errorf("timeout waiting for response from builder (%s)", builderID) } } @@ -666,7 +868,7 @@ func (p *Planner) forwardTaskToBuilder( func (p *Planner) receiveResultFromBuilder( builder protos.PlannerForBuilder_BuilderLoopServer, builderID string, - task *Task, + task *QueueTask, ) (*protos.TaskResult, error) { // If connection is closed, Recv() will return an error res, err := builder.Recv() diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index b46b987de751c..c76ef0e4d2679 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -3,12 +3,16 @@ package planner import ( "context" "fmt" + "io" + "math" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/services" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -17,6 +21,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloombuild/protos" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" @@ -25,6 +30,9 @@ import ( "github.com/grafana/loki/v3/pkg/storage/types" ) +var testDay = parseDayTime("2023-09-01") +var testTable = config.NewDayTable(testDay, "index_") + func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { return tsdb.SingleTenantTSDBIdentifier{ TS: time.Unix(int64(n), 0), @@ -35,7 +43,9 @@ func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.Bl m := bloomshipper.Meta{ MetaRef: bloomshipper.MetaRef{ Ref: bloomshipper.Ref{ - Bounds: v1.NewBounds(min, max), + TenantID: "fakeTenant", + TableName: testTable.Addr(), + Bounds: v1.NewBounds(min, max), }, }, Blocks: blocks, @@ -141,14 +151,26 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { } func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { - bounds := v1.NewBounds(min, max) + startTS, endTS := testDay.Bounds() return bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ - Bounds: bounds, + TenantID: "fakeTenant", + TableName: testTable.Addr(), + Bounds: v1.NewBounds(min, max), + StartTimestamp: startTS, + EndTimestamp: endTS, + Checksum: 0, }, } } +func genBlock(ref bloomshipper.BlockRef) bloomshipper.Block { + return bloomshipper.Block{ + BlockRef: ref, + Data: &DummyReadSeekCloser{}, + } +} + func Test_blockPlansForGaps(t *testing.T) { for _, tc := range []struct { desc string @@ -333,13 +355,14 @@ func Test_blockPlansForGaps(t *testing.T) { } } -func createTasks(n int) []*Task { - tasks := make([]*Task, 0, n) +func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { + tasks := make([]*QueueTask, 0, n) // Enqueue tasks for i := 0; i < n; i++ { - task := NewTask( + task := NewQueueTask( context.Background(), time.Now(), - protos.NewTask(config.NewDayTable(config.NewDayTime(0), "fake"), "fakeTenant", v1.NewBounds(0, 10), tsdbID(1), nil), + protos.NewTask(config.NewDayTable(testDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), tsdbID(1), nil), + resultsCh, ) tasks = append(tasks, task) } @@ -385,7 +408,12 @@ func createPlanner( } reg := 
prometheus.NewPedanticRegistry() - planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, nil, logger, reg) + metasCache := cache.NewNoopCache() + blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger) + bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, reg, logger) + require.NoError(t, err) + + planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg) require.NoError(t, err) return planner @@ -432,9 +460,8 @@ func Test_BuilderLoop(t *testing.T) { modifyBuilder: func(builder *fakeBuilder) { builder.SetReturnErrorMsg(true) }, - resetBuilder: func(builder *fakeBuilder) { - builder.SetReturnErrorMsg(false) - }, + // We don't retry on error messages from the builder + shouldConsumeAfterModify: true, }, { name: "exceed max retries", @@ -487,7 +514,8 @@ func Test_BuilderLoop(t *testing.T) { }) // Enqueue tasks - tasks := createTasks(nTasks) + resultsCh := make(chan *protos.TaskResult, nTasks) + tasks := createTasks(nTasks, resultsCh) for _, task := range tasks { err = planner.enqueueTask(task) require.NoError(t, err) @@ -517,6 +545,11 @@ func Test_BuilderLoop(t *testing.T) { // Finally, the queue should be empty require.Equal(t, 0, planner.totalPendingTasks()) + // consume all tasks result to free up the channel for the next round of tasks + for i := 0; i < nTasks; i++ { + <-resultsCh + } + if tc.modifyBuilder != nil { // Configure builders to return errors for _, builder := range builders { @@ -568,6 +601,213 @@ func Test_BuilderLoop(t *testing.T) { } } +func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error { + for _, meta := range metas { + err := bloomClient.PutMeta(context.Background(), meta) + if err != nil { + return err + } + + for _, block := range meta.Blocks { + err := bloomClient.PutBlock(context.Background(), genBlock(block)) + if err != nil { + return err + } + } + } + return nil +} + +func Test_processTenantTaskResults(t *testing.T) { + for _, tc := range []struct { + name string + + originalMetas []bloomshipper.Meta + taskResults []*protos.TaskResult + expectedMetas []bloomshipper.Meta + }{ + { + name: "errors", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + taskResults: []*protos.TaskResult{ + { + TaskID: "1", + Error: errors.New("fake error"), + }, + { + TaskID: "2", + Error: errors.New("fake error"), + }, + }, + expectedMetas: []bloomshipper.Meta{ + // The original metas should remain unchanged + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + }, + { + name: "no new metas", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + taskResults: []*protos.TaskResult{ + { + TaskID: "1", + }, + { + TaskID: "2", + }, + }, + expectedMetas: []bloomshipper.Meta{ + // The original metas should remain unchanged + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + }, + { + name: "no original metas", + taskResults: []*protos.TaskResult{ + { + TaskID: "1", + CreatedMetas: []bloomshipper.Meta{ 
+ genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + }, + }, + { + TaskID: "2", + CreatedMetas: []bloomshipper.Meta{ + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + }, + }, + expectedMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + }, + { + name: "single meta covers all original", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), + genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}), + }, + taskResults: []*protos.TaskResult{ + { + TaskID: "1", + CreatedMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + }, + }, + }, + expectedMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + }, + }, + { + name: "multi version ordering", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), + genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), // only part of the range is outdated, must keep + }, + taskResults: []*protos.TaskResult{ + { + TaskID: "1", + CreatedMetas: []bloomshipper.Meta{ + genMeta(8, 10, []int{2}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), + }, + }, + }, + expectedMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(8, 10, []int{2}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) + + cfg := Config{ + PlanningInterval: 1 * time.Hour, + MaxQueuedTasksPerTenant: 10000, + } + planner := createPlanner(t, cfg, &fakeLimits{}, logger) + + bloomClient, err := planner.bloomStore.Client(testDay.ModelTime()) + require.NoError(t, err) + + // Create original metas and blocks + err = putMetas(bloomClient, tc.originalMetas) + require.NoError(t, err) + + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + resultsCh := make(chan *protos.TaskResult, len(tc.taskResults)) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + err = planner.processTenantTaskResults( + ctx, + testTable, + "fakeTenant", + tc.originalMetas, + len(tc.taskResults), + resultsCh, + ) + require.NoError(t, err) + }() + + for _, taskResult := range tc.taskResults { + if len(taskResult.CreatedMetas) > 0 { + // Emulate builder putting new metas to obj store + err = putMetas(bloomClient, taskResult.CreatedMetas) + require.NoError(t, err) + } + + resultsCh <- taskResult + } + + // Wait for all tasks to be processed and outdated metas/blocks deleted + wg.Wait() + + // Get all metas + metas, err := planner.bloomStore.FetchMetas( + context.Background(), + bloomshipper.MetaSearchParams{ + TenantID: "fakeTenant", + Interval: bloomshipper.NewInterval(testTable.Bounds()), + Keyspace: v1.NewBounds(0, math.MaxUint64), + }, + ) + require.NoError(t, err) + + // TODO(salvacorts): Fix this + // For some reason, when the tests are run in the CI, we do not encode the `loc` of model.Time for each TSDB. + // As a result, when we fetch them, the loc is empty whereas in the original metas, it is not. Therefore the + // comparison fails. 
As a workaround to fix the issue, we will manually reset the TS of the sources to the + // fetched metas + for i := range metas { + for j := range metas[i].Sources { + sec := metas[i].Sources[j].TS.Unix() + nsec := metas[i].Sources[j].TS.Nanosecond() + metas[i].Sources[j].TS = time.Unix(sec, int64(nsec)) + } + } + + // Compare metas + require.Equal(t, len(tc.expectedMetas), len(metas)) + require.ElementsMatch(t, tc.expectedMetas, metas) + }) + } +} + type fakeBuilder struct { id string tasks []*protos.Task @@ -709,3 +949,17 @@ func parseDayTime(s string) config.DayTime { Time: model.TimeFromUnix(t.Unix()), } } + +type DummyReadSeekCloser struct{} + +func (d *DummyReadSeekCloser) Read(_ []byte) (n int, err error) { + return 0, io.EOF +} + +func (d *DummyReadSeekCloser) Seek(_ int64, _ int) (int64, error) { + return 0, nil +} + +func (d *DummyReadSeekCloser) Close() error { + return nil +} diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go index 1da39cea6bfd7..8580dd12a655f 100644 --- a/pkg/bloombuild/planner/task.go +++ b/pkg/bloombuild/planner/task.go @@ -7,19 +7,27 @@ import ( "github.com/grafana/loki/v3/pkg/bloombuild/protos" ) -type Task struct { +type QueueTask struct { *protos.Task + resultsChannel chan *protos.TaskResult + // Tracking timesEnqueued int queueTime time.Time ctx context.Context } -func NewTask(ctx context.Context, queueTime time.Time, task *protos.Task) *Task { - return &Task{ - Task: task, - ctx: ctx, - queueTime: queueTime, +func NewQueueTask( + ctx context.Context, + queueTime time.Time, + task *protos.Task, + resultsChannel chan *protos.TaskResult, +) *QueueTask { + return &QueueTask{ + Task: task, + resultsChannel: resultsChannel, + ctx: ctx, + queueTime: queueTime, } } diff --git a/pkg/bloombuild/planner/versioned_range.go b/pkg/bloombuild/planner/versioned_range.go new file mode 100644 index 0000000000000..3a436353954e1 --- /dev/null +++ b/pkg/bloombuild/planner/versioned_range.go @@ -0,0 +1,247 @@ +package planner + +import ( + "sort" + + "github.com/prometheus/common/model" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" +) + +type tsdbToken struct { + through model.Fingerprint // inclusive + version int // TSDB version +} + +// a ring of token ranges used to identify old metas. +// each token represents that a TSDB version has covered the entire range +// up to that point from the previous token. +type tsdbTokenRange []tsdbToken + +func (t tsdbTokenRange) Len() int { + return len(t) +} + +func (t tsdbTokenRange) Less(i, j int) bool { + return t[i].through < t[j].through +} + +func (t tsdbTokenRange) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} + +// Add ensures a versioned set of bounds is added to the range. If the bounds are already +// covered by a more up to date version, it returns false. 
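+// For illustration: starting from an empty range, Add(2, [10,20]) produces
+// [{through: 9, version: -1}, {through: 20, version: 2}] (the -1 token marks the
+// uncovered prefix), and a subsequent Add(1, [0,15]) only claims that uncovered
+// prefix, yielding [{through: 9, version: 1}, {through: 20, version: 2}] with added=true.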
+func (t tsdbTokenRange) Add(version int, bounds v1.FingerprintBounds) (res tsdbTokenRange, added bool) { + // allows attempting to join neighboring token ranges with identical versions + // that aren't known until the end of the function + var shouldReassemble bool + var reassembleFrom int + defer func() { + if shouldReassemble { + res = res.reassemble(reassembleFrom) + } + }() + + // special case: first token + if len(t) == 0 { + tok := tsdbToken{through: bounds.Max, version: version} + // special case: first token is included in bounds, no need to fill negative space + if bounds.Min == 0 { + return append(t, tok), true + } + // Use a negative version to indicate that the range is not covered by any version. + return append(t, tsdbToken{through: bounds.Min - 1, version: -1}, tok), true + } + + // For non-nil token ranges, we continually update the range with newer versions. + for { + // find first token that covers the start of the range + i := sort.Search(len(t), func(i int) bool { + return t[i].through >= bounds.Min + }) + + if i == len(t) { + tok := tsdbToken{through: bounds.Max, version: version} + + // edge case: there is no gap between the previous token range + // and the new one; + // skip adding a negative token + if t[len(t)-1].through == bounds.Min-1 { + return append(t, tok), true + } + + // the range is not covered by any version and we are at the end of the range. + // Add a negative token and the new token. + negative := tsdbToken{through: bounds.Min - 1, version: -1} + return append(t, negative, tok), true + } + + // Otherwise, we've found a token that covers the start of the range. + newer := t[i].version < version + preExisting := t.boundsForToken(i) + if !newer { + if bounds.Within(preExisting) { + // The range is already covered by a more up to date version, no need + // to add anything, but honor if an earlier token was added + return t, added + } + + // The range is partially covered by a more up to date version; + // update the range we need to check and continue + bounds = v1.NewBounds(preExisting.Max+1, bounds.Max) + continue + } + + // If we need to update the range, there are 5 cases: + // 1. `equal`: the incoming range equals an existing range () + // ------ # addition + // ------ # src + // 2. `subset`: the incoming range is a subset of an existing range + // ------ # addition + // -------- # src + // 3. `overflow_both_sides`: the incoming range is a superset of an existing range. This is not possible + // because the first token in the ring implicitly covers the left bound (zero) of all possible fps. + // Therefore, we can skip this case. + // ------ # addition + // ---- # src + // 4. `right_overflow`: the incoming range overflows the right side of an existing range + // ------ # addition + // ------ # src + // 5. `left_overflow`: the incoming range overflows the left side of an existing range. This can be skipped + // for the same reason as `superset`. 
+ // ------ # addition + // ------ # src + + // 1) (`equal`): we're replacing the same bounds + if bounds.Equal(preExisting) { + t[i].version = version + return t, true + } + + // 2) (`subset`): the incoming range is a subset of an existing range + if bounds.Within(preExisting) { + // 2a) the incoming range touches the existing range's minimum bound + if bounds.Min == preExisting.Min { + tok := tsdbToken{through: bounds.Max, version: version} + t = append(t, tsdbToken{}) + copy(t[i+1:], t[i:]) + t[i] = tok + return t, true + } + // 2b) the incoming range touches the existing range's maximum bound + if bounds.Max == preExisting.Max { + t[i].through = bounds.Min - 1 + tok := tsdbToken{through: bounds.Max, version: version} + t = append(t, tsdbToken{}) + copy(t[i+2:], t[i+1:]) + t[i+1] = tok + return t, true + } + + // 2c) the incoming range is does not touch either edge; + // add two tokens (the new one and a new left-bound for the old range) + tok := tsdbToken{through: bounds.Max, version: version} + t = append(t, tsdbToken{}, tsdbToken{}) + copy(t[i+2:], t[i:]) + t[i+1] = tok + t[i].through = bounds.Min - 1 + return t, true + } + + // 4) (`right_overflow`): the incoming range overflows the right side of an existing range + + // 4a) shortcut: the incoming range is a right-overlapping superset of the existing range. + // replace the existing token's version, update reassembly targets for merging neighboring ranges + // w/ the same version, and continue + if preExisting.Min == bounds.Min { + t[i].version = version + bounds.Min = preExisting.Max + 1 + added = true + if !shouldReassemble { + reassembleFrom = i + shouldReassemble = true + } + continue + } + + // 4b) the incoming range overlaps the right side of the existing range but + // does not touch the left side; + // add a new token for the right side of the existing range then update the reassembly targets + // and continue + overlap := tsdbToken{through: t[i].through, version: version} + t[i].through = bounds.Min - 1 + t = append(t, tsdbToken{}) + copy(t[i+2:], t[i+1:]) + t[i+1] = overlap + added = true + bounds.Min = overlap.through + 1 + if !shouldReassemble { + reassembleFrom = i + 1 + shouldReassemble = true + } + continue + } +} + +func (t tsdbTokenRange) boundsForToken(i int) v1.FingerprintBounds { + if i == 0 { + return v1.FingerprintBounds{Min: 0, Max: t[i].through} + } + return v1.FingerprintBounds{Min: t[i-1].through + 1, Max: t[i].through} +} + +// reassemble merges neighboring tokens with the same version +func (t tsdbTokenRange) reassemble(from int) tsdbTokenRange { + reassembleTo := from + for i := from; i < len(t)-1; i++ { + if t[i].version != t[i+1].version { + break + } + reassembleTo = i + 1 + } + + if reassembleTo == from { + return t + } + t[from].through = t[reassembleTo].through + copy(t[from+1:], t[reassembleTo+1:]) + return t[:len(t)-(reassembleTo-from)] +} + +func outdatedMetas(metas []bloomshipper.Meta) (outdated []bloomshipper.Meta, err error) { + // Sort metas descending by most recent source when checking + // for outdated metas (older metas are discarded if they don't change the range). 
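+	// Illustrative example: given meta A spanning [0,10] built from TSDB t1 and meta B
+	// spanning [0,10] built from a newer TSDB t2, B is processed first and claims the
+	// whole range, so A's Add returns added=false and A is reported as outdated.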
+ sort.Slice(metas, func(i, j int) bool { + a, err := metas[i].MostRecentSource() + if err != nil { + panic(err.Error()) + } + b, err := metas[j].MostRecentSource() + if err != nil { + panic(err.Error()) + } + return !a.TS.Before(b.TS) + }) + + var ( + tokenRange tsdbTokenRange + added bool + ) + + for _, meta := range metas { + mostRecent, err := meta.MostRecentSource() + if err != nil { + return nil, err + } + version := int(model.TimeFromUnixNano(mostRecent.TS.UnixNano())) + tokenRange, added = tokenRange.Add(version, meta.Bounds) + if !added { + outdated = append(outdated, meta) + } + } + + return outdated, nil + +} diff --git a/pkg/bloombuild/planner/versioned_range_test.go b/pkg/bloombuild/planner/versioned_range_test.go new file mode 100644 index 0000000000000..9827e9cd932c5 --- /dev/null +++ b/pkg/bloombuild/planner/versioned_range_test.go @@ -0,0 +1,323 @@ +package planner + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +func Test_TsdbTokenRange(t *testing.T) { + type addition struct { + version int + bounds v1.FingerprintBounds + } + type exp struct { + added bool + err bool + } + mk := func(version int, min, max model.Fingerprint) addition { + return addition{version, v1.FingerprintBounds{Min: min, Max: max}} + } + tok := func(version int, through model.Fingerprint) tsdbToken { + return tsdbToken{version: version, through: through} + } + + for _, tc := range []struct { + desc string + additions []addition + exp []bool + result tsdbTokenRange + }{ + { + desc: "ascending versions", + additions: []addition{ + mk(1, 0, 10), + mk(2, 11, 20), + mk(3, 15, 25), + }, + exp: []bool{true, true, true}, + result: tsdbTokenRange{ + tok(1, 10), + tok(2, 14), + tok(3, 25), + }, + }, + { + desc: "descending versions", + additions: []addition{ + mk(3, 15, 25), + mk(2, 11, 20), + mk(1, 0, 10), + }, + exp: []bool{true, true, true}, + result: tsdbTokenRange{ + tok(1, 10), + tok(2, 14), + tok(3, 25), + }, + }, + { + desc: "simple", + additions: []addition{ + mk(3, 0, 10), + mk(2, 11, 20), + mk(1, 15, 25), + }, + exp: []bool{true, true, true}, + result: tsdbTokenRange{ + tok(3, 10), + tok(2, 20), + tok(1, 25), + }, + }, + { + desc: "simple replacement", + additions: []addition{ + mk(3, 10, 20), + mk(2, 0, 9), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(2, 9), + tok(3, 20), + }, + }, + { + desc: "complex", + additions: []addition{ + mk(5, 30, 50), + mk(4, 20, 45), + mk(3, 25, 70), + mk(2, 10, 20), + mk(1, 1, 5), + }, + exp: []bool{true, true, true, true, true, true}, + result: tsdbTokenRange{ + tok(-1, 0), + tok(1, 5), + tok(-1, 9), + tok(2, 19), + tok(4, 29), + tok(5, 50), + tok(3, 70), + }, + }, + { + desc: "neighboring upper range", + additions: []addition{ + mk(5, 30, 50), + mk(4, 51, 60), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(-1, 29), + tok(5, 50), + tok(4, 60), + }, + }, + { + desc: "non-neighboring upper range", + additions: []addition{ + mk(5, 30, 50), + mk(4, 55, 60), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(-1, 29), + tok(5, 50), + tok(-1, 54), + tok(4, 60), + }, + }, + { + desc: "earlier version within", + additions: []addition{ + mk(5, 30, 50), + mk(4, 40, 45), + }, + exp: []bool{true, false}, + result: tsdbTokenRange{ + tok(-1, 29), + tok(5, 50), + }, + }, + { 
+ desc: "earlier version right overlapping", + additions: []addition{ + mk(5, 10, 20), + mk(4, 15, 25), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(-1, 9), + tok(5, 20), + tok(4, 25), + }, + }, + { + desc: "older version overlaps two", + additions: []addition{ + mk(3, 10, 20), + mk(2, 21, 30), + mk(1, 15, 25), + }, + exp: []bool{true, true, false}, + result: tsdbTokenRange{ + tok(-1, 9), + tok(3, 20), + tok(2, 30), + }, + }, + { + desc: "older version overlaps two w middle", + additions: []addition{ + mk(3, 10, 20), + mk(2, 22, 30), + mk(1, 15, 25), + }, + exp: []bool{true, true, true}, + result: tsdbTokenRange{ + tok(-1, 9), + tok(3, 20), + tok(1, 21), + tok(2, 30), + }, + }, + { + desc: "newer right overflow", + additions: []addition{ + mk(1, 30, 50), + mk(2, 40, 60), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(-1, 29), + tok(1, 39), + tok(2, 60), + }, + }, + { + desc: "newer right overflow superset", + additions: []addition{ + mk(1, 30, 50), + mk(2, 30, 60), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(-1, 29), + tok(2, 60), + }, + }, + { + desc: "newer right overflow partial", + additions: []addition{ + mk(1, 30, 50), + mk(2, 40, 60), + }, + exp: []bool{true, true}, + result: tsdbTokenRange{ + tok(-1, 29), + tok(1, 39), + tok(2, 60), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var ( + tr tsdbTokenRange + added bool + ) + for i, a := range tc.additions { + tr, added = tr.Add(a.version, a.bounds) + exp := tc.exp[i] + require.Equal(t, exp, added, "on iteration %d", i) + } + require.Equal(t, tc.result, tr) + }) + } +} + +func Test_OutdatedMetas(t *testing.T) { + gen := func(bounds v1.FingerprintBounds, tsdbTimes ...model.Time) (meta bloomshipper.Meta) { + for _, tsdbTime := range tsdbTimes { + meta.Sources = append(meta.Sources, tsdb.SingleTenantTSDBIdentifier{TS: tsdbTime.Time()}) + } + meta.Bounds = bounds + return meta + } + + for _, tc := range []struct { + desc string + metas []bloomshipper.Meta + exp []bloomshipper.Meta + }{ + { + desc: "no metas", + metas: nil, + exp: nil, + }, + { + desc: "single meta", + metas: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 0), + }, + exp: nil, + }, + { + desc: "single outdated meta", + metas: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 0), + gen(v1.NewBounds(0, 10), 1), + }, + exp: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 0), + }, + }, + { + desc: "single outdated via partitions", + metas: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 5), 0), + gen(v1.NewBounds(6, 10), 0), + gen(v1.NewBounds(0, 10), 1), + }, + exp: []bloomshipper.Meta{ + gen(v1.NewBounds(6, 10), 0), + gen(v1.NewBounds(0, 5), 0), + }, + }, + { + desc: "same tsdb versions", + metas: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 5), 0), + gen(v1.NewBounds(6, 10), 0), + gen(v1.NewBounds(0, 10), 1), + }, + exp: []bloomshipper.Meta{ + gen(v1.NewBounds(6, 10), 0), + gen(v1.NewBounds(0, 5), 0), + }, + }, + { + desc: "multi version ordering", + metas: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 5), 0), + gen(v1.NewBounds(0, 10), 1), // only part of the range is outdated, must keep + gen(v1.NewBounds(8, 10), 2), + }, + exp: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 5), 0), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + outdated, err := outdatedMetas(tc.metas) + require.NoError(t, err) + require.Equal(t, tc.exp, outdated) + }) + } +} diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 9b18427bacf10..8b2ff3365d766 100644 --- 
a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -113,6 +113,28 @@ func (b *bloomStoreEntry) ResolveMetas(ctx context.Context, params MetaSearchPar return [][]MetaRef{refs}, []*Fetcher{b.fetcher}, nil } +// FilterMetasOverlappingBounds filters metas that are within the given bounds. +// the input metas are expected to be sorted by fingerprint. +func FilterMetasOverlappingBounds(metas []Meta, bounds v1.FingerprintBounds) []Meta { + withinBounds := make([]Meta, 0, len(metas)) + for _, meta := range metas { + // We can stop iterating once we find an item greater + // than the keyspace we're looking for + if bounds.Cmp(meta.Bounds.Min) == v1.After { + break + } + + // Only check keyspace for now, because we don't have start/end timestamps in the refs + if !bounds.Overlaps(meta.Bounds) { + continue + } + + withinBounds = append(withinBounds, meta) + } + + return withinBounds +} + // FetchMetas implements store. func (b *bloomStoreEntry) FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) { logger := spanlogger.FromContext(ctx)