
Commit

This is an automated cherry-pick of pingcap#1076
Signed-off-by: ti-chi-bot <[email protected]>
Relax4Life authored and ti-chi-bot committed May 21, 2021
1 parent 2602675 commit 7460321
Showing 16 changed files with 219 additions and 34 deletions.
4 changes: 2 additions & 2 deletions cmd/br/restore.go
@@ -127,7 +127,7 @@ func newFullRestoreCommand() *cobra.Command {
func newDBRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "db",
Short: "restore tables in a database",
Short: "restore tables in a database from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Database restore")
@@ -140,7 +140,7 @@ func newDBRestoreCommand() *cobra.Command {
func newTableRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "table",
Short: "restore a table",
Short: "restore a table from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Table restore")
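For context, the subcommands touched above are plain cobra commands. The sketch below shows how such a subcommand is typically wired under a parent restore command; the parent command and the runRestoreCommand stub are illustrative assumptions, not code taken from this repository.

// Hypothetical wiring of a "db" restore subcommand, mirroring the cobra
// pattern in cmd/br/restore.go above. runRestoreCommand is a stand-in stub.
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func runRestoreCommand(cmd *cobra.Command, name string) error {
	fmt.Println("running:", name)
	return nil
}

func newDBRestoreCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "db",
		Short: "restore tables in a database from the backup data",
		Args:  cobra.NoArgs,
		RunE: func(cmd *cobra.Command, _ []string) error {
			return runRestoreCommand(cmd, "Database restore")
		},
	}
}

func main() {
	restoreCmd := &cobra.Command{Use: "restore"}
	restoreCmd.AddCommand(newDBRestoreCommand())
	_ = restoreCmd.Execute()
}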
17 changes: 11 additions & 6 deletions errors.toml
@@ -31,6 +31,11 @@ error = '''
invalid argument
'''

["BR:Common:ErrUndefinedDbOrTable"]
error = '''
undefined restore databases or tables
'''

["BR:Common:ErrUnknown"]
error = '''
internal error
@@ -76,11 +81,6 @@ error = '''
key not in region
'''

["BR:KV:ErrKVNotHealth"]
error = '''
tikv cluster not health
'''

["BR:KV:ErrKVNotLeader"]
error = '''
not leader
@@ -96,9 +96,14 @@ error = '''
rewrite rule not found
'''

["BR:KV:ErrKVStorage"]
error = '''
I/O error occurs on tikv storage
'''

["BR:KV:ErrKVUnknown"]
error = '''
unknown tikv error
unknown error occurs on tikv
'''

["BR:KV:ErrNotTiKVStorage"]
2 changes: 1 addition & 1 deletion pkg/backup/client.go
@@ -1007,7 +1007,7 @@ func ChecksumMatches(backupMeta *backuppb.BackupMeta, local []Checksum) error {
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes))
// TODO enhance error
return errors.Annotate(berrors.ErrBackupChecksumMismatch, "failed in checksum, and cannot parse table info")
return berrors.ErrBackupChecksumMismatch
}
log.Info("checksum success",
zap.String("database", dbInfo.Name.L),
49 changes: 43 additions & 6 deletions pkg/backup/push.go
@@ -4,6 +4,7 @@ package backup

import (
"context"
"fmt"
"sync"

"github.com/opentracing/opentracing-go"
@@ -25,15 +26,28 @@ import (
// pushDown wraps a backup task.
type pushDown struct {
mgr ClientMgr
respCh chan *backuppb.BackupResponse
respCh chan responseAndStore
errCh chan error
}

type responseAndStore struct {
Resp *backuppb.BackupResponse
Store *metapb.Store
}

func (r responseAndStore) GetResponse() *backuppb.BackupResponse {
return r.Resp
}

func (r responseAndStore) GetStore() *metapb.Store {
return r.Store
}

// newPushDown creates a push down backup.
func newPushDown(mgr ClientMgr, cap int) *pushDown {
return &pushDown{
mgr: mgr,
respCh: make(chan *backuppb.BackupResponse, cap),
respCh: make(chan responseAndStore, cap),
errCh: make(chan error, cap),
}
}
Expand All @@ -60,6 +74,7 @@ func (push *pushDown) pushBackup(

wg := new(sync.WaitGroup)
for _, s := range stores {
store := s
storeID := s.GetId()
if s.GetState() != metapb.StoreState_Up {
log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState()))
@@ -79,7 +94,10 @@
ctx, storeID, client, req,
func(resp *backuppb.BackupResponse) error {
// Forward all responses (including error).
push.respCh <- resp
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
return nil
},
func() (backuppb.BackupClient, error) {
@@ -102,7 +120,9 @@

for {
select {
case resp, ok := <-push.respCh:
case respAndStore, ok := <-push.respCh:
resp := respAndStore.GetResponse()
store := respAndStore.GetStore()
if !ok {
// Finished.
return res, nil
@@ -114,6 +134,13 @@
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
log.Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(
@@ -138,8 +165,18 @@
log.Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
log.Error("backup occur unknown error", zap.String("error", errPb.GetMsg()))
return res, errors.Annotatef(berrors.ErrKVUnknown, "%v", errPb)
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("File or directory not found error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid."))
}

if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure tikv has permission to read from & write to the storage."))
}
return res, berrors.ErrKVStorage
}
}
case err := <-push.errCh:
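The two utils helpers referenced in the new error handling above (MessageIsNotFoundStorageError and MessageIsPermissionDeniedStorageError) are not shown in this diff. A minimal sketch of what such message classifiers might look like follows; the matched substrings are assumptions for illustration, not the project's actual patterns.

// Hypothetical sketch of the TiKV error-message classifiers used above.
// The substrings matched here are illustrative assumptions.
package utils

import "strings"

// MessageIsNotFoundStorageError reports whether a TiKV backup error message
// looks like a "file or directory not found" I/O error on the backup storage.
func MessageIsNotFoundStorageError(msg string) bool {
	return strings.Contains(msg, "No such file or directory") ||
		strings.Contains(msg, "NotFound")
}

// MessageIsPermissionDeniedStorageError reports whether a TiKV backup error
// message looks like a permission-denied I/O error on the backup storage.
func MessageIsPermissionDeniedStorageError(msg string) bool {
	return strings.Contains(msg, "Permission denied") ||
		strings.Contains(msg, "PermissionDenied")
}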
6 changes: 0 additions & 6 deletions pkg/conn/conn.go
@@ -218,12 +218,6 @@ func NewMgr(
}
liveStoreCount++
}
if liveStoreCount == 0 &&
// Assume 3 replicas
len(stores) >= 3 && len(stores) > liveStoreCount+1 {
log.Error("tikv cluster not health", zap.Reflect("stores", stores))
return nil, errors.Annotatef(berrors.ErrKVNotHealth, "%+v", stores)
}

var dom *domain.Domain
if needDomain {
13 changes: 7 additions & 6 deletions pkg/errors/errors.go
@@ -17,10 +17,11 @@ func Is(err error, is *errors.Error) bool {

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
@@ -53,9 +54,9 @@ var (
ErrStorageInvalidConfig = errors.Normalize("invalid external storage config", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidConfig"))

// Errors reported from TiKV.
ErrKVUnknown = errors.Normalize("unknown tikv error", errors.RFCCodeText("BR:KV:ErrKVUnknown"))
ErrKVStorage = errors.Normalize("I/O error occurs on tikv storage", errors.RFCCodeText("BR:KV:ErrKVStorage"))
ErrKVUnknown = errors.Normalize("unknown error occurs on tikv", errors.RFCCodeText("BR:KV:ErrKVUnknown"))
ErrKVClusterIDMismatch = errors.Normalize("tikv cluster ID mismatch", errors.RFCCodeText("BR:KV:ErrKVClusterIDMismatch"))
ErrKVNotHealth = errors.Normalize("tikv cluster not health", errors.RFCCodeText("BR:KV:ErrKVNotHealth"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("BR:KV:ErrKVNotLeader"))
ErrKVNotTiKV = errors.Normalize("storage is not tikv", errors.RFCCodeText("BR:KV:ErrNotTiKVStorage"))

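As a rough illustration of how these normalized errors behave (not code from this commit): annotations added with errors.Annotatef keep the RFC identity, so callers can still match on the original error. The Is helper below mirrors the pkg/errors.Is signature shown in the hunk above; its body is an assumption.

// Hedged sketch: annotating a normalized error and matching it again.
package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// Stand-in for one of the normalized errors defined in pkg/errors/errors.go.
var ErrKVStorage = errors.Normalize("I/O error occurs on tikv storage",
	errors.RFCCodeText("BR:KV:ErrKVStorage"))

// Is mirrors pkg/errors.Is; delegating to (*errors.Error).Equal is an assumption.
func Is(err error, is *errors.Error) bool {
	return is.Equal(err)
}

func backupOneStore(storeID uint64) error {
	// Pretend TiKV reported an I/O error for this store.
	return errors.Annotatef(ErrKVStorage, "store %d", storeID)
}

func main() {
	err := backupOneStore(1)
	fmt.Println(Is(err, ErrKVStorage)) // expected: true
	fmt.Println(err)                   // roughly: store 1: [BR:KV:ErrKVStorage]I/O error occurs on tikv storage
}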
2 changes: 1 addition & 1 deletion pkg/gluetikv/glue.go
@@ -54,7 +54,7 @@ func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redi

// Record implements glue.Glue.
func (Glue) Record(name string, val uint64) {
summary.CollectUint(name, val)
summary.CollectSuccessUnit(name, 1, val)
}

// GetVersion implements glue.Glue.
24 changes: 23 additions & 1 deletion pkg/summary/collector.go
@@ -23,6 +23,10 @@ const (
TotalKV = "total kv"
// TotalBytes is a field we collect during backup/restore
TotalBytes = "total bytes"
// BackupDataSize is a field we collect after backup finishes
BackupDataSize = "backup data size(after compressed)"
// RestoreDataSize is a field we collect after restore finishes
RestoreDataSize = "restore data size(after decompressed)"
)

// LogCollector collects infos into summary log.
@@ -200,10 +204,28 @@ func (tc *logCollector) Summary(name string) {
for name, data := range tc.successData {
if name == TotalBytes {
logFields = append(logFields,
zap.String("data-size", units.HumanSize(float64(data))),
zap.String("total-kv-size", units.HumanSize(float64(data))),
zap.String("average-speed", units.HumanSize(float64(data)/totalCost.Seconds())+"/s"))
continue
}
if name == BackupDataSize {
if tc.failureUnitCount+tc.successUnitCount == 0 {
logFields = append(logFields, zap.String("Result", "Nothing to bakcup"))
} else {
logFields = append(logFields,
zap.String(BackupDataSize, units.HumanSize(float64(data))))
}
continue
}
if name == RestoreDataSize {
if tc.failureUnitCount+tc.successUnitCount == 0 {
logFields = append(logFields, zap.String("Result", "Nothing to restore"))
} else {
logFields = append(logFields,
zap.String(RestoreDataSize, units.HumanSize(float64(data))))
}
continue
}
logFields = append(logFields, zap.Uint64(logKeyFor(name), data))
}

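To make the new BackupDataSize handling concrete, here is a small self-contained sketch of the aggregation rule added above: when no unit was backed up the summary reports a "Nothing to backup" result, otherwise the collected byte count is humanized. The struct below is a simplified assumption, not the real logCollector.

// Simplified model of the BackupDataSize summary behaviour; field names are
// assumptions and locking/other fields of logCollector are omitted.
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

const backupDataSize = "backup data size(after compressed)"

type miniCollector struct {
	successUnitCount int
	failureUnitCount int
	successData      map[string]uint64
}

func (c *miniCollector) summarize() string {
	data, ok := c.successData[backupDataSize]
	if !ok {
		return ""
	}
	if c.successUnitCount+c.failureUnitCount == 0 {
		return "Result: Nothing to backup"
	}
	return backupDataSize + ": " + units.HumanSize(float64(data))
}

func main() {
	c := &miniCollector{
		successUnitCount: 3,
		successData:      map[string]uint64{backupDataSize: 123 << 20},
	}
	fmt.Println(c.summarize()) // e.g. backup data size(after compressed): 129MB
}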
2 changes: 1 addition & 1 deletion pkg/task/backup.go
@@ -395,7 +395,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return errors.Trace(err)
}

g.Record("Size", utils.ArchiveSize(&backupMeta))
g.Record(summary.BackupDataSize, utils.ArchiveSize(&backupMeta))

// Set task summary to success status.
summary.SetSuccessStatus(true)
2 changes: 1 addition & 1 deletion pkg/task/backup_raw.go
@@ -217,7 +217,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
return errors.Trace(err)
}

g.Record("Size", utils.ArchiveSize(&backupMeta))
g.Record(summary.BackupDataSize, utils.ArchiveSize(&backupMeta))

// Set task summary to success status.
summary.SetSuccessStatus(true)
9 changes: 9 additions & 0 deletions pkg/task/common.go
@@ -30,6 +30,7 @@ import (
berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

const (
@@ -130,6 +131,10 @@ type Config struct {
// EnableOpenTracing is whether to enable opentracing
EnableOpenTracing bool `json:"enable-opentracing" toml:"enable-opentracing"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
// Schemas is a database name set, to check whether the restore database has been backed up
Schemas map[string]struct{}
// Tables is a table name set, to check whether the restore table has been backed up
Tables map[string]struct{}

// GrpcKeepaliveTime is the interval of pinging the server.
GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"`
@@ -270,6 +275,8 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.RateLimit = rateLimit * rateLimitUnit

cfg.Schemas = make(map[string]struct{})
cfg.Tables = make(map[string]struct{})
var caseSensitive bool
if filterFlag := flags.Lookup(flagFilter); filterFlag != nil {
var f filter.Filter
@@ -287,11 +294,13 @@
if len(db) == 0 {
return errors.Annotate(berrors.ErrInvalidArgument, "empty database name is not allowed")
}
cfg.Schemas[utils.EncloseName(db)] = struct{}{}
if tblFlag := flags.Lookup(flagTable); tblFlag != nil {
tbl := tblFlag.Value.String()
if len(tbl) == 0 {
return errors.Annotate(berrors.ErrInvalidArgument, "empty table name is not allowed")
}
cfg.Tables[utils.EncloseDBAndTable(db, tbl)] = struct{}{}
cfg.TableFilter = filter.NewTablesFilter(filter.Table{
Schema: db,
Name: tbl,
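The utils.EncloseName and utils.EncloseDBAndTable helpers used to build cfg.Schemas and cfg.Tables are not part of this diff. Presumably they backtick-quote identifiers so the keys match what CheckRestoreDBAndTable builds from the backup meta later on; a sketch under that assumption:

// Hypothetical sketch of the name-enclosing helpers assumed above. The real
// pkg/utils implementations may additionally escape embedded backticks.
package main

import "fmt"

func EncloseName(name string) string {
	return "`" + name + "`"
}

func EncloseDBAndTable(db, table string) string {
	return EncloseName(db) + "." + EncloseName(table)
}

func main() {
	schemas := map[string]struct{}{}
	tables := map[string]struct{}{}

	// Mirrors how cfg.Schemas / cfg.Tables are populated in ParseFromFlags.
	schemas[EncloseName("test")] = struct{}{}
	tables[EncloseDBAndTable("test", "t1")] = struct{}{}

	fmt.Println(schemas) // map[`test`:{}]
	fmt.Println(tables)  // map[`test`.`t1`:{}]
}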
46 changes: 44 additions & 2 deletions pkg/task/restore.go
@@ -148,6 +148,37 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
}
}

// CheckRestoreDBAndTable is used to check whether the restore dbs or tables have been backed up
func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error {
if len(cfg.Schemas) == 0 && len(cfg.Tables) == 0 {
return nil
}
schemas := client.GetDatabases()
schemasMap := make(map[string]struct{})
tablesMap := make(map[string]struct{})
for _, db := range schemas {
schemasMap[utils.EncloseName(db.Info.Name.O)] = struct{}{}
for _, table := range db.Tables {
tablesMap[utils.EncloseDBAndTable(db.Info.Name.O, table.Info.Name.O)] = struct{}{}
}
}
restoreSchemas := cfg.Schemas
restoreTables := cfg.Tables
for schema := range restoreSchemas {
if _, ok := schemasMap[schema]; !ok {
return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable,
"[database: %v] has not been backup, please ensure you has input a correct database name", schema)
}
}
for table := range restoreTables {
if _, ok := tablesMap[table]; !ok {
return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable,
"[table: %v] has not been backup, please ensure you has input a correct table name", table)
}
}
return nil
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.adjustRestoreConfig()
@@ -203,7 +234,17 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return errors.Trace(err)
}
g.Record(summary.RestoreDataSize, utils.ArchiveSize(backupMeta))
backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if cfg.CheckRequirements && backupVersion != nil {
if versionErr := version.CheckClusterVersion(ctx, mgr.GetPDClient(), version.CheckVersionForBackup(backupVersion)); versionErr != nil {
return errors.Trace(versionErr)
}
}

if err = client.InitBackupMeta(backupMeta, u); err != nil {
return errors.Trace(err)
@@ -212,7 +253,9 @@
if client.IsRawKvMode() {
return errors.Annotate(berrors.ErrRestoreModeMismatch, "cannot do transactional restore from raw kv data")
}

if err = CheckRestoreDBAndTable(client, cfg); err != nil {
return err
}
files, tables, dbs := filterRestoreFiles(client, cfg)
if len(dbs) == 0 && len(tables) != 0 {
return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases")
@@ -422,7 +465,6 @@ func filterRestoreFiles(
if !cfg.TableFilter.MatchTable(db.Info.Name.O, table.Info.Name.O) {
continue
}

if !createdDatabase {
dbs = append(dbs, db)
createdDatabase = true
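As a quick illustration of the new pre-flight check (a simplified stand-alone model that does not use the real restore.Client), the lookup is plain set membership over the enclosed names collected from the command line:

// Simplified model of CheckRestoreDBAndTable: every database or table requested
// on the command line must appear in the set built from the backup meta.
// The names below are invented for illustration.
package main

import "fmt"

func checkRestoreTargets(requested, inBackup map[string]struct{}, kind string) error {
	for name := range requested {
		if _, ok := inBackup[name]; !ok {
			return fmt.Errorf("[%s: %v] has not been backed up, please ensure you have input a correct %s name",
				kind, name, kind)
		}
	}
	return nil
}

func main() {
	backedUp := map[string]struct{}{"`test`": {}}
	requested := map[string]struct{}{"`missing`": {}}
	if err := checkRestoreTargets(requested, backedUp, "database"); err != nil {
		fmt.Println(err) // [database: `missing`] has not been backed up, ...
	}
}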