diff --git a/cmd/backup.go b/cmd/backup.go index 971d4ef08..948249671 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -30,7 +30,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error { func runBackupRawCommand(command *cobra.Command, cmdName string) error { cfg := task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { + if err := cfg.ParseBackupConfigFromFlags(command.Flags()); err != nil { command.SilenceUsage = false return err } diff --git a/go.mod b/go.mod index 9bbce185d..df376c0e5 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c - github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 + github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c github.com/pingcap/log v0.0.0-20200511115504-543df19646ad github.com/pingcap/parser v0.0.0-20200609110328-c65941b9fbb3 github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 diff --git a/go.sum b/go.sum index f7588a3f7..9305d4e12 100644 --- a/go.sum +++ b/go.sum @@ -478,6 +478,8 @@ github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c h1:VnLpCAxMAeDxc7HXTetwDQB+/MtDQjHAOBsd4QnGVwA= +github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 1a6b6cc3b..a86801434 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -503,8 +503,8 @@ func (bc *Client) BackupRange( // Find and backup remaining ranges. // TODO: test fine grained backup. err = bc.fineGrainedBackup( - ctx, startKey, endKey, req.StartVersion, - req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh) + ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, + req.RateLimit, req.Concurrency, results, updateCh) if err != nil { return nil, err } @@ -564,6 +564,7 @@ func (bc *Client) fineGrainedBackup( startKey, endKey []byte, lastBackupTS uint64, backupTS uint64, + compressType kvproto.CompressionType, rateLimit uint64, concurrency uint32, rangeTree rtree.RangeTree, @@ -594,7 +595,7 @@ func (bc *Client) fineGrainedBackup( defer wg.Done() for rg := range retry { backoffMs, err := - bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, rateLimit, concurrency, respCh) + bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh) if err != nil { errCh <- err return @@ -728,6 +729,7 @@ func (bc *Client) handleFineGrained( rg rtree.Range, lastBackupTS uint64, backupTS uint64, + compressType kvproto.CompressionType, rateLimit uint64, concurrency uint32, respCh chan<- *kvproto.BackupResponse, @@ -740,14 +742,15 @@ func (bc *Client) handleFineGrained( max := 0 req := kvproto.BackupRequest{ - ClusterId: bc.clusterID, - StartKey: rg.StartKey, // TODO: the range may cross region. - EndKey: rg.EndKey, - StartVersion: lastBackupTS, - EndVersion: backupTS, - StorageBackend: bc.backend, - RateLimit: rateLimit, - Concurrency: concurrency, + ClusterId: bc.clusterID, + StartKey: rg.StartKey, // TODO: the range may cross region. + EndKey: rg.EndKey, + StartVersion: lastBackupTS, + EndVersion: backupTS, + StorageBackend: bc.backend, + RateLimit: rateLimit, + Concurrency: concurrency, + CompressionType: compressType, } lockResolver := bc.mgr.GetLockResolver() client, err := bc.mgr.GetBackupClient(ctx, storeID) @@ -812,7 +815,7 @@ func SendBackup( // TODO: handle errors in the resp. log.Info("range backuped", zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())), - zap.Stringer("EndKey", utils.WrapKey(req.GetEndKey()))) + zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey()))) err = respFn(resp) if err != nil { return err diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 0956df988..b79f33d35 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -28,9 +28,10 @@ import ( ) const ( - flagBackupTimeago = "timeago" - flagBackupTS = "backupts" - flagLastBackupTS = "lastbackupts" + flagBackupTimeago = "timeago" + flagBackupTS = "backupts" + flagLastBackupTS = "lastbackupts" + flagCompressionType = "compression" flagGCTTL = "gcttl" @@ -42,10 +43,11 @@ const ( type BackupConfig struct { Config - TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` - BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` - LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` - GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` + LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` + GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` } // DefineBackupFlags defines common flags for the backup command. @@ -60,6 +62,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ " e.g. '400036290571534337', '2018-05-11 01:42:23'") flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") + flags.String(flagCompressionType, "zstd", + "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") } // ParseFromFlags parses the backup-related flags from the flag set. @@ -90,6 +94,16 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.GCTTL = gcTTL + compressionStr, err := flags.GetString(flagCompressionType) + if err != nil { + return errors.Trace(err) + } + compressionType, err := parseCompressionType(compressionStr) + if err != nil { + return errors.Trace(err) + } + cfg.CompressionType = compressionType + if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) } @@ -149,10 +163,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig isIncrementalBackup := cfg.LastBackupTS > 0 req := kvproto.BackupRequest{ - StartVersion: cfg.LastBackupTS, - EndVersion: backupTS, - RateLimit: cfg.RateLimit, - Concurrency: defaultBackupConcurrency, + StartVersion: cfg.LastBackupTS, + EndVersion: backupTS, + RateLimit: cfg.RateLimit, + Concurrency: defaultBackupConcurrency, + CompressionType: cfg.CompressionType, } ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema( @@ -304,3 +319,18 @@ func parseTSString(ts string) (uint64, error) { } return variable.GoTimeToTS(t1), nil } + +func parseCompressionType(s string) (kvproto.CompressionType, error) { + var ct kvproto.CompressionType + switch s { + case "lz4": + ct = kvproto.CompressionType_LZ4 + case "snappy": + ct = kvproto.CompressionType_SNAPPY + case "zstd": + ct = kvproto.CompressionType_ZSTD + default: + return kvproto.CompressionType_UNKNOWN, errors.Errorf("invalid compression type '%s'", s) + } + return ct, nil +} diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index dfc8d5f67..4d237848b 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -31,9 +31,10 @@ const ( type RawKvConfig struct { Config - StartKey []byte `json:"start-key" toml:"start-key"` - EndKey []byte `json:"end-key" toml:"end-key"` - CF string `json:"cf" toml:"cf"` + StartKey []byte `json:"start-key" toml:"start-key"` + EndKey []byte `json:"end-key" toml:"end-key"` + CF string `json:"cf" toml:"cf"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` } // DefineRawBackupFlags defines common flags for the backup command. @@ -42,9 +43,11 @@ func DefineRawBackupFlags(command *cobra.Command) { command.Flags().StringP(flagTiKVColumnFamily, "", "default", "backup specify cf, correspond to tikv cf") command.Flags().StringP(flagStartKey, "", "", "backup raw kv start key, key is inclusive") command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive") + command.Flags().String(flagCompressionType, "zstd", + "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") } -// ParseFromFlags parses the backup-related flags from the flag set. +// ParseFromFlags parses the raw kv backup&restore common flags from the flag set. func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { format, err := flags.GetString(flagKeyFormat) if err != nil { @@ -81,6 +84,25 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { return nil } +// ParseBackupConfigFromFlags parses the backup-related flags from the flag set. +func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { + err := cfg.ParseFromFlags(flags) + if err != nil { + return err + } + + compressionStr, err := flags.GetString(flagCompressionType) + if err != nil { + return errors.Trace(err) + } + compressionType, err := parseCompressionType(compressionStr) + if err != nil { + return errors.Trace(err) + } + cfg.CompressionType = compressionType + return nil +} + // RunBackupRaw starts a backup task inside the current goroutine. func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error { defer summary.Summary(cmdName) @@ -121,12 +143,13 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ - StartVersion: 0, - EndVersion: 0, - RateLimit: cfg.RateLimit, - Concurrency: cfg.Concurrency, - IsRawKv: true, - Cf: cfg.CF, + StartVersion: 0, + EndVersion: 0, + RateLimit: cfg.RateLimit, + Concurrency: cfg.Concurrency, + IsRawKv: true, + Cf: cfg.CF, + CompressionType: cfg.CompressionType, } files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh) if err != nil {