From efecf8617f836a47b0f52d566fbda7ad401a5dd7 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Thu, 3 Oct 2024 18:08:41 -0400 Subject: [PATCH 01/12] WIP: Split blocks tool --- pkg/storage/tsdb/block/meta.go | 1 + tools/splitblocks/.gitignore | 1 + tools/splitblocks/main.go | 242 +++++++++++++++++++++++++++++++++ 3 files changed, 244 insertions(+) create mode 100644 tools/splitblocks/.gitignore create mode 100644 tools/splitblocks/main.go diff --git a/pkg/storage/tsdb/block/meta.go b/pkg/storage/tsdb/block/meta.go index 1927466c9b8..faaa4b0f9c0 100644 --- a/pkg/storage/tsdb/block/meta.go +++ b/pkg/storage/tsdb/block/meta.go @@ -30,6 +30,7 @@ const ( CompactorRepairSource SourceType = "compactor.repair" BucketRepairSource SourceType = "bucket.repair" BlockBuilderSource SourceType = "block-builder" + SplitBlocksSource SourceType = "split-blocks" TestSource SourceType = "test" ) diff --git a/tools/splitblocks/.gitignore b/tools/splitblocks/.gitignore new file mode 100644 index 00000000000..cf4a4f5169a --- /dev/null +++ b/tools/splitblocks/.gitignore @@ -0,0 +1 @@ +splitblocks diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go new file mode 100644 index 00000000000..3f668369bde --- /dev/null +++ b/tools/splitblocks/main.go @@ -0,0 +1,242 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "path" + "slices" + "syscall" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/flagext" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/thanos-io/objstore" + + "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storage/tsdb" + "github.com/grafana/mimir/pkg/storage/tsdb/block" + "github.com/grafana/mimir/pkg/util" +) + +type config struct { + bucket bucket.Config + blockRanges tsdb.DurationList + outputDir string + tenantConcurrency int + blockConcurrency int + 
includeTenants flagext.StringSliceCSV + excludeTenants flagext.StringSliceCSV + dryRun bool +} + +func (c *config) registerFlags(f *flag.FlagSet) { + c.bucket.RegisterFlags(flag.CommandLine) + c.blockRanges = tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour} + flag.Var(&c.blockRanges, "block-ranges", "List of compaction time ranges") + f.StringVar(&c.outputDir, "output-dir", "", "The output directory where split blocks will be written") + f.IntVar(&c.blockConcurrency, "block-concurrency", 5, "How many blocks can be split at once per tenant") + f.IntVar(&c.tenantConcurrency, "tenant-concurrency", 1, "How many tenants can be processed concurrently") + f.Var(&c.includeTenants, "include-tenants", "A comma separated list of what tenants to target") + f.Var(&c.excludeTenants, "exclude-tenants", "A comma separated list of what tenants to ignore. Has precedence over included tenants") + f.BoolVar(&c.dryRun, "dry-run", false, "If set blocks are not downloaded and splits are not performed; only what would happen is logged") +} + +func (c *config) validate() error { + if len(c.blockRanges) == 0 { + return fmt.Errorf("block-ranges must not be empty") + } + for _, blockRange := range c.blockRanges { + if blockRange <= 0 { + return fmt.Errorf("block-ranges must only contain positive durations") + } + } + if c.outputDir == "" { + return fmt.Errorf("output-dir is required") + } + if c.tenantConcurrency < 1 { + return fmt.Errorf("tenant-concurrency must be positive") + } + if c.blockConcurrency < 1 { + return fmt.Errorf("block-concurrency must be positive") + } + return nil +} + +func main() { + // Clean up all flags registered via init() methods of 3rd-party libraries. + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) + + cfg := config{} + cfg.registerFlags(flag.CommandLine) + + // Parse CLI arguments. 
+ if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + + if err := cfg.validate(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "ts", log.DefaultTimestampUTC) + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + if err := splitBlocks(ctx, cfg, logger); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } +} + +func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { + allowedTenants := util.NewAllowedTenants(cfg.includeTenants, cfg.excludeTenants) + slices.Sort(cfg.blockRanges) + maxDuration := cfg.blockRanges[len(cfg.blockRanges)-1] + + bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil) + if err != nil { + return errors.Wrap(err, "failed to create bucket") + } + tenants, err := tsdb.ListUsers(ctx, bkt) + if err != nil { + return errors.Wrap(err, "failed to list tenants") + } + + return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, tenantID string) error { + if !allowedTenants.IsAllowed(tenantID) { + return nil + } + logger := log.With(logger, "tenantID", tenantID) + + blocks, err := listBlocksForTenant(ctx, bkt, tenantID) + if err != nil { + level.Error(logger).Log("msg", "failed to list blocks for tenant", "err", err) + return errors.Wrapf(err, "failed to list blocks for tenant %s", tenantID) + } + + bkt := objstore.NewPrefixedBucket(bkt, tenantID) + + return concurrency.ForEachUser(ctx, blocks, cfg.blockConcurrency, func(ctx context.Context, blockString string) error { + blockID, err := ulid.Parse(blockString) + if err != nil { + return err + } + + logger := log.With(logger, "block", blockString) + blockMeta, err := block.DownloadMeta(ctx, logger, bkt, blockID) + if err != nil { + level.Error(logger).Log("msg", "failed to read block's 
meta.json file", "err", err) + return err + } + + blockMinTime := time.Unix(0, blockMeta.MinTime*int64(time.Millisecond)).UTC() + blockMaxTime := time.Unix(0, blockMeta.MaxTime*int64(time.Millisecond)).UTC() + logger = log.With(logger, "block_min_time", blockMinTime, "block_max_time", blockMaxTime) + + blockDuration := blockMaxTime.Sub(blockMinTime) + if blockDuration < 0 { + level.Error(logger).Log("msg", "block has an invalid minTime greater than maxTime") + return fmt.Errorf("block has an invalid minTime greater than maxTime") + } + if blockDuration <= maxDuration { + level.Debug(logger).Log("msg", "block does not need to be split") + return nil + } + + splits := splitDuration(blockDuration, cfg.blockRanges) + + if cfg.dryRun { + level.Info(logger).Log("msg", "dry run: would split block", "splits", splits) + return nil + } + + if err := splitBlock(ctx, cfg, bkt, tenantID, blockMeta, splits, logger); err != nil { + level.Error(logger).Log("msg", "failed to split block", "err", err) + return err + } + + level.Info(logger).Log("msg", "block split successfully", "dryRun", cfg.dryRun) + return nil + }) + }) +} + +func listBlocksForTenant(ctx context.Context, bkt objstore.Bucket, tenantID string) ([]string, error) { + var blocks []string + err := bkt.Iter(ctx, tenantID+objstore.DirDelim, func(name string) error { + if block, ok := block.IsBlockDir(name); ok { + blocks = append(blocks, block.String()) + } + return nil + }) + if err != nil { + return nil, err + } + + return blocks, nil +} + +// TODO: Boundary alignment? 
Confusing with configurable block ranges that may differ +func splitDuration(blockDuration time.Duration, ranges []time.Duration) []time.Duration { + var durations []time.Duration + rangeIndex := len(ranges) - 1 + for blockDuration > 0 { + for ranges[rangeIndex] > blockDuration && rangeIndex > 0 { + rangeIndex-- + } + duration := min(blockDuration, ranges[rangeIndex]) + durations = append(durations, duration) + blockDuration -= duration + } + return durations +} + +// splitBlock downloads the source block to the output directory, then generates new blocks that are within cfg.blockRanges durations. +// After all the splits succeed the original source block is removed from the output directory. +func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID string, meta block.Meta, splits []time.Duration, logger log.Logger) error { + tenantDir := path.Join(cfg.outputDir, tenantID) + + originalBlockDir := path.Join(tenantDir, meta.ULID.String()) + if err := block.Download(ctx, logger, bkt, meta.ULID, originalBlockDir); err != nil { + return err + } + + minTime := meta.MinTime + for _, split := range splits { + maxTime := minTime + split.Milliseconds() + + // TODO: I'm not sure if this works because chunks can cross block min/max boundaries + // The idea was to inject a modified meta to try to abuse the repair into removing the now "outside" chunks + meta.MinTime = minTime + meta.MaxTime = maxTime + if err := meta.WriteToDir(logger, originalBlockDir); err != nil { + return errors.Wrap(err, "failed injecting meta for split") + } + splitID, err := block.Repair(ctx, logger, tenantDir, meta.ULID, block.SplitBlocksSource, block.IgnoreCompleteOutsideChunk, block.IgnoreIssue347OutsideChunk) + if err != nil { + return errors.Wrap(err, "failed while splitting block") + } + + level.Info(logger).Log("msg", "created block from split", "splitID", splitID) + minTime = maxTime + 1 + } + + if err := os.RemoveAll(originalBlockDir); err != nil { + return errors.Wrap(err, 
"failed to clean up original block directory after splitting block") + } + + return nil +} From 1e5da0d8b5b195230e97f2a997aec5d84a10f55a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 4 Oct 2024 12:16:46 +0200 Subject: [PATCH 02/12] Split blocks by single duration. Split blocks at duration boundaries. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- tools/splitblocks/main.go | 74 ++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index 3f668369bde..ee409c8196d 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -9,7 +9,6 @@ import ( "os" "os/signal" "path" - "slices" "syscall" "time" @@ -19,6 +18,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/timestamp" "github.com/thanos-io/objstore" "github.com/grafana/mimir/pkg/storage/bucket" @@ -29,35 +29,35 @@ import ( type config struct { bucket bucket.Config - blockRanges tsdb.DurationList outputDir string tenantConcurrency int blockConcurrency int includeTenants flagext.StringSliceCSV excludeTenants flagext.StringSliceCSV dryRun bool + maxDuration time.Duration } func (c *config) registerFlags(f *flag.FlagSet) { c.bucket.RegisterFlags(flag.CommandLine) - c.blockRanges = tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour} - flag.Var(&c.blockRanges, "block-ranges", "List of compaction time ranges") f.StringVar(&c.outputDir, "output-dir", "", "The output directory where split blocks will be written") f.IntVar(&c.blockConcurrency, "block-concurrency", 5, "How many blocks can be split at once per tenant") f.IntVar(&c.tenantConcurrency, "tenant-concurrency", 1, "How many tenants can be processed concurrently") f.Var(&c.includeTenants, "include-tenants", "A comma separated list of what tenants 
to target") f.Var(&c.excludeTenants, "exclude-tenants", "A comma separated list of what tenants to ignore. Has precedence over included tenants") f.BoolVar(&c.dryRun, "dry-run", false, "If set blocks are not downloaded and splits are not performed; only what would happen is logged") + f.DurationVar(&c.maxDuration, "max-block-duration", 24*time.Hour, "Max block duration, blocks larger than this or crossing duration boundary are split.") } func (c *config) validate() error { - if len(c.blockRanges) == 0 { - return fmt.Errorf("block-ranges must not be empty") + if c.maxDuration < 2*time.Hour { + return fmt.Errorf("max-block-duration must be at least 2 hours") } - for _, blockRange := range c.blockRanges { - if blockRange <= 0 { - return fmt.Errorf("block-ranges must only contain positive durations") - } + if c.maxDuration.Truncate(time.Hour) != c.maxDuration { + return fmt.Errorf("max-block-duration should be aligned to hours") + } + if 24*time.Hour%c.maxDuration.Truncate(time.Hour) != 0 { + return fmt.Errorf("max-block-duration should divide 24h without remainder") } if c.outputDir == "" { return fmt.Errorf("output-dir is required") @@ -103,8 +103,6 @@ func main() { func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { allowedTenants := util.NewAllowedTenants(cfg.includeTenants, cfg.excludeTenants) - slices.Sort(cfg.blockRanges) - maxDuration := cfg.blockRanges[len(cfg.blockRanges)-1] bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil) if err != nil { @@ -142,28 +140,27 @@ func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { return err } - blockMinTime := time.Unix(0, blockMeta.MinTime*int64(time.Millisecond)).UTC() - blockMaxTime := time.Unix(0, blockMeta.MaxTime*int64(time.Millisecond)).UTC() - logger = log.With(logger, "block_min_time", blockMinTime, "block_max_time", blockMaxTime) + blockMinTime := timestamp.Time(blockMeta.MinTime) + blockMaxTime := timestamp.Time(blockMeta.MaxTime) + 
level.Info(logger).Log("block_min_time", blockMinTime, "block_max_time", blockMaxTime) - blockDuration := blockMaxTime.Sub(blockMinTime) - if blockDuration < 0 { + if blockMinTime.After(blockMaxTime) { level.Error(logger).Log("msg", "block has an invalid minTime greater than maxTime") return fmt.Errorf("block has an invalid minTime greater than maxTime") } - if blockDuration <= maxDuration { - level.Debug(logger).Log("msg", "block does not need to be split") + + allowedMaxTime := blockMinTime.Truncate(cfg.maxDuration).Add(cfg.maxDuration) + if !blockMaxTime.After(allowedMaxTime) { + level.Info(logger).Log("msg", "block does not need to be split") return nil } - splits := splitDuration(blockDuration, cfg.blockRanges) - if cfg.dryRun { - level.Info(logger).Log("msg", "dry run: would split block", "splits", splits) + level.Info(logger).Log("msg", "dry run: would split block") return nil } - if err := splitBlock(ctx, cfg, bkt, tenantID, blockMeta, splits, logger); err != nil { + if err := splitBlock(ctx, cfg, bkt, tenantID, blockMeta, cfg.maxDuration, logger); err != nil { level.Error(logger).Log("msg", "failed to split block", "err", err) return err } @@ -189,24 +186,9 @@ func listBlocksForTenant(ctx context.Context, bkt objstore.Bucket, tenantID stri return blocks, nil } -// TODO: Boundary alignment? Confusing with configurable block ranges that may differ -func splitDuration(blockDuration time.Duration, ranges []time.Duration) []time.Duration { - var durations []time.Duration - rangeIndex := len(ranges) - 1 - for blockDuration > 0 { - for ranges[rangeIndex] > blockDuration && rangeIndex > 0 { - rangeIndex-- - } - duration := min(blockDuration, ranges[rangeIndex]) - durations = append(durations, duration) - blockDuration -= duration - } - return durations -} - // splitBlock downloads the source block to the output directory, then generates new blocks that are within cfg.blockRanges durations. 
// After all the splits succeed the original source block is removed from the output directory. -func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID string, meta block.Meta, splits []time.Duration, logger log.Logger) error { +func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID string, meta block.Meta, maxDuration time.Duration, logger log.Logger) error { tenantDir := path.Join(cfg.outputDir, tenantID) originalBlockDir := path.Join(tenantDir, meta.ULID.String()) @@ -214,12 +196,16 @@ func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID s return err } + origBlockMaxTime := meta.MaxTime minTime := meta.MinTime - for _, split := range splits { - maxTime := minTime + split.Milliseconds() + for minTime < origBlockMaxTime { + // Max time cannot cross maxDuration boundary, but also should not be greater than original max time. + maxTime := min(timestamp.Time(minTime).Truncate(cfg.maxDuration).Add(cfg.maxDuration).UnixMilli(), origBlockMaxTime) + + level.Info(logger).Log("msg", "splitting block", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime)) - // TODO: I'm not sure if this works because chunks can cross block min/max boundaries - // The idea was to inject a modified meta to try to abuse the repair into removing the now "outside" chunks + // Inject a modified meta and abuse the repair into removing the now "outside" chunks. + // Chunks that cross boundaries are included in multiple blocks. 
meta.MinTime = minTime meta.MaxTime = maxTime if err := meta.WriteToDir(logger, originalBlockDir); err != nil { @@ -231,7 +217,7 @@ func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID s } level.Info(logger).Log("msg", "created block from split", "splitID", splitID) - minTime = maxTime + 1 + minTime = maxTime } if err := os.RemoveAll(originalBlockDir); err != nil { From 2cae34c473d2adcb4e2c258ef575e9b2ed2ab9b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 4 Oct 2024 12:39:52 +0200 Subject: [PATCH 03/12] Log split block stats. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- tools/splitblocks/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index ee409c8196d..d5a1b54cfce 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -216,7 +216,12 @@ func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID s return errors.Wrap(err, "failed while splitting block") } - level.Info(logger).Log("msg", "created block from split", "splitID", splitID) + splitMeta, err := block.ReadMetaFromDir(path.Join(tenantDir, splitID.String())) + if err != nil { + return errors.Wrap(err, "failed while reading meta.json from split block") + } + + level.Info(logger).Log("msg", "created block from split", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime), "splitID", splitID, "series", splitMeta.Stats.NumSeries, "chunks", splitMeta.Stats.NumChunks, "samples", splitMeta.Stats.NumSamples) minTime = maxTime } From 05ace990d55ffa20c702c8aa81c623d391906050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 4 Oct 2024 14:36:44 +0200 Subject: [PATCH 04/12] Extract splitLocalBlock. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- tools/splitblocks/main.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index d5a1b54cfce..f4fea85184d 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "path" + "path/filepath" "syscall" "time" @@ -160,7 +161,7 @@ func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { return nil } - if err := splitBlock(ctx, cfg, bkt, tenantID, blockMeta, cfg.maxDuration, logger); err != nil { + if err := splitBlock(ctx, cfg, bkt, tenantID, blockMeta, logger); err != nil { level.Error(logger).Log("msg", "failed to split block", "err", err) return err } @@ -188,19 +189,23 @@ func listBlocksForTenant(ctx context.Context, bkt objstore.Bucket, tenantID stri // splitBlock downloads the source block to the output directory, then generates new blocks that are within cfg.blockRanges durations. // After all the splits succeed the original source block is removed from the output directory. 
-func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID string, meta block.Meta, maxDuration time.Duration, logger log.Logger) error { - tenantDir := path.Join(cfg.outputDir, tenantID) +func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID string, meta block.Meta, logger log.Logger) error { + tenantDir := filepath.Join(cfg.outputDir, tenantID) - originalBlockDir := path.Join(tenantDir, meta.ULID.String()) + originalBlockDir := filepath.Join(tenantDir, meta.ULID.String()) if err := block.Download(ctx, logger, bkt, meta.ULID, originalBlockDir); err != nil { return err } + return splitLocalBlock(ctx, tenantDir, originalBlockDir, meta, cfg.maxDuration, logger) +} + +func splitLocalBlock(ctx context.Context, parentDir, blockDir string, meta block.Meta, maxDuration time.Duration, logger log.Logger) error { origBlockMaxTime := meta.MaxTime minTime := meta.MinTime for minTime < origBlockMaxTime { // Max time cannot cross maxDuration boundary, but also should not be greater than original max time. - maxTime := min(timestamp.Time(minTime).Truncate(cfg.maxDuration).Add(cfg.maxDuration).UnixMilli(), origBlockMaxTime) + maxTime := min(timestamp.Time(minTime).Truncate(maxDuration).Add(maxDuration).UnixMilli(), origBlockMaxTime) level.Info(logger).Log("msg", "splitting block", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime)) @@ -208,15 +213,15 @@ func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID s // Chunks that cross boundaries are included in multiple blocks. 
meta.MinTime = minTime meta.MaxTime = maxTime - if err := meta.WriteToDir(logger, originalBlockDir); err != nil { + if err := meta.WriteToDir(logger, blockDir); err != nil { return errors.Wrap(err, "failed injecting meta for split") } - splitID, err := block.Repair(ctx, logger, tenantDir, meta.ULID, block.SplitBlocksSource, block.IgnoreCompleteOutsideChunk, block.IgnoreIssue347OutsideChunk) + splitID, err := block.Repair(ctx, logger, parentDir, meta.ULID, block.SplitBlocksSource, block.IgnoreCompleteOutsideChunk, block.IgnoreIssue347OutsideChunk) if err != nil { return errors.Wrap(err, "failed while splitting block") } - splitMeta, err := block.ReadMetaFromDir(path.Join(tenantDir, splitID.String())) + splitMeta, err := block.ReadMetaFromDir(path.Join(parentDir, splitID.String())) if err != nil { return errors.Wrap(err, "failed while reading meta.json from split block") } @@ -225,7 +230,7 @@ func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID s minTime = maxTime } - if err := os.RemoveAll(originalBlockDir); err != nil { + if err := os.RemoveAll(blockDir); err != nil { return errors.Wrap(err, "failed to clean up original block directory after splitting block") } From de0e6e87aa6d7fed665ca75c1397a6ff69404932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 4 Oct 2024 15:55:56 +0200 Subject: [PATCH 05/12] Test splitting of the block. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/storage/tsdb/block/block_generator.go | 7 + tools/splitblocks/main.go | 17 +- tools/splitblocks/main_test.go | 205 ++++++++++++++++++++++ 3 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 tools/splitblocks/main_test.go diff --git a/pkg/storage/tsdb/block/block_generator.go b/pkg/storage/tsdb/block/block_generator.go index 368e9d43e0e..d88ab990c4e 100644 --- a/pkg/storage/tsdb/block/block_generator.go +++ b/pkg/storage/tsdb/block/block_generator.go @@ -71,6 +71,8 @@ func GenerateBlockFromSpec(storageDir string, specs SeriesSpecs) (_ *Meta, retur blockID := ulid.MustNew(ulid.Now(), crypto_rand.Reader) blockDir := filepath.Join(storageDir, blockID.String()) + stats := tsdb.BlockStats{} + // Ensure series are sorted. sort.Slice(specs, func(i, j int) bool { return labels.Compare(specs[i].Labels, specs[j].Labels) < 0 @@ -109,11 +111,15 @@ func GenerateBlockFromSpec(storageDir string, specs SeriesSpecs) (_ *Meta, retur // Updates the Ref on each chunk. for _, series := range specs { + stats.NumSeries++ + // Ensure every chunk meta has chunk data. 
for _, c := range series.Chunks { if c.Chunk == nil { return nil, errors.Errorf("missing chunk data for series %s", series.Labels.String()) } + stats.NumChunks++ + stats.NumSamples += uint64(c.Chunk.NumSamples()) } if err := chunkw.WriteChunks(series.Chunks...); err != nil { @@ -172,6 +178,7 @@ func GenerateBlockFromSpec(storageDir string, specs SeriesSpecs) (_ *Meta, retur Sources: []ulid.ULID{blockID}, }, Version: 1, + Stats: stats, }, Thanos: ThanosMeta{ Version: ThanosVersion1, diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index f4fea85184d..e9ac91b4814 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -197,12 +197,14 @@ func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID s return err } - return splitLocalBlock(ctx, tenantDir, originalBlockDir, meta, cfg.maxDuration, logger) + _, err := splitLocalBlock(ctx, tenantDir, originalBlockDir, meta, cfg.maxDuration, logger) + return err } -func splitLocalBlock(ctx context.Context, parentDir, blockDir string, meta block.Meta, maxDuration time.Duration, logger log.Logger) error { +func splitLocalBlock(ctx context.Context, parentDir, blockDir string, meta block.Meta, maxDuration time.Duration, logger log.Logger) ([]ulid.ULID, error) { origBlockMaxTime := meta.MaxTime minTime := meta.MinTime + result := []ulid.ULID(nil) for minTime < origBlockMaxTime { // Max time cannot cross maxDuration boundary, but also should not be greater than original max time. 
maxTime := min(timestamp.Time(minTime).Truncate(maxDuration).Add(maxDuration).UnixMilli(), origBlockMaxTime) @@ -214,25 +216,26 @@ func splitLocalBlock(ctx context.Context, parentDir, blockDir string, meta block meta.MinTime = minTime meta.MaxTime = maxTime if err := meta.WriteToDir(logger, blockDir); err != nil { - return errors.Wrap(err, "failed injecting meta for split") + return nil, errors.Wrap(err, "failed injecting meta for split") } splitID, err := block.Repair(ctx, logger, parentDir, meta.ULID, block.SplitBlocksSource, block.IgnoreCompleteOutsideChunk, block.IgnoreIssue347OutsideChunk) if err != nil { - return errors.Wrap(err, "failed while splitting block") + return nil, errors.Wrap(err, "failed while splitting block") } splitMeta, err := block.ReadMetaFromDir(path.Join(parentDir, splitID.String())) if err != nil { - return errors.Wrap(err, "failed while reading meta.json from split block") + return nil, errors.Wrap(err, "failed while reading meta.json from split block") } level.Info(logger).Log("msg", "created block from split", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime), "splitID", splitID, "series", splitMeta.Stats.NumSeries, "chunks", splitMeta.Stats.NumChunks, "samples", splitMeta.Stats.NumSamples) + result = append(result, splitID) minTime = maxTime } if err := os.RemoveAll(blockDir); err != nil { - return errors.Wrap(err, "failed to clean up original block directory after splitting block") + return nil, errors.Wrap(err, "failed to clean up original block directory after splitting block") } - return nil + return result, nil } diff --git a/tools/splitblocks/main_test.go b/tools/splitblocks/main_test.go new file mode 100644 index 00000000000..d077577040b --- /dev/null +++ b/tools/splitblocks/main_test.go @@ -0,0 +1,205 @@ +package main + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/runutil" + "github.com/prometheus/prometheus/model/histogram" + 
"github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/storage/tsdb/block" +) + +func TestSplitLocalBlock(t *testing.T) { + dir := t.TempDir() + + startOfDay := time.Now().Truncate(24 * time.Hour) + + specs := []*block.SeriesSpec{ + { + Labels: labels.FromStrings("__name__", "1_series_with_one_chunk_with_samples_covering_multiple_days"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(10*time.Minute).UnixMilli(), 1, nil, nil), + newSample(startOfDay.Add(12*time.Hour).UnixMilli(), 2, nil, nil), + newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 3, nil, nil), + newSample(startOfDay.Add(36*time.Hour).UnixMilli(), 4, nil, nil), + newSample(startOfDay.Add(48*time.Hour).UnixMilli(), 5, nil, nil), + })), + }, + }, + + { + Labels: labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.UnixMilli(), 1, nil, nil), + newSample(startOfDay.Add(12*time.Hour).UnixMilli(), 2, nil, nil), + })), + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 3, nil, nil), + newSample(startOfDay.Add(36*time.Hour).UnixMilli(), 4, nil, nil), + })), + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(48*time.Hour).UnixMilli(), 5, nil, nil), + })), + }, + }, + + { + Labels: labels.FromStrings("__name__", "3_series_with_samples_on_second_day"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 1, nil, nil), + newSample(startOfDay.Add(25*time.Hour).UnixMilli(), 2, nil, nil), + newSample(startOfDay.Add(26*time.Hour).UnixMilli(), 3, nil, nil), + })), + }, + }, + 
} + + meta, err := block.GenerateBlockFromSpec(dir, specs) + require.NoError(t, err) + + blocks, err := splitLocalBlock(context.Background(), dir, filepath.Join(dir, meta.ULID.String()), *meta, 24*time.Hour, log.NewNopLogger()) + require.NoError(t, err) + + // We expect 3 blocks + require.Len(t, blocks, 3) + + // First block + { + spec := listSeriesAndChunksFromBlock(t, filepath.Join(dir, blocks[0].String())) + + // series 1 has its entire chunk in the first block + require.Equal(t, spec[0].Labels, labels.FromStrings("__name__", "1_series_with_one_chunk_with_samples_covering_multiple_days")) + require.Len(t, spec[0].Chunks, 1) + require.Equal(t, startOfDay.Add(10*time.Minute).UnixMilli(), spec[0].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[0].Chunks[0].MaxTime) + + // Series 2 has only first chunk in the first block + require.Equal(t, spec[1].Labels, labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary")) + require.Len(t, spec[1].Chunks, 1) + require.Equal(t, startOfDay.UnixMilli(), spec[1].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(12*time.Hour).UnixMilli(), spec[1].Chunks[0].MaxTime) + + // No more series. + require.Len(t, spec, 2) + } + + // Second block has all three series. + { + spec := listSeriesAndChunksFromBlock(t, filepath.Join(dir, blocks[1].String())) + + // series 1 has its entire chunk in the second block as well. 
+ require.Equal(t, spec[0].Labels, labels.FromStrings("__name__", "1_series_with_one_chunk_with_samples_covering_multiple_days")) + require.Len(t, spec[0].Chunks, 1) + require.Equal(t, startOfDay.Add(10*time.Minute).UnixMilli(), spec[0].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[0].Chunks[0].MaxTime) + + // Series 2 has only second chunk in the first block + require.Equal(t, spec[1].Labels, labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary")) + require.Len(t, spec[1].Chunks, 1) + require.Equal(t, startOfDay.Add(24*time.Hour).UnixMilli(), spec[1].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(36*time.Hour).UnixMilli(), spec[1].Chunks[0].MaxTime) + + // Series 3 has its chunk in this block too + require.Equal(t, spec[2].Labels, labels.FromStrings("__name__", "3_series_with_samples_on_second_day")) + require.Len(t, spec[2].Chunks, 1) + require.Equal(t, startOfDay.Add(24*time.Hour).UnixMilli(), spec[2].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(26*time.Hour).UnixMilli(), spec[2].Chunks[0].MaxTime) + + // No more series. + require.Len(t, spec, 3) + } + + // Last block, only has series 1 and 2 again. + { + spec := listSeriesAndChunksFromBlock(t, filepath.Join(dir, blocks[2].String())) + + // series 1 has its entire chunk in the last block too. 
+ require.Equal(t, spec[0].Labels, labels.FromStrings("__name__", "1_series_with_one_chunk_with_samples_covering_multiple_days")) + require.Len(t, spec[0].Chunks, 1) + require.Equal(t, startOfDay.Add(10*time.Minute).UnixMilli(), spec[0].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[0].Chunks[0].MaxTime) + + // Series 2 has only last chunk in the first block + require.Equal(t, spec[1].Labels, labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary")) + require.Len(t, spec[1].Chunks, 1) + require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[1].Chunks[0].MinTime) + require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[1].Chunks[0].MaxTime) + + // No more series. + require.Len(t, spec, 2) + } +} + +func listSeriesAndChunksFromBlock(t *testing.T, blockDir string) []*block.SeriesSpec { + allKey, allValue := index.AllPostingsKey() + r, err := index.NewFileReader(filepath.Join(blockDir, block.IndexFilename)) + require.NoError(t, err) + defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") + + it, err := r.Postings(context.Background(), allKey, allValue) + require.NoError(t, err) + + result := []*block.SeriesSpec(nil) + + for it.Next() { + lbls := labels.ScratchBuilder{} + chks := []chunks.Meta(nil) + + ref := it.At() + + require.NoError(t, r.Series(ref, &lbls, &chks)) + + ss := block.SeriesSpec{ + Labels: lbls.Labels(), + Chunks: chks, + } + result = append(result, &ss) + } + + return result +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} + +type sample struct { + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram +} + +func newSample(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample { + return sample{t, v, h, fh} +} +func (s sample) T() int64 { return s.t } +func (s sample) F() float64 { return s.v } +func (s sample) H() *histogram.Histogram { return s.h 
} +func (s sample) FH() *histogram.FloatHistogram { return s.fh } + +func (s sample) Type() chunkenc.ValueType { + switch { + case s.h != nil: + return chunkenc.ValHistogram + case s.fh != nil: + return chunkenc.ValFloatHistogram + default: + return chunkenc.ValFloat + } +} From 43ffaa76e252d97fb3007a9c48a501899d69bf6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 4 Oct 2024 16:00:37 +0200 Subject: [PATCH 06/12] Fix wrong comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- tools/splitblocks/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/splitblocks/main_test.go b/tools/splitblocks/main_test.go index d077577040b..5446085a0a2 100644 --- a/tools/splitblocks/main_test.go +++ b/tools/splitblocks/main_test.go @@ -131,7 +131,7 @@ func TestSplitLocalBlock(t *testing.T) { require.Equal(t, startOfDay.Add(10*time.Minute).UnixMilli(), spec[0].Chunks[0].MinTime) require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[0].Chunks[0].MaxTime) - // Series 2 has only last chunk in the first block + // Series 2 has only last chunk in the last block require.Equal(t, spec[1].Labels, labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary")) require.Len(t, spec[1].Chunks, 1) require.Equal(t, startOfDay.Add(48*time.Hour).UnixMilli(), spec[1].Chunks[0].MinTime) From cdcbc03dada35880e935140cb651a61c10a7e410 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Fri, 4 Oct 2024 17:45:43 -0400 Subject: [PATCH 07/12] Remove tenant hierarchy, add documentation --- tools/splitblocks/README.md | 65 +++++++++++++++ tools/splitblocks/main.go | 161 +++++++++++++++++------------------- 2 files changed, 141 insertions(+), 85 deletions(-) create mode 100644 tools/splitblocks/README.md diff --git a/tools/splitblocks/README.md b/tools/splitblocks/README.md new file mode 100644 index 00000000000..2a29d6605c8 --- /dev/null 
+++ b/tools/splitblocks/README.md @@ -0,0 +1,65 @@ +# Split Blocks + +This program splits source blocks into new blocks where each spans at most a given duration. For instance, it can create three 24 hour blocks from a single 72 hour block. + +Time boundaries are also considered when determining what to split. For instance, a block spanning `00:00`-`23:59` would not be split while a block spanning `12:00`-`11:59` (the next day) would be. + +Source blocks can be read from either object storage or a local filesystem. Blocks that are created are only written to a local filesystem. + +## Flags + +- `--output.dir` (required) The output directory where split blocks will be written on the local filesystem +- `--blocks` (optional) A comma separated list of blocks to target. If not provided, or empty, all blocks are considered +- `--block-concurrency` (optional) How many blocks can be split at once +- `--bucket-prefix` (optional) A prefix applied to the bucket path +- `--max-block-duration` (optional, defaults to `24h`) Max block duration, blocks larger than this or crossing a duration boundary are split +- `--dry-run` (optional) If set blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged + +## Running + +Running `go build .` in this directory builds the program. Then use an example below as a guide. 
+ +### Splitting blocks from a local filesystem + +```bash +./splitblocks \ + --backend filesystem \ + --filesystem.dir <directory> \ + --output.dir <directory> \ + --dry-run +``` + +### Splitting blocks from Google Cloud Storage + +```bash +./splitblocks \ + --backend gcs \ + --gcs.bucket-name <bucket name> \ + --output.dir <directory> \ + --dry-run +``` + +### Splitting blocks from Azure Blob Storage + +```bash +./splitblocks \ + --backend azure \ + --azure.container-name <container name> \ + --azure.account-name <account name> \ + --azure.account-key <account key> \ + --output.dir <directory> \ + --dry-run +``` + +### Splitting blocks from Amazon Simple Storage Service + +```bash +./splitblocks \ + --backend s3 \ + --s3.bucket-name <bucket name> \ + --s3.access-key-id <access key id> \ + --s3.secret-access-key <secret access key> \ + --s3.endpoint <endpoint> \ + --output.dir <directory> \ + --dry-run +``` diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index e9ac91b4814..a9f187ee2d0 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -23,49 +23,47 @@ import ( "github.com/thanos-io/objstore" "github.com/grafana/mimir/pkg/storage/bucket" - "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/util" ) type config struct { - bucket bucket.Config - outputDir string - tenantConcurrency int - blockConcurrency int - includeTenants flagext.StringSliceCSV - excludeTenants flagext.StringSliceCSV - dryRun bool - maxDuration time.Duration + bucket bucket.Config + blocks flagext.StringSliceCSV + bucketPrefix string + outputDir string + blockConcurrency int + dryRun bool + maxBlockDuration time.Duration } func (c *config) registerFlags(f *flag.FlagSet) { - c.bucket.RegisterFlags(flag.CommandLine) - f.StringVar(&c.outputDir, "output-dir", "", "The output directory where split blocks will be written") - f.IntVar(&c.blockConcurrency, "block-concurrency", 5, "How many blocks can be split at once per tenant") - f.IntVar(&c.tenantConcurrency, "tenant-concurrency", 1, "How many tenants can be processed concurrently") - f.Var(&c.includeTenants, 
"include-tenants", "A comma separated list of what tenants to target") - f.Var(&c.excludeTenants, "exclude-tenants", "A comma separated list of what tenants to ignore. Has precedence over included tenants") - f.BoolVar(&c.dryRun, "dry-run", false, "If set blocks are not downloaded and splits are not performed; only what would happen is logged") - f.DurationVar(&c.maxDuration, "max-block-duration", 24*time.Hour, "Max block duration, blocks larger than this or crossing duration boundary are split.") + c.bucket.RegisterFlags(f) + f.Var(&c.blocks, "blocks", "An optional comma separated list of blocks to target. If not provided, or empty, all blocks are considered.") + f.StringVar(&c.bucketPrefix, "bucket-prefix", "", "An optional prefix applied to the bucket path") + f.StringVar(&c.outputDir, "output.dir", "", "The output directory where split blocks will be written") + f.IntVar(&c.blockConcurrency, "block-concurrency", 5, "How many blocks can be split at once") + f.BoolVar(&c.dryRun, "dry-run", false, "If set blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged") + f.DurationVar(&c.maxBlockDuration, "max-block-duration", 24*time.Hour, "Max block duration, blocks larger than this or crossing a duration boundary are split.") } func (c *config) validate() error { - if c.maxDuration < 2*time.Hour { + for _, blockID := range c.blocks { + if _, err := ulid.Parse(blockID); err != nil { + return errors.Wrap(err, "blocks contained an invalid block ID") + } + } + if c.maxBlockDuration < 2*time.Hour { return fmt.Errorf("max-block-duration must be at least 2 hours") } - if c.maxDuration.Truncate(time.Hour) != c.maxDuration { + if c.maxBlockDuration.Truncate(time.Hour) != c.maxBlockDuration { return fmt.Errorf("max-block-duration should be aligned to hours") } - if 24*time.Hour%c.maxDuration.Truncate(time.Hour) != 0 { + if 24*time.Hour%c.maxBlockDuration.Truncate(time.Hour) != 0 { return fmt.Errorf("max-block-duration should 
divide 24h without remainder") } if c.outputDir == "" { return fmt.Errorf("output-dir is required") } - if c.tenantConcurrency < 1 { - return fmt.Errorf("tenant-concurrency must be positive") - } if c.blockConcurrency < 1 { return fmt.Errorf("block-concurrency must be positive") } @@ -103,78 +101,73 @@ func main() { } func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { - allowedTenants := util.NewAllowedTenants(cfg.includeTenants, cfg.excludeTenants) - bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil) if err != nil { return errors.Wrap(err, "failed to create bucket") } - tenants, err := tsdb.ListUsers(ctx, bkt) - if err != nil { - return errors.Wrap(err, "failed to list tenants") + + // Helpful when not specifying -backend=filesystem (since it can use -filesystem.dir) + if cfg.bucketPrefix != "" { + bkt = bucket.NewPrefixedBucketClient(bkt, cfg.bucketPrefix) } - return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, tenantID string) error { - if !allowedTenants.IsAllowed(tenantID) { - return nil + var blocks []string + if len(cfg.blocks) > 0 { + blocks = cfg.blocks + } else { + blocks, err = listBlocks(ctx, bkt) + if err != nil { + level.Error(logger).Log("msg", "failed to list blocks", "err", err) + return errors.Wrapf(err, "failed to list blocks") + } + } + + return concurrency.ForEachUser(ctx, blocks, cfg.blockConcurrency, func(ctx context.Context, blockString string) error { + blockID, err := ulid.Parse(blockString) + if err != nil { + return err } - logger := log.With(logger, "tenantID", tenantID) - blocks, err := listBlocksForTenant(ctx, bkt, tenantID) + logger := log.With(logger, "block", blockString) + blockMeta, err := block.DownloadMeta(ctx, logger, bkt, blockID) if err != nil { - level.Error(logger).Log("msg", "failed to list blocks for tenant", "err", err) - return errors.Wrapf(err, "failed to list blocks for tenant %s", tenantID) + level.Error(logger).Log("msg", "failed 
to read block's meta.json file", "err", err) + return err } - bkt := objstore.NewPrefixedBucket(bkt, tenantID) - - return concurrency.ForEachUser(ctx, blocks, cfg.blockConcurrency, func(ctx context.Context, blockString string) error { - blockID, err := ulid.Parse(blockString) - if err != nil { - return err - } - - logger := log.With(logger, "block", blockString) - blockMeta, err := block.DownloadMeta(ctx, logger, bkt, blockID) - if err != nil { - level.Error(logger).Log("msg", "failed to read block's meta.json file", "err", err) - return err - } - - blockMinTime := timestamp.Time(blockMeta.MinTime) - blockMaxTime := timestamp.Time(blockMeta.MaxTime) - level.Info(logger).Log("block_min_time", blockMinTime, "block_max_time", blockMaxTime) - - if blockMinTime.After(blockMaxTime) { - level.Error(logger).Log("msg", "block has an invalid minTime greater than maxTime") - return fmt.Errorf("block has an invalid minTime greater than maxTime") - } - - allowedMaxTime := blockMinTime.Truncate(cfg.maxDuration).Add(cfg.maxDuration) - if !blockMaxTime.After(allowedMaxTime) { - level.Info(logger).Log("msg", "block does not need to be split") - return nil - } - - if cfg.dryRun { - level.Info(logger).Log("msg", "dry run: would split block") - return nil - } - - if err := splitBlock(ctx, cfg, bkt, tenantID, blockMeta, logger); err != nil { - level.Error(logger).Log("msg", "failed to split block", "err", err) - return err - } - - level.Info(logger).Log("msg", "block split successfully", "dryRun", cfg.dryRun) + blockMinTime := timestamp.Time(blockMeta.MinTime) + blockMaxTime := timestamp.Time(blockMeta.MaxTime) + level.Info(logger).Log("block_min_time", blockMinTime, "block_max_time", blockMaxTime) + + if blockMinTime.After(blockMaxTime) { + level.Error(logger).Log("msg", "block has an invalid minTime greater than maxTime") + return fmt.Errorf("block has an invalid minTime greater than maxTime") + } + + allowedMaxTime := 
blockMinTime.Truncate(cfg.maxBlockDuration).Add(cfg.maxBlockDuration) + if !blockMaxTime.After(allowedMaxTime) { + level.Info(logger).Log("msg", "block does not need to be split") + return nil + } + + if cfg.dryRun { + level.Info(logger).Log("msg", "dry run: would split block") return nil - }) + } + + if err := splitBlock(ctx, cfg, bkt, blockMeta, logger); err != nil { + level.Error(logger).Log("msg", "failed to split block", "err", err) + return err + } + + level.Info(logger).Log("msg", "block split successfully") + return nil }) } -func listBlocksForTenant(ctx context.Context, bkt objstore.Bucket, tenantID string) ([]string, error) { +func listBlocks(ctx context.Context, bkt objstore.Bucket) ([]string, error) { var blocks []string - err := bkt.Iter(ctx, tenantID+objstore.DirDelim, func(name string) error { + err := bkt.Iter(ctx, "", func(name string) error { if block, ok := block.IsBlockDir(name); ok { blocks = append(blocks, block.String()) } @@ -189,15 +182,13 @@ func listBlocksForTenant(ctx context.Context, bkt objstore.Bucket, tenantID stri // splitBlock downloads the source block to the output directory, then generates new blocks that are within cfg.blockRanges durations. // After all the splits succeed the original source block is removed from the output directory. 
-func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, tenantID string, meta block.Meta, logger log.Logger) error { - tenantDir := filepath.Join(cfg.outputDir, tenantID) - - originalBlockDir := filepath.Join(tenantDir, meta.ULID.String()) +func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, meta block.Meta, logger log.Logger) error { + originalBlockDir := filepath.Join(cfg.outputDir, meta.ULID.String()) if err := block.Download(ctx, logger, bkt, meta.ULID, originalBlockDir); err != nil { return err } - _, err := splitLocalBlock(ctx, tenantDir, originalBlockDir, meta, cfg.maxDuration, logger) + _, err := splitLocalBlock(ctx, cfg.outputDir, originalBlockDir, meta, cfg.maxBlockDuration, logger) return err } From 4dff11306d8af4be6f61757e4c8a4293db471aa4 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 7 Oct 2024 13:03:02 -0400 Subject: [PATCH 08/12] Address Peter's review --- tools/splitblocks/README.md | 5 ++- tools/splitblocks/main.go | 76 +++++++++++++++++++++++++------------ 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/tools/splitblocks/README.md b/tools/splitblocks/README.md index 2a29d6605c8..904af3e1756 100644 --- a/tools/splitblocks/README.md +++ b/tools/splitblocks/README.md @@ -10,10 +10,11 @@ Source blocks can be read from either object storage or a local filesystem. Bloc - `--output.dir` (required) The output directory where split blocks will be written on the local filesystem - `--blocks` (optional) A comma separated list of blocks to target. 
If not provided, or empty, all blocks are considered -- `--block-concurrency` (optional) How many blocks can be split at once +- `--block-concurrency` (optional, defaults to `5`) How many blocks can be split concurrently - `--bucket-prefix` (optional) A prefix applied to the bucket path - `--max-block-duration` (optional, defaults to `24h`) Max block duration, blocks larger than this or crossing a duration boundary are split -- `--dry-run` (optional) If set blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged +- `--full` (optional) If set, blocks that do not need to be split are included in the output directory +- `--dry-run` (optional) If set, blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged ## Running diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index a9f187ee2d0..e51f89bcf93 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -32,18 +32,20 @@ type config struct { bucketPrefix string outputDir string blockConcurrency int + full bool dryRun bool maxBlockDuration time.Duration } func (c *config) registerFlags(f *flag.FlagSet) { c.bucket.RegisterFlags(f) - f.Var(&c.blocks, "blocks", "An optional comma separated list of blocks to target. If not provided, or empty, all blocks are considered.") + f.Var(&c.blocks, "blocks", "An optional comma separated list of blocks to target. 
If not provided, or empty, all blocks are considered") f.StringVar(&c.bucketPrefix, "bucket-prefix", "", "An optional prefix applied to the bucket path") f.StringVar(&c.outputDir, "output.dir", "", "The output directory where split blocks will be written") f.IntVar(&c.blockConcurrency, "block-concurrency", 5, "How many blocks can be split at once") + f.BoolVar(&c.full, "full", false, "If set blocks that do not need to be split are included in the output directory") f.BoolVar(&c.dryRun, "dry-run", false, "If set blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged") - f.DurationVar(&c.maxBlockDuration, "max-block-duration", 24*time.Hour, "Max block duration, blocks larger than this or crossing a duration boundary are split.") + f.DurationVar(&c.maxBlockDuration, "max-block-duration", 24*time.Hour, "Max block duration, blocks larger than this or crossing a duration boundary are split") } func (c *config) validate() error { @@ -106,29 +108,19 @@ func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { return errors.Wrap(err, "failed to create bucket") } - // Helpful when not specifying -backend=filesystem (since it can use -filesystem.dir) if cfg.bucketPrefix != "" { bkt = bucket.NewPrefixedBucketClient(bkt, cfg.bucketPrefix) } - var blocks []string - if len(cfg.blocks) > 0 { - blocks = cfg.blocks - } else { - blocks, err = listBlocks(ctx, bkt) - if err != nil { - level.Error(logger).Log("msg", "failed to list blocks", "err", err) - return errors.Wrapf(err, "failed to list blocks") - } + blockIDs, err := targetBlocks(ctx, cfg, bkt) + if err != nil { + return err } - return concurrency.ForEachUser(ctx, blocks, cfg.blockConcurrency, func(ctx context.Context, blockString string) error { - blockID, err := ulid.Parse(blockString) - if err != nil { - return err - } + return concurrency.ForEachJob(ctx, len(blockIDs), cfg.blockConcurrency, func(ctx context.Context, idx int) error { + blockID := 
blockIDs[idx] - logger := log.With(logger, "block", blockString) + logger := log.With(logger, "block", blockID.String()) blockMeta, err := block.DownloadMeta(ctx, logger, bkt, blockID) if err != nil { level.Error(logger).Log("msg", "failed to read block's meta.json file", "err", err) @@ -147,6 +139,16 @@ func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { allowedMaxTime := blockMinTime.Truncate(cfg.maxBlockDuration).Add(cfg.maxBlockDuration) if !blockMaxTime.After(allowedMaxTime) { level.Info(logger).Log("msg", "block does not need to be split") + if cfg.full { + if cfg.dryRun { + level.Info(logger).Log("msg", "dry run: would download block") + return nil + } + blockDir := filepath.Join(cfg.outputDir, blockID.String()) + if err := block.Download(ctx, logger, bkt, blockID, blockDir); err != nil { + return errors.Wrapf(err, "failed to download block") + } + } return nil } @@ -165,16 +167,32 @@ func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { }) } -func listBlocks(ctx context.Context, bkt objstore.Bucket) ([]string, error) { - var blocks []string +func targetBlocks(ctx context.Context, cfg config, bkt objstore.Bucket) ([]ulid.ULID, error) { + if len(cfg.blocks) == 0 { + return listBlocks(ctx, bkt) + } + + blocks := make([]ulid.ULID, 0, len(cfg.blocks)) + for _, block := range cfg.blocks { + blockID, err := ulid.Parse(block) + if err != nil { + return nil, errors.Wrapf(err, "a blockID in --blocks was invalid: %s", block) + } + blocks = append(blocks, blockID) + } + return blocks, nil +} + +func listBlocks(ctx context.Context, bkt objstore.Bucket) ([]ulid.ULID, error) { + var blocks []ulid.ULID err := bkt.Iter(ctx, "", func(name string) error { if block, ok := block.IsBlockDir(name); ok { - blocks = append(blocks, block.String()) + blocks = append(blocks, block) } return nil }) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to list blocks") } return blocks, nil @@ -214,13 +232,21 @@ func 
splitLocalBlock(ctx context.Context, parentDir, blockDir string, meta block return nil, errors.Wrap(err, "failed while splitting block") } - splitMeta, err := block.ReadMetaFromDir(path.Join(parentDir, splitID.String())) + splitDir := path.Join(parentDir, splitID.String()) + splitMeta, err := block.ReadMetaFromDir(splitDir) if err != nil { return nil, errors.Wrap(err, "failed while reading meta.json from split block") } - level.Info(logger).Log("msg", "created block from split", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime), "splitID", splitID, "series", splitMeta.Stats.NumSeries, "chunks", splitMeta.Stats.NumChunks, "samples", splitMeta.Stats.NumSamples) - result = append(result, splitID) + if splitMeta.Stats.NumSeries == 0 { + if err := os.RemoveAll(splitDir); err != nil { + return nil, errors.Wrap(err, "failed to clean up empty split block") + } + } else { + level.Info(logger).Log("msg", "created block from split", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime), "splitID", splitID, "series", splitMeta.Stats.NumSeries, "chunks", splitMeta.Stats.NumChunks, "samples", splitMeta.Stats.NumSamples) + result = append(result, splitID) + } + minTime = maxTime } From cd6ee763022dda223da06eb4eac58754a95be72c Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 7 Oct 2024 14:51:49 -0400 Subject: [PATCH 09/12] Add simple high-level test --- tools/splitblocks/main.go | 20 ++--- tools/splitblocks/main_test.go | 131 ++++++++++++++++++++++----------- 2 files changed, 98 insertions(+), 53 deletions(-) diff --git a/tools/splitblocks/main.go b/tools/splitblocks/main.go index e51f89bcf93..562ec2e5d72 100644 --- a/tools/splitblocks/main.go +++ b/tools/splitblocks/main.go @@ -49,11 +49,6 @@ func (c *config) registerFlags(f *flag.FlagSet) { } func (c *config) validate() error { - for _, blockID := range c.blocks { - if _, err := ulid.Parse(blockID); err != nil { - return errors.Wrap(err, "blocks contained an invalid block ID") - } - } if 
c.maxBlockDuration < 2*time.Hour { return fmt.Errorf("max-block-duration must be at least 2 hours") } @@ -96,18 +91,19 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - if err := splitBlocks(ctx, cfg, logger); err != nil { - fmt.Fprintln(os.Stderr, err.Error()) + bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil) + if err != nil { + fmt.Fprintln(os.Stderr, errors.Wrap(err, "failed to create bucket")) os.Exit(1) } -} -func splitBlocks(ctx context.Context, cfg config, logger log.Logger) error { - bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil) - if err != nil { - return errors.Wrap(err, "failed to create bucket") + if err := splitBlocks(ctx, cfg, bkt, logger); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) } +} +func splitBlocks(ctx context.Context, cfg config, bkt objstore.Bucket, logger log.Logger) error { if cfg.bucketPrefix != "" { bkt = bucket.NewPrefixedBucketClient(bkt, cfg.bucketPrefix) } diff --git a/tools/splitblocks/main_test.go b/tools/splitblocks/main_test.go index 5446085a0a2..26e8278b5df 100644 --- a/tools/splitblocks/main_test.go +++ b/tools/splitblocks/main_test.go @@ -2,6 +2,8 @@ package main import ( "context" + "os" + "path" "path/filepath" "testing" "time" @@ -14,58 +16,51 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/grafana/mimir/pkg/storage/tsdb/block" ) -func TestSplitLocalBlock(t *testing.T) { - dir := t.TempDir() +func TestSplitBlocks(t *testing.T) { + bkt := objstore.NewInMemBucket() + cfg := config{ + outputDir: t.TempDir(), + blockConcurrency: 2, + maxBlockDuration: 24 * time.Hour, + } + logger := log.NewNopLogger() startOfDay := time.Now().Truncate(24 * time.Hour) + specs := buildSeriesSpec(startOfDay) - specs := []*block.SeriesSpec{ - { - Labels: 
labels.FromStrings("__name__", "1_series_with_one_chunk_with_samples_covering_multiple_days"), - Chunks: []chunks.Meta{ - must(chunks.ChunkFromSamples([]chunks.Sample{ - newSample(startOfDay.Add(10*time.Minute).UnixMilli(), 1, nil, nil), - newSample(startOfDay.Add(12*time.Hour).UnixMilli(), 2, nil, nil), - newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 3, nil, nil), - newSample(startOfDay.Add(36*time.Hour).UnixMilli(), 4, nil, nil), - newSample(startOfDay.Add(48*time.Hour).UnixMilli(), 5, nil, nil), - })), - }, - }, + blocksDir := t.TempDir() + meta, err := block.GenerateBlockFromSpec(blocksDir, specs) + require.NoError(t, err) - { - Labels: labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary"), - Chunks: []chunks.Meta{ - must(chunks.ChunkFromSamples([]chunks.Sample{ - newSample(startOfDay.UnixMilli(), 1, nil, nil), - newSample(startOfDay.Add(12*time.Hour).UnixMilli(), 2, nil, nil), - })), - must(chunks.ChunkFromSamples([]chunks.Sample{ - newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 3, nil, nil), - newSample(startOfDay.Add(36*time.Hour).UnixMilli(), 4, nil, nil), - })), - must(chunks.ChunkFromSamples([]chunks.Sample{ - newSample(startOfDay.Add(48*time.Hour).UnixMilli(), 5, nil, nil), - })), - }, - }, + err = block.Upload(context.Background(), logger, bkt, path.Join(blocksDir, meta.ULID.String()), meta) + require.NoError(t, err) - { - Labels: labels.FromStrings("__name__", "3_series_with_samples_on_second_day"), - Chunks: []chunks.Meta{ - must(chunks.ChunkFromSamples([]chunks.Sample{ - newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 1, nil, nil), - newSample(startOfDay.Add(25*time.Hour).UnixMilli(), 2, nil, nil), - newSample(startOfDay.Add(26*time.Hour).UnixMilli(), 3, nil, nil), - })), - }, - }, + for _, dryRun := range []bool{true, false} { + cfg.dryRun = dryRun + err = splitBlocks(context.Background(), cfg, bkt, logger) + require.NoError(t, err) + + entries, err := os.ReadDir(cfg.outputDir) + require.NoError(t, 
err) + expectedEntries := 3 + if dryRun { + expectedEntries = 0 + } + require.Len(t, entries, expectedEntries) } +} + +func TestSplitLocalBlock(t *testing.T) { + dir := t.TempDir() + startOfDay := time.Now().Truncate(24 * time.Hour) + + specs := buildSeriesSpec(startOfDay) meta, err := block.GenerateBlockFromSpec(dir, specs) require.NoError(t, err) @@ -142,6 +137,51 @@ func TestSplitLocalBlock(t *testing.T) { } } +func buildSeriesSpec(startOfDay time.Time) []*block.SeriesSpec { + return []*block.SeriesSpec{ + { + Labels: labels.FromStrings("__name__", "1_series_with_one_chunk_with_samples_covering_multiple_days"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(10*time.Minute).UnixMilli(), 1, nil, nil), + newSample(startOfDay.Add(12*time.Hour).UnixMilli(), 2, nil, nil), + newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 3, nil, nil), + newSample(startOfDay.Add(36*time.Hour).UnixMilli(), 4, nil, nil), + newSample(startOfDay.Add(48*time.Hour).UnixMilli(), 5, nil, nil), + })), + }, + }, + + { + Labels: labels.FromStrings("__name__", "2_series_with_multiple_chunks_not_crossing_24h_boundary"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.UnixMilli(), 1, nil, nil), + newSample(startOfDay.Add(12*time.Hour).UnixMilli(), 2, nil, nil), + })), + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 3, nil, nil), + newSample(startOfDay.Add(36*time.Hour).UnixMilli(), 4, nil, nil), + })), + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(48*time.Hour).UnixMilli(), 5, nil, nil), + })), + }, + }, + + { + Labels: labels.FromStrings("__name__", "3_series_with_samples_on_second_day"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{ + newSample(startOfDay.Add(24*time.Hour).UnixMilli(), 1, nil, nil), + newSample(startOfDay.Add(25*time.Hour).UnixMilli(), 2, nil, nil), + 
newSample(startOfDay.Add(26*time.Hour).UnixMilli(), 3, nil, nil), + })), + }, + }, + } +} + func listSeriesAndChunksFromBlock(t *testing.T, blockDir string) []*block.SeriesSpec { allKey, allValue := index.AllPostingsKey() r, err := index.NewFileReader(filepath.Join(blockDir, block.IndexFilename)) @@ -203,3 +243,12 @@ func (s sample) Type() chunkenc.ValueType { return chunkenc.ValFloat } } + +func (s sample) Copy() sample { + return sample{ + s.t, + s.v, + s.h.Copy(), + s.fh.Copy(), + } +} From b912bf200209b253914a8f50cfd064353dac9cf3 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 7 Oct 2024 14:55:45 -0400 Subject: [PATCH 10/12] Change return type for lint --- tools/splitblocks/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/splitblocks/main_test.go b/tools/splitblocks/main_test.go index 26e8278b5df..95046aae7f4 100644 --- a/tools/splitblocks/main_test.go +++ b/tools/splitblocks/main_test.go @@ -244,7 +244,7 @@ func (s sample) Type() chunkenc.ValueType { } } -func (s sample) Copy() sample { +func (s sample) Copy() chunks.Sample { return sample{ s.t, s.v, From 86181df6442f14032f1c2ec41a14933545683658 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 7 Oct 2024 15:03:41 -0400 Subject: [PATCH 11/12] License for test --- tools/splitblocks/main_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/splitblocks/main_test.go b/tools/splitblocks/main_test.go index 95046aae7f4..3d9d27deb7e 100644 --- a/tools/splitblocks/main_test.go +++ b/tools/splitblocks/main_test.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package main import ( From 0380b64446c88e93909117938c0f63218adc55c6 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 7 Oct 2024 15:18:37 -0400 Subject: [PATCH 12/12] Add CHANGELOG entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 836993c0e65..d2de156240f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ ### Tools +* [FEATURE] 
`splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517 * [ENHANCEMENT] `copyblocks`: Added `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439 ## v2.14.0-rc.0