diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index 59b671fc325..9ccb6f36b8a 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -634,12 +634,25 @@ func (p *Processor) deleteOldBlocks() (err error) { } for id, b := range p.completeBlocks { + if !p.Cfg.FlushToStorage { + if b.BlockMeta().EndTime.Before(before) { + level.Info(p.logger).Log("msg", "deleting complete block", "block", id.String()) + err = p.wal.LocalBackend().ClearBlock(id, p.tenant) + if err != nil { + return err + } + delete(p.completeBlocks, id) + } + continue + } + flushedTime := b.FlushedTime() if flushedTime.IsZero() { continue } if flushedTime.Add(p.Cfg.CompleteBlockTimeout).Before(time.Now()) { + level.Info(p.logger).Log("msg", "deleting flushed complete block", "block", id.String()) err = p.wal.LocalBackend().ClearBlock(id, p.tenant) if err != nil { return err @@ -773,12 +786,15 @@ func (p *Processor) reloadBlocks() error { if err != nil { return err } + level.Info(p.logger).Log("msg", "reloading wal blocks", "count", len(walBlocks)) for _, blk := range walBlocks { meta := blk.BlockMeta() if meta.TenantID == p.tenant { + level.Info(p.logger).Log("msg", "reloading wal block", "block", meta.BlockID.String()) p.walBlocks[blk.BlockMeta().BlockID] = blk } } + level.Info(p.logger).Log("msg", "reloaded wal blocks", "count", len(p.walBlocks)) // ------------------------------------ // Complete blocks @@ -791,6 +807,7 @@ func (p *Processor) reloadBlocks() error { return err } if len(tenants) == 0 { + level.Info(p.logger).Log("msg", "no tenants found, skipping complete block replay") return nil } @@ -798,8 +815,10 @@ func (p *Processor) reloadBlocks() error { if err != nil { return err } + level.Info(p.logger).Log("msg", "reloading complete blocks", "count", len(ids)) for _, id := range ids { + level.Info(p.logger).Log("msg", "reloading complete block", "block", id.String()) meta, err := r.BlockMeta(ctx, id, t) var clearBlock bool @@ -811,6 +830,7 @@ func (p *Processor) reloadBlocks() error { } if clearBlock { + level.Info(p.logger).Log("msg", "clearing block", "block", id.String(), "err", err) // Partially written block, delete and continue err = l.ClearBlock(id, t) if err != nil { @@ -827,11 +847,13 @@ func (p *Processor) reloadBlocks() error { if err != nil { return err } + level.Info(p.logger).Log("msg", "reloaded complete block", "block", id.String()) lb := ingester.NewLocalBlock(ctx, blk, l) p.completeBlocks[id] = lb - if lb.FlushedTime().IsZero() { + if p.Cfg.FlushToStorage && lb.FlushedTime().IsZero() { + level.Info(p.logger).Log("msg", "queueing reloaded block for flushing", "block", id.String()) if _, err := p.flushqueue.Enqueue(newFlushOp(id)); err != nil { _ = level.Error(p.logger).Log("msg", "local blocks processor failed to enqueue block for flushing during replay", "err", err) }