From 8c97f57624aa31719e772d0b1b4c69fbbfc9f206 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Tue, 3 Dec 2024 16:35:28 -0500 Subject: [PATCH 1/4] copyblocks: support copying between tenants --- CHANGELOG.md | 3 +- tools/copyblocks/README.md | 1 + tools/copyblocks/main.go | 82 ++++++++++++++++++++++++++++---------- 3 files changed, 65 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2018bcb8a1..d55bdc1f6d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,7 +151,8 @@ ### Tools * [FEATURE] `splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517, #9779 -* [ENHANCEMENT] `copyblocks`: Added `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439 +* [ENHANCEMENT] `copyblocks`: add `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439 +* [ENHANCEMENT] `copyblocks`: add `--cross-tenant-mapping` to support copying data between tenants. #10110 * [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584 * `--kafka-sasl-username` * `--kafka-sasl-password` diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md index 3e6d14b764a..aa7e1a90872 100644 --- a/tools/copyblocks/README.md +++ b/tools/copyblocks/README.md @@ -13,6 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co - Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`) - Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction - Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range +- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` would copy source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If a mapping is not provided for a tenant it is assumed to be identical to the source tenant. - Log what would be copied without actually copying anything with `--dry-run` ## Running diff --git a/tools/copyblocks/main.go b/tools/copyblocks/main.go index a72f05efea6..0364f3be6ee 100644 --- a/tools/copyblocks/main.go +++ b/tools/copyblocks/main.go @@ -16,6 +16,7 @@ import ( "os" "os/signal" "path/filepath" + "slices" "strings" "syscall" "time" @@ -24,6 +25,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/tenant" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -44,6 +46,7 @@ type config struct { copyPeriod time.Duration enabledUsers flagext.StringSliceCSV disabledUsers flagext.StringSliceCSV + crossTenantMapping flagext.StringSliceCSV dryRun bool skipNoCompactBlockDurationCheck bool httpListen string @@ -59,6 +62,7 @@ func (c *config) registerFlags(f *flag.FlagSet) { f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and the program stops. Otherwise, the program keeps running and copying blocks until it is terminated.") f.Var(&c.enabledUsers, "enabled-users", "If not empty, only blocks for these users are copied.") f.Var(&c.disabledUsers, "disabled-users", "If not empty, blocks for these users are not copied.") + f.Var(&c.crossTenantMapping, "cross-tenant-mapping", "A comma-separated list of (source tenant):(destination tenant). If a source tenant is not mapped then its destination tenant is assumed to be identical.") f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform any copy; only log what would happen.") f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration") f.StringVar(&c.httpListen, "http-listen-address", ":8080", "HTTP listen address.") @@ -80,6 +84,23 @@ func (c *config) validate() error { return nil } +func (c *config) parseCrossTenantMapping() (map[string]string, error) { + m := make(map[string]string, len(c.crossTenantMapping)) + for _, mapping := range c.crossTenantMapping { + splitMapping := strings.Split(mapping, ":") + if len(splitMapping) != 2 || slices.Contains(splitMapping, "") { + return nil, fmt.Errorf("invalid tenant mapping: %s", mapping) + } + for _, id := range splitMapping { + if err := tenant.ValidTenantID(id); err != nil { + return nil, err + } + } + m[splitMapping[0]] = splitMapping[1] + } + return m, nil +} + type metrics struct { copyCyclesSucceeded prometheus.Counter copyCyclesFailed prometheus.Counter @@ -126,6 +147,12 @@ func main() { os.Exit(1) } + crossTenantMapping, err := cfg.parseCrossTenantMapping() + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) @@ -144,7 +171,7 @@ func main() { } }() - success := runCopy(ctx, cfg, logger, m) + success := runCopy(ctx, cfg, crossTenantMapping, logger, m) if cfg.copyPeriod <= 0 { if success { os.Exit(0) @@ -158,14 +185,14 @@ func main() { for ctx.Err() == nil { select { case <-t.C: - _ = runCopy(ctx, cfg, logger, m) + _ = runCopy(ctx, cfg, crossTenantMapping, logger, m) case <-ctx.Done(): } } } -func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) bool { - err := copyBlocks(ctx, cfg, logger, m) +func runCopy(ctx context.Context, cfg config, crossTenantMapping map[string]string, logger log.Logger, m *metrics) bool { + err := copyBlocks(ctx, cfg, crossTenantMapping, logger, m) if err != nil { m.copyCyclesFailed.Inc() level.Error(logger).Log("msg", "failed to copy blocks", "err", err, "dryRun", cfg.dryRun) @@ -177,7 +204,7 @@ func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) boo return true } -func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) error { +func copyBlocks(ctx context.Context, cfg config, crossTenantMapping map[string]string, logger log.Logger, m *metrics) error { sourceBucket, destBucket, copyFunc, err := cfg.copyConfig.ToBuckets(ctx) if err != nil { return err @@ -197,23 +224,28 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) disabledUsers[u] = struct{}{} } - return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, tenantID string) error { - if !isAllowedUser(enabledUsers, disabledUsers, tenantID) { + return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, sourceTenantID string) error { + if !isAllowedUser(enabledUsers, disabledUsers, sourceTenantID) { return nil } - logger := log.With(logger, "tenantID", tenantID) + destinationTenantID, ok := crossTenantMapping[sourceTenantID] + if !ok { + destinationTenantID = sourceTenantID + } + + logger := log.With(logger, "sourceTenantID", sourceTenantID, "destinationTenantID", destinationTenantID) - blocks, err := listBlocksForTenant(ctx, sourceBucket, tenantID) + blocks, err := listBlocksForTenant(ctx, sourceBucket, sourceTenantID) if err != nil { level.Error(logger).Log("msg", "failed to list blocks for tenant", "err", err) - return errors.Wrapf(err, "failed to list blocks for tenant %v", tenantID) + return errors.Wrapf(err, "failed to list blocks for tenant %v", sourceTenantID) } - markers, err := listBlockMarkersForTenant(ctx, sourceBucket, tenantID, destBucket.Name()) + markers, err := listBlockMarkersForTenant(ctx, sourceBucket, sourceTenantID, destBucket.Name()) if err != nil { level.Error(logger).Log("msg", "failed to list blocks markers for tenant", "err", err) - return errors.Wrapf(err, "failed to list block markers for tenant %v", tenantID) + return errors.Wrapf(err, "failed to list block markers for tenant %v", sourceTenantID) } var blockIDs []string @@ -243,7 +275,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) return nil } - blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, tenantID, blockID) + blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, sourceTenantID, blockID) if err != nil { level.Error(logger).Log("msg", "skipping block, failed to read meta.json file", "err", err) return err @@ -287,7 +319,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) level.Info(logger).Log("msg", "copying block") - err = copySingleBlock(ctx, tenantID, blockID, markers[blockID], sourceBucket, copyFunc) + err = copySingleBlock(ctx, sourceTenantID, destinationTenantID, blockID, markers[blockID], sourceBucket, copyFunc) if err != nil { m.blocksCopyFailed.Inc() level.Error(logger).Log("msg", "failed to copy block", "err", err) @@ -297,7 +329,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) m.blocksCopied.Inc() level.Info(logger).Log("msg", "block copied successfully") - err = uploadCopiedMarkerFile(ctx, sourceBucket, tenantID, blockID, destBucket.Name()) + err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name()) if err != nil { level.Error(logger).Log("msg", "failed to upload copied-marker file for block", "block", blockID.String(), "err", err) return err @@ -324,13 +356,13 @@ func isAllowedUser(enabled map[string]struct{}, disabled map[string]struct{}, te } // This method copies files within single TSDB block to a destination bucket. -func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error { +func copySingleBlock(ctx context.Context, sourceTenantID, destinationTenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error { result, err := srcBkt.List(ctx, objtools.ListOptions{ - Prefix: tenantID + objtools.Delim + blockID.String(), + Prefix: sourceTenantID + objtools.Delim + blockID.String(), Recursive: true, }) if err != nil { - return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", tenantID, blockID.String()) + return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", sourceTenantID, blockID.String()) } paths := result.ToNames() @@ -346,11 +378,21 @@ func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, ma // Copy global markers too (skipping deletion mark because deleted blocks are not copied by this tool). if markers.noCompact { - paths = append(paths, tenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID)) + paths = append(paths, sourceTenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID)) } + isCrossTenant := sourceTenantID != destinationTenantID + for _, fullPath := range paths { - err := copyFunc(ctx, fullPath, objtools.CopyOptions{}) + options := objtools.CopyOptions{} + if isCrossTenant { + after, found := strings.CutPrefix(fullPath, sourceTenantID) + if !found { + return fmt.Errorf("unexpected object path that does not begin with sourceTenantID: path=%s, sourceTenantID=%s", fullPath, sourceTenantID) + } + options.DestinationObjectName = destinationTenantID + after + } + err := copyFunc(ctx, fullPath, options) if err != nil { return errors.Wrapf(err, "copySingleBlock: failed to copy %v", fullPath) } From 845e5e1d1423b9e563d8930e51dec860122f4758 Mon Sep 17 00:00:00 2001 From: Andy Asp <90626759+andyasp@users.noreply.github.com> Date: Tue, 3 Dec 2024 16:50:37 -0500 Subject: [PATCH 2/4] Update tools/copyblocks/README.md --- tools/copyblocks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md index aa7e1a90872..d9bfb8f39fe 100644 --- a/tools/copyblocks/README.md +++ b/tools/copyblocks/README.md @@ -13,7 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co - Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`) - Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction - Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range -- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` would copy source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If a mapping is not provided for a tenant it is assumed to be identical to the source tenant. +- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` would map source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If a mapping is not provided for a tenant it is assumed to be identical to the source tenant. - Log what would be copied without actually copying anything with `--dry-run` ## Running From 4b5d707783396a05cd4fae25f199cbefafa1a111 Mon Sep 17 00:00:00 2001 From: Andy Asp <90626759+andyasp@users.noreply.github.com> Date: Tue, 3 Dec 2024 17:56:01 -0500 Subject: [PATCH 3/4] Update tools/copyblocks/README.md Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- tools/copyblocks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md index d9bfb8f39fe..7847b665135 100644 --- a/tools/copyblocks/README.md +++ b/tools/copyblocks/README.md @@ -13,7 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co - Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`) - Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction - Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range -- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` would map source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If a mapping is not provided for a tenant it is assumed to be identical to the source tenant. +- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` maps source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If you don't provide a mapping for a tenant, it is assumed to be identical to the source tenant. - Log what would be copied without actually copying anything with `--dry-run` ## Running From 549093f5eeb1ed52082208474b68a2078c03e402 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Wed, 4 Dec 2024 10:32:12 -0500 Subject: [PATCH 4/4] Address feedback --- CHANGELOG.md | 2 +- tools/copyblocks/README.md | 2 +- tools/copyblocks/main.go | 36 +++++++++++++++++++++--------------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d55bdc1f6d8..fe8e290b583 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -152,7 +152,7 @@ * [FEATURE] `splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517, #9779 * [ENHANCEMENT] `copyblocks`: add `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439 -* [ENHANCEMENT] `copyblocks`: add `--cross-tenant-mapping` to support copying data between tenants. #10110 +* [ENHANCEMENT] `copyblocks`: add `--user-mapping` to support copying blocks between users. #10110 * [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584 * `--kafka-sasl-username` * `--kafka-sasl-password` diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md index 7847b665135..8f0fe49a936 100644 --- a/tools/copyblocks/README.md +++ b/tools/copyblocks/README.md @@ -13,7 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co - Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`) - Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction - Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range -- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` maps source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If you don't provide a mapping for a tenant, it is assumed to be identical to the source tenant. +- Copy blocks between users with `--user-mapping`. For instance, `--user-mapping="user1:user2,user3:user4"` maps source blocks from `user1` to `user2` and source blocks from `user3` to `user4`. If you don't provide a mapping for a user, it is assumed to be identical to the source user. - Log what would be copied without actually copying anything with `--dry-run` ## Running diff --git a/tools/copyblocks/main.go b/tools/copyblocks/main.go index 0364f3be6ee..d6a2bc22783 100644 --- a/tools/copyblocks/main.go +++ b/tools/copyblocks/main.go @@ -46,7 +46,7 @@ type config struct { copyPeriod time.Duration enabledUsers flagext.StringSliceCSV disabledUsers flagext.StringSliceCSV - crossTenantMapping flagext.StringSliceCSV + userMapping flagext.StringSliceCSV dryRun bool skipNoCompactBlockDurationCheck bool httpListen string @@ -62,9 +62,9 @@ func (c *config) registerFlags(f *flag.FlagSet) { f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and the program stops. Otherwise, the program keeps running and copying blocks until it is terminated.") f.Var(&c.enabledUsers, "enabled-users", "If not empty, only blocks for these users are copied.") f.Var(&c.disabledUsers, "disabled-users", "If not empty, blocks for these users are not copied.") - f.Var(&c.crossTenantMapping, "cross-tenant-mapping", "A comma-separated list of (source tenant):(destination tenant). If a source tenant is not mapped then its destination tenant is assumed to be identical.") + f.Var(&c.userMapping, "user-mapping", "A comma-separated list of (source user):(destination user). If a user is not mapped then its destination user is assumed to be identical.") f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform any copy; only log what would happen.") - f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration") + f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration.") f.StringVar(&c.httpListen, "http-listen-address", ":8080", "HTTP listen address.") } @@ -84,19 +84,23 @@ func (c *config) validate() error { return nil } -func (c *config) parseCrossTenantMapping() (map[string]string, error) { - m := make(map[string]string, len(c.crossTenantMapping)) - for _, mapping := range c.crossTenantMapping { +func (c *config) parseUserMapping() (map[string]string, error) { + m := make(map[string]string, len(c.userMapping)) + for _, mapping := range c.userMapping { splitMapping := strings.Split(mapping, ":") if len(splitMapping) != 2 || slices.Contains(splitMapping, "") { - return nil, fmt.Errorf("invalid tenant mapping: %s", mapping) + return nil, fmt.Errorf("invalid user mapping: %s", mapping) } for _, id := range splitMapping { if err := tenant.ValidTenantID(id); err != nil { return nil, err } } - m[splitMapping[0]] = splitMapping[1] + source := splitMapping[0] + if _, ok := m[source]; ok { + return nil, fmt.Errorf("multiple user mappings for source user: %s", source) + } + m[source] = splitMapping[1] } return m, nil } @@ -147,7 +151,7 @@ func main() { os.Exit(1) } - crossTenantMapping, err := cfg.parseCrossTenantMapping() + userMapping, err := cfg.parseUserMapping() if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) @@ -171,7 +175,7 @@ func main() { } }() - success := runCopy(ctx, cfg, crossTenantMapping, logger, m) + success := runCopy(ctx, cfg, userMapping, logger, m) if cfg.copyPeriod <= 0 { if success { os.Exit(0) @@ -185,14 +189,14 @@ func main() { for ctx.Err() == nil { select { case <-t.C: - _ = runCopy(ctx, cfg, crossTenantMapping, logger, m) + _ = runCopy(ctx, cfg, userMapping, logger, m) case <-ctx.Done(): } } } -func runCopy(ctx context.Context, cfg config, crossTenantMapping map[string]string, logger log.Logger, m *metrics) bool { - err := copyBlocks(ctx, cfg, crossTenantMapping, logger, m) +func runCopy(ctx context.Context, cfg config, userMapping map[string]string, logger log.Logger, m *metrics) bool { + err := copyBlocks(ctx, cfg, userMapping, logger, m) if err != nil { m.copyCyclesFailed.Inc() level.Error(logger).Log("msg", "failed to copy blocks", "err", err, "dryRun", cfg.dryRun) @@ -204,7 +208,7 @@ func runCopy(ctx context.Context, cfg config, crossTenantMapping map[string]stri return true } -func copyBlocks(ctx context.Context, cfg config, crossTenantMapping map[string]string, logger log.Logger, m *metrics) error { +func copyBlocks(ctx context.Context, cfg config, userMapping map[string]string, logger log.Logger, m *metrics) error { sourceBucket, destBucket, copyFunc, err := cfg.copyConfig.ToBuckets(ctx) if err != nil { return err @@ -229,7 +233,7 @@ func copyBlocks(ctx context.Context, cfg config, crossTenantMapping map[string]s return nil } - destinationTenantID, ok := crossTenantMapping[sourceTenantID] + destinationTenantID, ok := userMapping[sourceTenantID] if !ok { destinationTenantID = sourceTenantID } @@ -329,6 +333,8 @@ func copyBlocks(ctx context.Context, cfg config, crossTenantMapping map[string]s m.blocksCopied.Inc() level.Info(logger).Log("msg", "block copied successfully") + // Note that only the blockID and destination bucket are considered in the copy marker. + // If multiple tenants in the same destination bucket are copied to from the same source tenant the markers will currently clash. err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name()) if err != nil { level.Error(logger).Log("msg", "failed to upload copied-marker file for block", "block", blockID.String(), "err", err)