This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

err_msg: clarify some ambiguous error message #1076

Merged: 24 commits, May 21, 2021
4 changes: 2 additions & 2 deletions in cmd/br/restore.go
@@ -127,7 +127,7 @@ func newFullRestoreCommand() *cobra.Command {
func newDBRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "db",
Short: "restore tables in a database",
Short: "restore tables in a database from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Database restore")
@@ -140,7 +140,7 @@ func newDBRestoreCommand() *cobra.Command {
func newTableRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "table",
Short: "restore a table",
Short: "restore a table from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Table restore")
17 changes: 11 additions & 6 deletions in errors.toml
@@ -31,6 +31,11 @@ error = '''
invalid argument
'''

["BR:Common:ErrUndefinedDbOrTable"]
error = '''
undefined restore databases or tables
'''

["BR:Common:ErrUnknown"]
error = '''
internal error
@@ -76,11 +81,6 @@ error = '''
key not in region
'''

["BR:KV:ErrKVNotHealth"]
error = '''
tikv cluster not health
'''

["BR:KV:ErrKVNotLeader"]
error = '''
not leader
@@ -96,9 +96,14 @@ error = '''
rewrite rule not found
'''

["BR:KV:ErrKVStorage"]
error = '''
tikv storage I/O error
'''

["BR:KV:ErrKVUnknown"]
error = '''
unknown tikv error
unknown error occurs on tikv
'''

["BR:KV:ErrNotTiKVStorage"]
2 changes: 1 addition & 1 deletion in pkg/backup/client.go
@@ -1028,7 +1028,7 @@ func ChecksumMatches(backupMeta *backuppb.BackupMeta, local []Checksum) error {
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes))
// TODO enhance error
return errors.Annotate(berrors.ErrBackupChecksumMismatch, "failed in checksum, and cannot parse table info")
return berrors.ErrBackupChecksumMismatch
}
log.Info("checksum success",
zap.String("database", dbInfo.Name.L),
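
Side note (not part of the diff): dropping the Annotate wrapper does not change how callers classify this error, because pkg/errors exposes the Is helper shown below. A minimal sketch of the matching pattern, assuming the usual github.com/pingcap/br import path:

package main

import (
	"fmt"

	berrors "github.com/pingcap/br/pkg/errors"
)

// classifyChecksumErr matches the bare normalized error returned by ChecksumMatches;
// berrors.Is is expected to match annotated variants of the same error as well.
func classifyChecksumErr(err error) {
	if err == nil {
		return
	}
	if berrors.Is(err, berrors.ErrBackupChecksumMismatch) {
		fmt.Println("backup checksum mismatch")
		return
	}
	fmt.Println("unexpected error:", err)
}
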
49 changes: 43 additions & 6 deletions in pkg/backup/push.go
@@ -4,6 +4,7 @@ package backup

import (
"context"
"fmt"
"sync"

"github.com/opentracing/opentracing-go"
@@ -24,15 +25,28 @@ import (
// pushDown wraps a backup task.
type pushDown struct {
mgr ClientMgr
respCh chan *backuppb.BackupResponse
respCh chan responseAndStore
errCh chan error
}

type responseAndStore struct {
Resp *backuppb.BackupResponse
Store *metapb.Store
}

func (r responseAndStore) GetResponse() *backuppb.BackupResponse {
return r.Resp
}

func (r responseAndStore) GetStore() *metapb.Store {
return r.Store
}

// newPushDown creates a push down backup.
func newPushDown(mgr ClientMgr, cap int) *pushDown {
return &pushDown{
mgr: mgr,
respCh: make(chan *backuppb.BackupResponse, cap),
respCh: make(chan responseAndStore, cap),
errCh: make(chan error, cap),
}
}
@@ -59,6 +73,7 @@ func (push *pushDown) pushBackup(

wg := new(sync.WaitGroup)
for _, s := range stores {
store := s
storeID := s.GetId()
if s.GetState() != metapb.StoreState_Up {
log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState()))
@@ -78,7 +93,10 @@
ctx, storeID, client, req,
func(resp *backuppb.BackupResponse) error {
// Forward all responses (including error).
push.respCh <- resp
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
return nil
},
func() (backuppb.BackupClient, error) {
@@ -101,7 +119,9 @@

for {
select {
case resp, ok := <-push.respCh:
case respAndStore, ok := <-push.respCh:
resp := respAndStore.GetResponse()
store := respAndStore.GetStore()
if !ok {
// Finished.
return res, nil
@@ -113,6 +133,13 @@
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
log.Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(
@@ -137,8 +164,18 @@
log.Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
log.Error("backup occur unknown error", zap.String("error", errPb.GetMsg()))
return res, errors.Annotatef(berrors.ErrKVUnknown, "%v", errPb)
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("File or directory not found error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid."))
}

if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure tikv has permission to read from & write to the storage."))
}
return res, berrors.ErrKVStorage
}
}
case err := <-push.errCh:
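
Side note (not in the diff): the new tikv-rw-error failpoint above can be flipped on from a test to drive these storage-error paths without a broken disk. A rough sketch, assuming the conventional failpoint path (package import path plus failpoint name) and an injected message that utils.MessageIsNotFoundStorageError would recognize:

package backup_test

import "github.com/pingcap/failpoint"

// enableTiKVRWError injects a fake "not found" storage message into every backup
// response, so pushBackup should log the per-store hint and return berrors.ErrKVStorage.
func enableTiKVRWError() error {
	const fp = "github.com/pingcap/br/pkg/backup/tikv-rw-error" // assumed path
	return failpoint.Enable(fp, `return("No such file or directory")`)
}

func disableTiKVRWError() {
	_ = failpoint.Disable("github.com/pingcap/br/pkg/backup/tikv-rw-error")
}
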
6 changes: 0 additions & 6 deletions in pkg/conn/conn.go
@@ -218,12 +218,6 @@ func NewMgr(
}
liveStoreCount++
}
if liveStoreCount == 0 &&
// Assume 3 replicas
len(stores) >= 3 && len(stores) > liveStoreCount+1 {
log.Error("tikv cluster not health", zap.Reflect("stores", stores))
return nil, errors.Annotatef(berrors.ErrKVNotHealth, "%+v", stores)
}

var dom *domain.Domain
if needDomain {
13 changes: 7 additions & 6 deletions in pkg/errors/errors.go
@@ -17,10 +17,11 @@ func Is(err error, is *errors.Error) bool {

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
@@ -54,9 +55,9 @@ var (
ErrStorageInvalidConfig = errors.Normalize("invalid external storage config", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidConfig"))

// Errors reported from TiKV.
ErrKVUnknown = errors.Normalize("unknown tikv error", errors.RFCCodeText("BR:KV:ErrKVUnknown"))
ErrKVStorage = errors.Normalize("tikv storage I/O error", errors.RFCCodeText("BR:KV:ErrKVStorage"))
ErrKVUnknown = errors.Normalize("unknown error occurs on tikv", errors.RFCCodeText("BR:KV:ErrKVUnknown"))
ErrKVClusterIDMismatch = errors.Normalize("tikv cluster ID mismatch", errors.RFCCodeText("BR:KV:ErrKVClusterIDMismatch"))
ErrKVNotHealth = errors.Normalize("tikv cluster not health", errors.RFCCodeText("BR:KV:ErrKVNotHealth"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("BR:KV:ErrKVNotLeader"))
ErrKVNotTiKV = errors.Normalize("storage is not tikv", errors.RFCCodeText("BR:KV:ErrNotTiKVStorage"))

2 changes: 1 addition & 1 deletion in pkg/gluetikv/glue.go
@@ -54,7 +54,7 @@ func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redi

// Record implements glue.Glue.
func (Glue) Record(name string, val uint64) {
summary.CollectUint(name, val)
summary.CollectSuccessUnit(name, 1, val)
}

// GetVersion implements glue.Glue.
24 changes: 23 additions & 1 deletion in pkg/summary/collector.go
@@ -23,6 +23,10 @@ const (
TotalKV = "total kv"
// TotalBytes is a field we collect during backup/restore
TotalBytes = "total bytes"
// BackupDataSize is a field we collect after the backup finishes
BackupDataSize = "backup data size(after compressed)"
// RestoreDataSize is a field we collect after the restore finishes
RestoreDataSize = "restore data size(after decompressed)"
)

// LogCollector collects infos into summary log.
@@ -200,10 +204,28 @@ func (tc *logCollector) Summary(name string) {
for name, data := range tc.successData {
if name == TotalBytes {
logFields = append(logFields,
zap.String("data-size", units.HumanSize(float64(data))),
zap.String("total-kv-size", units.HumanSize(float64(data))),
zap.String("average-speed", units.HumanSize(float64(data)/totalCost.Seconds())+"/s"))
continue
}
if name == BackupDataSize {
if tc.failureUnitCount+tc.successUnitCount == 0 {
logFields = append(logFields, zap.String("Result", "Nothing to bakcup"))
} else {
logFields = append(logFields,
zap.String(BackupDataSize, units.HumanSize(float64(data))))
}
continue
}
if name == RestoreDataSize {
if tc.failureUnitCount+tc.successUnitCount == 0 {
logFields = append(logFields, zap.String("Result", "Nothing to restore"))
} else {
logFields = append(logFields,
zap.String(RestoreDataSize, units.HumanSize(float64(data))))
}
continue
}
logFields = append(logFields, zap.Uint64(logKeyFor(name), data))
}

2 changes: 1 addition & 1 deletion in pkg/task/backup.go
@@ -434,7 +434,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return errors.Trace(err)
}

g.Record("Size", utils.ArchiveSize(&backupMeta))
g.Record(summary.BackupDataSize, utils.ArchiveSize(&backupMeta))

// Set task summary to success status.
summary.SetSuccessStatus(true)
2 changes: 1 addition & 1 deletion in pkg/task/backup_raw.go
@@ -229,7 +229,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
return errors.Trace(err)
}

g.Record("Size", utils.ArchiveSize(&backupMeta))
g.Record(summary.BackupDataSize, utils.ArchiveSize(&backupMeta))

// Set task summary to success status.
summary.SetSuccessStatus(true)
9 changes: 9 additions & 0 deletions in pkg/task/common.go
@@ -30,6 +30,7 @@ import (
berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

const (
@@ -139,6 +140,10 @@ type Config struct {

TableFilter filter.Filter `json:"-" toml:"-"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
// Schemas is a database name set, used to check whether the databases to be restored exist in the backup
Schemas map[string]struct{}
// Tables is a table name set, used to check whether the tables to be restored exist in the backup
Tables map[string]struct{}

// GrpcKeepaliveTime is the interval of pinging the server.
GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"`
@@ -280,6 +285,8 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.RateLimit = rateLimit * rateLimitUnit

cfg.Schemas = make(map[string]struct{})
cfg.Tables = make(map[string]struct{})
var caseSensitive bool
if filterFlag := flags.Lookup(flagFilter); filterFlag != nil {
var f filter.Filter
@@ -297,11 +304,13 @@
if len(db) == 0 {
return errors.Annotate(berrors.ErrInvalidArgument, "empty database name is not allowed")
}
cfg.Schemas[utils.EncloseName(db)] = struct{}{}
if tblFlag := flags.Lookup(flagTable); tblFlag != nil {
tbl := tblFlag.Value.String()
if len(tbl) == 0 {
return errors.Annotate(berrors.ErrInvalidArgument, "empty table name is not allowed")
}
cfg.Tables[utils.EncloseDBAndTable(db, tbl)] = struct{}{}
cfg.TableFilter = filter.NewTablesFilter(filter.Table{
Schema: db,
Name: tbl,
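
One detail worth calling out (illustration only, not from the diff): the keys written into cfg.Schemas and cfg.Tables here must have the same shape as the keys that CheckRestoreDBAndTable builds from the backup meta below, which is why both sides go through utils.EncloseName and utils.EncloseDBAndTable; those helpers are assumed to produce backtick-quoted identifiers.

package main

import (
	"fmt"

	"github.com/pingcap/br/pkg/utils"
)

func main() {
	schemas := map[string]struct{}{}
	tables := map[string]struct{}{}
	// Same helpers on the flag side (ParseFromFlags) and the backup-meta side
	// (CheckRestoreDBAndTable), so the set lookups line up.
	schemas[utils.EncloseName("test")] = struct{}{}
	tables[utils.EncloseDBAndTable("test", "t1")] = struct{}{}
	fmt.Println(schemas, tables) // keys expected to look like `test` and `test`.`t1`
}
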
38 changes: 35 additions & 3 deletions in pkg/task/restore.go
@@ -150,6 +150,37 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
}
}

// CheckRestoreDBAndTable is used to check whether the databases or tables to be restored have been backed up
func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error {
if len(cfg.Schemas) == 0 && len(cfg.Tables) == 0 {
return nil
}
schemas := client.GetDatabases()
schemasMap := make(map[string]struct{})
tablesMap := make(map[string]struct{})
for _, db := range schemas {
schemasMap[utils.EncloseName(db.Info.Name.O)] = struct{}{}
for _, table := range db.Tables {
tablesMap[utils.EncloseDBAndTable(db.Info.Name.O, table.Info.Name.O)] = struct{}{}
}
}
restoreSchemas := cfg.Schemas
restoreTables := cfg.Tables
for schema := range restoreSchemas {
if _, ok := schemasMap[schema]; !ok {
return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable,
"[database: %v] has not been backed up, please ensure you have entered the correct database name", schema)
}
}
for table := range restoreTables {
if _, ok := tablesMap[table]; !ok {
return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable,
"[table: %v] has not been backed up, please ensure you have entered the correct table name", table)
}
}
return nil
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.adjustRestoreConfig()
@@ -210,7 +241,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return errors.Trace(err)
}
g.Record("Size", utils.ArchiveSize(backupMeta))
g.Record(summary.RestoreDataSize, utils.ArchiveSize(backupMeta))
backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if cfg.CheckRequirements && backupVersion != nil {
if versionErr := version.CheckClusterVersion(ctx, mgr.GetPDClient(), version.CheckVersionForBackup(backupVersion)); versionErr != nil {
@@ -225,7 +256,9 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if client.IsRawKvMode() {
return errors.Annotate(berrors.ErrRestoreModeMismatch, "cannot do transactional restore from raw kv data")
}

if err = CheckRestoreDBAndTable(client, cfg); err != nil {
return err
}
files, tables, dbs := filterRestoreFiles(client, cfg)
if len(dbs) == 0 && len(tables) != 0 {
return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases")
@@ -439,7 +472,6 @@ func filterRestoreFiles(
if !cfg.TableFilter.MatchTable(db.Info.Name.O, table.Info.Name.O) {
continue
}

if !createdDatabase {
dbs = append(dbs, db)
createdDatabase = true
2 changes: 1 addition & 1 deletion in pkg/task/restore_raw.go
@@ -92,7 +92,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
if err != nil {
return errors.Trace(err)
}
g.Record("Size", utils.ArchiveSize(backupMeta))
g.Record(summary.RestoreDataSize, utils.ArchiveSize(backupMeta))
if err = client.InitBackupMeta(backupMeta, u); err != nil {
return errors.Trace(err)
}