Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup: support explicitly set sst compression type #404

Merged
merged 10 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {

func runBackupRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseBackupConfigFromFlags(command.Flags()); err != nil {
command.SilenceUsage = false
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200609110328-c65941b9fbb3
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c h1:VnLpCAxMAeDxc7HXTetwDQB+/MtDQjHAOBsd4QnGVwA=
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
25 changes: 14 additions & 11 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ func (bc *Client) BackupRange(
// Find and backup remaining ranges.
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion,
req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh)
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType,
req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -588,6 +588,7 @@ func (bc *Client) fineGrainedBackup(
startKey, endKey []byte,
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
rateLimit uint64,
concurrency uint32,
rangeTree rtree.RangeTree,
Expand Down Expand Up @@ -618,7 +619,7 @@ func (bc *Client) fineGrainedBackup(
defer wg.Done()
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, rateLimit, concurrency, respCh)
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -752,6 +753,7 @@ func (bc *Client) handleFineGrained(
rg rtree.Range,
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
rateLimit uint64,
concurrency uint32,
respCh chan<- *kvproto.BackupResponse,
Expand All @@ -764,14 +766,15 @@ func (bc *Client) handleFineGrained(
max := 0

req := kvproto.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
CompressionType: compressType,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
Expand Down
53 changes: 42 additions & 11 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
)

const (
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression-type"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
flagCompressionType = "compression-type"
flagCompressionType = "compression"


flagGCTTL = "gcttl"

Expand All @@ -42,10 +43,11 @@ const (
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -60,6 +62,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
flags.String(flagCompressionType, "", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provide a default value?

Suggested change
flags.String(flagCompressionType, "", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
flags.String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value should be `default`, which means the compression type will be determined by TiKV. In the current logic, TiKV chooses the fastest algorithm, which in most cases will be 'lz4'.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default is called "unknown" though

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, so should we change the name in kvproto to default?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should provide a valid default value; "zstd" looks good to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Then I think we can set the default to lz4, because it's the compression type TiKV currently uses. If the user doesn't set this flag, the behavior is the same as in the previous version, so it stays compatible with old versions.

}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand Down Expand Up @@ -90,6 +93,16 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.GCTTL = gcTTL

compressionStr, err := flags.GetString(flagCompressionType)
if err != nil {
return errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionType = compressionType

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -140,10 +153,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
isIncrementalBackup := cfg.LastBackupTS > 0

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
}

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
Expand Down Expand Up @@ -295,3 +309,20 @@ func parseTSString(ts string) (uint64, error) {
}
return variable.GoTimeToTS(t1), nil
}

func parseCompressionType(s string) (kvproto.CompressionType, error) {
var ct kvproto.CompressionType
switch s {
case "lz4":
ct = kvproto.CompressionType_LZ4
case "snappy":
ct = kvproto.CompressionType_SNAPPY
case "zstd":
ct = kvproto.CompressionType_ZSTD
case "":
ct = kvproto.CompressionType_UNKNOWN
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case "":
ct = kvproto.CompressionType_UNKNOWN

default:
return kvproto.CompressionType_UNKNOWN, errors.Errorf("invalid compression type '%s'", s)
}
return ct, nil
}
43 changes: 33 additions & 10 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ const (
type RawKvConfig struct {
Config

StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
}

// DefineRawBackupFlags defines common flags for the backup command.
Expand All @@ -42,9 +43,11 @@ func DefineRawBackupFlags(command *cobra.Command) {
command.Flags().StringP(flagTiKVColumnFamily, "", "default", "backup specify cf, correspond to tikv cf")
command.Flags().StringP(flagStartKey, "", "", "backup raw kv start key, key is inclusive")
command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive")
command.Flags().String(flagCompressionType, "",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
// ParseFromFlags parses the raw kv backup&restore common flags from the flag set.
func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error {
format, err := flags.GetString(flagKeyFormat)
if err != nil {
Expand Down Expand Up @@ -81,6 +84,25 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error {
return nil
}

// ParseBackupConfigFromFlags fills cfg from the flag set for a raw kv backup.
// It first delegates to ParseFromFlags, then reads the compression-type flag,
// converts it with parseCompressionType and stores the result in
// cfg.CompressionType. Any failure is returned to the caller; cfg.CompressionType
// is only assigned when every step succeeds.
func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
	// An error from ParseFromFlags is passed through unchanged.
	if err := cfg.ParseFromFlags(flags); err != nil {
		return err
	}

	rawCompression, err := flags.GetString(flagCompressionType)
	if err != nil {
		return errors.Trace(err)
	}

	parsed, err := parseCompressionType(rawCompression)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.CompressionType = parsed
	return nil
}

// RunBackupRaw starts a backup task inside the current goroutine.
func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error {
defer summary.Summary(cmdName)
Expand Down Expand Up @@ -121,12 +143,13 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
CompressionType: cfg.CompressionType,
}
files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh)
if err != nil {
Expand Down