Skip to content

Commit

Permalink
backup: support explicitly set sst compression type (pingcap#404)
Browse files Browse the repository at this point in the history
* add compression type

* fix

* apply compression

* fix lint

* fix restore flag

* provide default value for flag compression

* change default backup compression type from 'lz4' to 'zstd' in favor of better performance

* fix a log bug
  • Loading branch information
glorv authored and Hillium committed Jul 15, 2020
1 parent 00ea20a commit 36c3204
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {

func runBackupRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseBackupConfigFromFlags(command.Flags()); err != nil {
command.SilenceUsage = false
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200603032439-c4ecb4508d2f
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ github.com/pingcap/kvproto v0.0.0-20200423020121-038e31959c2a/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c h1:VnLpCAxMAeDxc7HXTetwDQB+/MtDQjHAOBsd4QnGVwA=
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
27 changes: 15 additions & 12 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ func (bc *Client) BackupRange(
// Find and backup remaining ranges.
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion,
req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh)
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType,
req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -564,6 +564,7 @@ func (bc *Client) fineGrainedBackup(
startKey, endKey []byte,
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
rateLimit uint64,
concurrency uint32,
rangeTree rtree.RangeTree,
Expand Down Expand Up @@ -594,7 +595,7 @@ func (bc *Client) fineGrainedBackup(
defer wg.Done()
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, rateLimit, concurrency, respCh)
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -728,6 +729,7 @@ func (bc *Client) handleFineGrained(
rg rtree.Range,
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
rateLimit uint64,
concurrency uint32,
respCh chan<- *kvproto.BackupResponse,
Expand All @@ -740,14 +742,15 @@ func (bc *Client) handleFineGrained(
max := 0

req := kvproto.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
CompressionType: compressType,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
Expand Down Expand Up @@ -812,7 +815,7 @@ func SendBackup(
// TODO: handle errors in the resp.
log.Info("range backuped",
zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())),
zap.Stringer("EndKey", utils.WrapKey(req.GetEndKey())))
zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey())))
err = respFn(resp)
if err != nil {
return err
Expand Down
52 changes: 41 additions & 11 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
)

const (
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression"

flagGCTTL = "gcttl"

Expand All @@ -42,10 +43,11 @@ const (
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -60,6 +62,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand Down Expand Up @@ -90,6 +94,16 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.GCTTL = gcTTL

compressionStr, err := flags.GetString(flagCompressionType)
if err != nil {
return errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionType = compressionType

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -149,10 +163,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
isIncrementalBackup := cfg.LastBackupTS > 0

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
}

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
Expand Down Expand Up @@ -304,3 +319,18 @@ func parseTSString(ts string) (uint64, error) {
}
return variable.GoTimeToTS(t1), nil
}

// parseCompressionType maps the textual value of the compression flag to the
// matching kvproto.CompressionType. Only the exact strings "lz4", "snappy"
// and "zstd" are recognized; any other input yields
// kvproto.CompressionType_UNKNOWN together with a non-nil error.
func parseCompressionType(s string) (kvproto.CompressionType, error) {
	switch s {
	case "lz4":
		return kvproto.CompressionType_LZ4, nil
	case "snappy":
		return kvproto.CompressionType_SNAPPY, nil
	case "zstd":
		return kvproto.CompressionType_ZSTD, nil
	}
	// No case matched: report the offending value to the caller.
	return kvproto.CompressionType_UNKNOWN, errors.Errorf("invalid compression type '%s'", s)
}
43 changes: 33 additions & 10 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ const (
type RawKvConfig struct {
Config

StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
}

// DefineRawBackupFlags defines common flags for the backup command.
Expand All @@ -42,9 +43,11 @@ func DefineRawBackupFlags(command *cobra.Command) {
command.Flags().StringP(flagTiKVColumnFamily, "", "default", "backup specify cf, correspond to tikv cf")
command.Flags().StringP(flagStartKey, "", "", "backup raw kv start key, key is inclusive")
command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive")
command.Flags().String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
// ParseFromFlags parses the flags shared by raw kv backup and restore from the flag set.
func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error {
format, err := flags.GetString(flagKeyFormat)
if err != nil {
Expand Down Expand Up @@ -81,6 +84,25 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error {
return nil
}

// ParseBackupConfigFromFlags populates cfg for a raw kv backup. It first
// delegates to ParseFromFlags, then reads the compression flag and stores the
// parsed value in cfg.CompressionType.
//
// An error from ParseFromFlags is returned unchanged; a failure to read or
// parse the compression flag is returned wrapped with errors.Trace.
func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
	if err := cfg.ParseFromFlags(flags); err != nil {
		return err
	}

	rawValue, err := flags.GetString(flagCompressionType)
	if err != nil {
		return errors.Trace(err)
	}
	parsed, err := parseCompressionType(rawValue)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.CompressionType = parsed
	return nil
}

// RunBackupRaw starts a backup task inside the current goroutine.
func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error {
defer summary.Summary(cmdName)
Expand Down Expand Up @@ -121,12 +143,13 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
CompressionType: cfg.CompressionType,
}
files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh)
if err != nil {
Expand Down

0 comments on commit 36c3204

Please sign in to comment.