
Split blocks tool #9517

Merged · 12 commits · Oct 7, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@

### Tools

* [FEATURE] `splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517
* [ENHANCEMENT] `copyblocks`: Added `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439

## v2.14.0-rc.0
7 changes: 7 additions & 0 deletions pkg/storage/tsdb/block/block_generator.go
@@ -71,6 +71,8 @@ func GenerateBlockFromSpec(storageDir string, specs SeriesSpecs) (_ *Meta, retur
blockID := ulid.MustNew(ulid.Now(), crypto_rand.Reader)
blockDir := filepath.Join(storageDir, blockID.String())

stats := tsdb.BlockStats{}

// Ensure series are sorted.
sort.Slice(specs, func(i, j int) bool {
return labels.Compare(specs[i].Labels, specs[j].Labels) < 0
@@ -109,11 +111,15 @@ func GenerateBlockFromSpec(storageDir string, specs SeriesSpecs) (_ *Meta, retur

// Updates the Ref on each chunk.
for _, series := range specs {
stats.NumSeries++

// Ensure every chunk meta has chunk data.
for _, c := range series.Chunks {
if c.Chunk == nil {
return nil, errors.Errorf("missing chunk data for series %s", series.Labels.String())
}
stats.NumChunks++
stats.NumSamples += uint64(c.Chunk.NumSamples())
}

if err := chunkw.WriteChunks(series.Chunks...); err != nil {
@@ -172,6 +178,7 @@ func GenerateBlockFromSpec(storageDir string, specs SeriesSpecs) (_ *Meta, retur
Sources: []ulid.ULID{blockID},
},
Version: 1,
Stats: stats,
},
Thanos: ThanosMeta{
Version: ThanosVersion1,
1 change: 1 addition & 0 deletions pkg/storage/tsdb/block/meta.go
@@ -30,6 +30,7 @@ const (
CompactorRepairSource SourceType = "compactor.repair"
BucketRepairSource SourceType = "bucket.repair"
BlockBuilderSource SourceType = "block-builder"
SplitBlocksSource SourceType = "split-blocks"
TestSource SourceType = "test"
)

1 change: 1 addition & 0 deletions tools/splitblocks/.gitignore
@@ -0,0 +1 @@
splitblocks
66 changes: 66 additions & 0 deletions tools/splitblocks/README.md
@@ -0,0 +1,66 @@
# Split Blocks

This program splits source blocks into new blocks, each spanning at most a specified duration. For instance, it can create three 24-hour blocks from a single 72-hour block.

Time boundaries are also considered when determining what to split. For instance, with a 24-hour maximum duration, a block spanning `00:00`-`23:59` would not be split, while a block spanning `12:00`-`11:59` (the next day) would be, because it crosses the midnight boundary.
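
To make the boundary rule concrete, here is a minimal, self-contained Go sketch that mirrors the check `splitblocks` performs; it is illustrative only, and `needsSplit` is a name invented for this example:

```go
// Illustrative only: mirrors the boundary check used by splitblocks.
package main

import (
	"fmt"
	"time"
)

// needsSplit reports whether a block must be split: true when the block's
// max time lies past the first maxDuration boundary after its min time.
func needsSplit(minTime, maxTime time.Time, maxDuration time.Duration) bool {
	allowedMaxTime := minTime.Truncate(maxDuration).Add(maxDuration)
	return maxTime.After(allowedMaxTime)
}

func main() {
	midnight := time.Date(2024, 10, 1, 0, 0, 0, 0, time.UTC)

	// 00:00-23:59 stays inside one 24h boundary: not split.
	fmt.Println(needsSplit(midnight, midnight.Add(23*time.Hour+59*time.Minute), 24*time.Hour)) // false

	// 12:00-11:59 the next day crosses midnight: split.
	fmt.Println(needsSplit(midnight.Add(12*time.Hour), midnight.Add(35*time.Hour+59*time.Minute), 24*time.Hour)) // true
}
```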

Source blocks can be read from either object storage or a local filesystem. Newly created blocks are written only to the local filesystem.

## Flags

- `--output.dir` (required) The output directory where split blocks will be written on the local filesystem
- `--blocks` (optional) A comma-separated list of blocks to target. If not provided or empty, all blocks are considered
- `--block-concurrency` (optional, defaults to `5`) How many blocks can be split concurrently
- `--bucket-prefix` (optional) A prefix applied to the bucket path
- `--max-block-duration` (optional, defaults to `24h`) Max block duration; blocks larger than this, or crossing a duration boundary, are split (see the example after this list)
- `--full` (optional) If set, blocks that do not need to be split are included in the output directory
- `--dry-run` (optional) If set, blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged
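
For example, a dry run against a local filesystem that targets two specific blocks and uses a 12-hour maximum duration combines the flags like this (the directories and block ULIDs are placeholders):

```bash
./splitblocks \
--backend filesystem \
--filesystem.dir <directory> \
--output.dir <directory> \
--blocks <block ulid>,<block ulid> \
--max-block-duration 12h \
--dry-run
```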

## Running

Build the program by running `go build .` in this directory, then use one of the examples below as a guide.

### Splitting blocks from a local filesystem

```bash
./splitblocks \
--backend filesystem \
--filesystem.dir <directory> \
--output.dir <directory> \
--dry-run
```

### Splitting blocks from Google Cloud Storage

```bash
./splitblocks \
--backend gcs \
--gcs.bucket-name <bucket name> \
--output.dir <directory> \
--dry-run
```

### Splitting blocks from Azure Blob Storage

```bash
./splitblocks \
--backend azure \
--azure.container-name <container name> \
--azure.account-name <account name> \
--azure.account-key <account key> \
--output.dir <directory> \
--dry-run
```

### Splitting blocks from Amazon Simple Storage Service

```bash
./splitblocks \
--backend s3 \
--s3.bucket-name <bucket name> \
--s3.access-key-id <access key id> \
--s3.secret-access-key <secret access key> \
--s3.endpoint <endpoint> \
--output.dir <directory> \
--dry-run
```
254 changes: 254 additions & 0 deletions tools/splitblocks/main.go
@@ -0,0 +1,254 @@
// SPDX-License-Identifier: AGPL-3.0-only

package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/thanos-io/objstore"

"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
)

type config struct {
bucket bucket.Config
blocks flagext.StringSliceCSV
bucketPrefix string
outputDir string
blockConcurrency int
full bool
dryRun bool
maxBlockDuration time.Duration
}

func (c *config) registerFlags(f *flag.FlagSet) {
c.bucket.RegisterFlags(f)
f.Var(&c.blocks, "blocks", "An optional comma-separated list of blocks to target. If not provided or empty, all blocks are considered")
f.StringVar(&c.bucketPrefix, "bucket-prefix", "", "An optional prefix applied to the bucket path")
f.StringVar(&c.outputDir, "output.dir", "", "The output directory where split blocks will be written")
f.IntVar(&c.blockConcurrency, "block-concurrency", 5, "How many blocks can be split at once")
f.BoolVar(&c.full, "full", false, "If set, blocks that do not need to be split are included in the output directory")
f.BoolVar(&c.dryRun, "dry-run", false, "If set, blocks are not downloaded (except metadata) and splits are not performed; only what would happen is logged")
f.DurationVar(&c.maxBlockDuration, "max-block-duration", 24*time.Hour, "Max block duration; blocks larger than this, or crossing a duration boundary, are split")
}

func (c *config) validate() error {
if c.maxBlockDuration < 2*time.Hour {
return fmt.Errorf("max-block-duration must be at least 2 hours")
}
if c.maxBlockDuration.Truncate(time.Hour) != c.maxBlockDuration {
return fmt.Errorf("max-block-duration should be aligned to hours")
}
// Requiring the duration to divide 24h evenly keeps split boundaries at the same wall-clock times every day.
if 24*time.Hour%c.maxBlockDuration.Truncate(time.Hour) != 0 {
return fmt.Errorf("max-block-duration should divide 24h without remainder")
}
if c.outputDir == "" {
return fmt.Errorf("output-dir is required")
}
if c.blockConcurrency < 1 {
return fmt.Errorf("block-concurrency must be positive")
}
return nil
}

func main() {
// Clean up all flags registered via init() methods of 3rd-party libraries.
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)

cfg := config{}
cfg.registerFlags(flag.CommandLine)

// Parse CLI arguments.
if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}

if err := cfg.validate(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil)
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrap(err, "failed to create bucket"))
os.Exit(1)
}

if err := splitBlocks(ctx, cfg, bkt, logger); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
}

func splitBlocks(ctx context.Context, cfg config, bkt objstore.Bucket, logger log.Logger) error {
if cfg.bucketPrefix != "" {
bkt = bucket.NewPrefixedBucketClient(bkt, cfg.bucketPrefix)
}

blockIDs, err := targetBlocks(ctx, cfg, bkt)
if err != nil {
return err
}

return concurrency.ForEachJob(ctx, len(blockIDs), cfg.blockConcurrency, func(ctx context.Context, idx int) error {
blockID := blockIDs[idx]

logger := log.With(logger, "block", blockID.String())
blockMeta, err := block.DownloadMeta(ctx, logger, bkt, blockID)
if err != nil {
level.Error(logger).Log("msg", "failed to read block's meta.json file", "err", err)
return err
}

blockMinTime := timestamp.Time(blockMeta.MinTime)
blockMaxTime := timestamp.Time(blockMeta.MaxTime)
level.Info(logger).Log("block_min_time", blockMinTime, "block_max_time", blockMaxTime)

if blockMinTime.After(blockMaxTime) {
level.Error(logger).Log("msg", "block has an invalid minTime greater than maxTime")
return fmt.Errorf("block has an invalid minTime greater than maxTime")
}

// Truncate rounds blockMinTime down to the previous maxBlockDuration boundary, so allowedMaxTime is the first boundary after the block starts.
allowedMaxTime := blockMinTime.Truncate(cfg.maxBlockDuration).Add(cfg.maxBlockDuration)
if !blockMaxTime.After(allowedMaxTime) {
level.Info(logger).Log("msg", "block does not need to be split")
if cfg.full {
if cfg.dryRun {
level.Info(logger).Log("msg", "dry run: would download block")
return nil
}
blockDir := filepath.Join(cfg.outputDir, blockID.String())
if err := block.Download(ctx, logger, bkt, blockID, blockDir); err != nil {
return errors.Wrapf(err, "failed to download block")
}
}
return nil
}

if cfg.dryRun {
level.Info(logger).Log("msg", "dry run: would split block")
return nil
}

if err := splitBlock(ctx, cfg, bkt, blockMeta, logger); err != nil {
level.Error(logger).Log("msg", "failed to split block", "err", err)
return err
}

level.Info(logger).Log("msg", "block split successfully")
return nil
})
}

func targetBlocks(ctx context.Context, cfg config, bkt objstore.Bucket) ([]ulid.ULID, error) {
if len(cfg.blocks) == 0 {
return listBlocks(ctx, bkt)
}

blocks := make([]ulid.ULID, 0, len(cfg.blocks))
for _, block := range cfg.blocks {
blockID, err := ulid.Parse(block)
if err != nil {
return nil, errors.Wrapf(err, "a blockID in --blocks was invalid: %s", block)
}
blocks = append(blocks, blockID)
}
return blocks, nil
}

func listBlocks(ctx context.Context, bkt objstore.Bucket) ([]ulid.ULID, error) {
var blocks []ulid.ULID
err := bkt.Iter(ctx, "", func(name string) error {
if block, ok := block.IsBlockDir(name); ok {
blocks = append(blocks, block)
}
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "failed to list blocks")
}

return blocks, nil
}

// splitBlock downloads the source block to the output directory, then generates new blocks that each span at most cfg.maxBlockDuration.
// After all the splits succeed, the original source block is removed from the output directory.
func splitBlock(ctx context.Context, cfg config, bkt objstore.Bucket, meta block.Meta, logger log.Logger) error {
originalBlockDir := filepath.Join(cfg.outputDir, meta.ULID.String())
if err := block.Download(ctx, logger, bkt, meta.ULID, originalBlockDir); err != nil {
return err
}

_, err := splitLocalBlock(ctx, cfg.outputDir, originalBlockDir, meta, cfg.maxBlockDuration, logger)
return err
}

func splitLocalBlock(ctx context.Context, parentDir, blockDir string, meta block.Meta, maxDuration time.Duration, logger log.Logger) ([]ulid.ULID, error) {
origBlockMaxTime := meta.MaxTime
minTime := meta.MinTime
result := []ulid.ULID(nil)
for minTime < origBlockMaxTime {
// Max time cannot cross maxDuration boundary, but also should not be greater than original max time.
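// For example, with maxDuration=24h a block covering 10-01 12:00 through 10-03 12:00 is emitted
// as [10-01 12:00, 10-02 00:00), [10-02 00:00, 10-03 00:00), and [10-03 00:00, 10-03 12:00).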
maxTime := min(timestamp.Time(minTime).Truncate(maxDuration).Add(maxDuration).UnixMilli(), origBlockMaxTime)

level.Info(logger).Log("msg", "splitting block", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime))

// Inject a modified meta and let block.Repair drop the chunks that now fall outside the block's time range.
// Chunks that cross boundaries are included in multiple blocks.
meta.MinTime = minTime
meta.MaxTime = maxTime
if err := meta.WriteToDir(logger, blockDir); err != nil {
return nil, errors.Wrap(err, "failed injecting meta for split")
}
splitID, err := block.Repair(ctx, logger, parentDir, meta.ULID, block.SplitBlocksSource, block.IgnoreCompleteOutsideChunk, block.IgnoreIssue347OutsideChunk)
if err != nil {
return nil, errors.Wrap(err, "failed while splitting block")
}

splitDir := path.Join(parentDir, splitID.String())
splitMeta, err := block.ReadMetaFromDir(splitDir)
if err != nil {
return nil, errors.Wrap(err, "failed while reading meta.json from split block")
}

if splitMeta.Stats.NumSeries == 0 {
if err := os.RemoveAll(splitDir); err != nil {
return nil, errors.Wrap(err, "failed to clean up empty split block")
}
} else {
level.Info(logger).Log("msg", "created block from split", "minTime", timestamp.Time(minTime), "maxTime", timestamp.Time(maxTime), "splitID", splitID, "series", splitMeta.Stats.NumSeries, "chunks", splitMeta.Stats.NumChunks, "samples", splitMeta.Stats.NumSamples)
result = append(result, splitID)
}

minTime = maxTime
}

if err := os.RemoveAll(blockDir); err != nil {
return nil, errors.Wrap(err, "failed to clean up original block directory after splitting block")
}

return result, nil
}