From 18da8a41bb708503705c835dd3c60c3498a3c236 Mon Sep 17 00:00:00 2001 From: ris <79858083+RidRisR@users.noreply.github.com> Date: Tue, 23 Jul 2024 14:15:06 +0800 Subject: [PATCH] This is an automated cherry-pick of #54596 Signed-off-by: ti-chi-bot --- br/pkg/errors/errors.go | 1 + br/pkg/task/BUILD.bazel | 5 + br/pkg/task/restore.go | 186 ++++++++++++++++++++++++++++++++++++ br/pkg/task/restore_test.go | 112 ++++++++++++++++++++++ br/pkg/utils/backoff.go | 44 +++++++++ errors.toml | 5 + 6 files changed, 353 insertions(+) diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 130569514db35..1b78a861fa668 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -97,6 +97,7 @@ var ( ErrKVClusterIDMismatch = errors.Normalize("tikv cluster ID mismatch", errors.RFCCodeText("BR:KV:ErrKVClusterIDMismatch")) ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("BR:KV:ErrKVNotLeader")) ErrKVNotTiKV = errors.Normalize("storage is not tikv", errors.RFCCodeText("BR:KV:ErrNotTiKVStorage")) + ErrKVDiskFull = errors.Normalize("disk is full", errors.RFCCodeText("BR:KV:ErrKVDiskFull")) // ErrKVEpochNotMatch is the error raised when ingestion failed with "epoch // not match". This error is retryable. diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index a1b1b7690bcb1..cc179dddbf2d7 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -114,6 +114,11 @@ go_test( "//pkg/statistics/handle/util", "//pkg/tablecodec", "//pkg/util/table-filter", +<<<<<<< HEAD +======= + "@com_github_docker_go_units//:go-units", + "@com_github_gogo_protobuf//proto", +>>>>>>> d662428574e (br: disk space check follow up (#54596)) "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 239791ad1a049..05ac099e3392a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -1130,6 +1130,192 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return nil } +<<<<<<< HEAD +======= +func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) { + var resp map[string]any + err = utils.WithRetry(ctx, func() error { + resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx) + return err + }, utils.NewPDReqBackoffer()) + if err != nil { + return 0, errors.Trace(err) + } + + key := "max-replicas" + val, ok := resp[key] + if !ok { + return 0, errors.Errorf("key %s not found in response %v", key, resp) + } + return uint64(val.(float64)), nil +} + +func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err error) { + err = utils.WithRetry(ctx, func() error { + stores, err = mgr.GetPDHTTPClient().GetStores(ctx) + return err + }, utils.NewPDReqBackoffer()) + if err != nil { + return nil, errors.Trace(err) + } + return stores, nil +} + +func EstimateTikvUsage(files []*backuppb.File, replicaCnt uint64, storeCnt uint64) uint64 { + if storeCnt == 0 { + return 0 + } + if replicaCnt > storeCnt { + replicaCnt = storeCnt + } + totalSize := uint64(0) + for _, file := range files { + totalSize += file.GetSize_() + } + log.Info("estimate tikv usage", zap.Uint64("total size", totalSize), zap.Uint64("replicaCnt", replicaCnt), zap.Uint64("store count", storeCnt)) + return totalSize * replicaCnt / storeCnt +} + +func EstimateTiflashUsage(tables []*metautil.Table, storeCnt uint64) uint64 { + if storeCnt == 0 { + return 0 + } + tiflashTotal := uint64(0) + for _, table := range tables { + if table.Info.TiFlashReplica == nil || table.Info.TiFlashReplica.Count <= 0 { + continue + } + tableBytes := uint64(0) + for _, file := range table.Files { + tableBytes += file.GetSize_() + } + tiflashTotal += tableBytes * table.Info.TiFlashReplica.Count + } + log.Info("estimate tiflash usage", zap.Uint64("total size", tiflashTotal), zap.Uint64("store count", storeCnt)) + return tiflashTotal / storeCnt +} + +func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error { + available, err := units.RAMInBytes(store.Status.Available) + if err != nil { + return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available) + } + if available <= 0 { + return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available) + } + if uint64(available) < necessary { + return errors.Annotatef(berrors.ErrKVDiskFull, "store %d has no space left on device, available %s, necessary %s", + store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary))) + } + return nil +} + +func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, tables []*metautil.Table) error { + maxReplica, err := getMaxReplica(ctx, mgr) + if err != nil { + return errors.Trace(err) + } + stores, err := getStores(ctx, mgr) + if err != nil { + return errors.Trace(err) + } + + var tikvCnt, tiflashCnt uint64 = 0, 0 + for i := range stores.Stores { + store := &stores.Stores[i] + if engine.IsTiFlashHTTPResp(&store.Store) { + tiflashCnt += 1 + continue + } + tikvCnt += 1 + } + + // We won't need to restore more than 1800 PB data at one time, right? + preserve := func(base uint64, ratio float32) uint64 { + if base > 1000*units.PB { + return base + } + return base * uint64(ratio*10) / 10 + } + tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1) + tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4) + log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage)) + + err = utils.WithRetry(ctx, func() error { + stores, err = getStores(ctx, mgr) + if err != nil { + return errors.Trace(err) + } + for _, store := range stores.Stores { + if engine.IsTiFlashHTTPResp(&store.Store) { + if err := CheckStoreSpace(tiflashUsage, &store); err != nil { + return errors.Trace(err) + } + continue + } + if err := CheckStoreSpace(tikvUsage, &store); err != nil { + return errors.Trace(err) + } + } + return nil + }, utils.NewDiskCheckBackoffer()) + if err != nil { + return errors.Trace(err) + } + return nil +} + +// Exhaust drains all remaining errors in the channel, into a slice of errors. +func Exhaust(ec <-chan error) []error { + out := make([]error, 0, len(ec)) + for { + select { + case err := <-ec: + out = append(out, err) + default: + // errCh will NEVER be closed(ya see, it has multi sender-part), + // so we just consume the current backlog of this channel, then return. + return out + } + } +} + +// EstimateRangeSize estimates the total range count by file. +func EstimateRangeSize(files []*backuppb.File) int { + result := 0 + for _, f := range files { + if strings.HasSuffix(f.GetName(), "_write.sst") { + result++ + } + } + return result +} + +// MapTableToFiles makes a map that mapping table ID to its backup files. +// aware that one file can and only can hold one table. +func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File { + result := map[int64][]*backuppb.File{} + for _, file := range files { + tableID := tablecodec.DecodeTableID(file.GetStartKey()) + tableEndID := tablecodec.DecodeTableID(file.GetEndKey()) + if tableID != tableEndID { + log.Panic("key range spread between many files.", + zap.String("file name", file.Name), + logutil.Key("startKey", file.StartKey), + logutil.Key("endKey", file.EndKey)) + } + if tableID == 0 { + log.Panic("invalid table key of file", + zap.String("file name", file.Name), + logutil.Key("startKey", file.StartKey), + logutil.Key("endKey", file.EndKey)) + } + result[tableID] = append(result[tableID], file) + } + return result +} + +>>>>>>> d662428574e (br: disk space check follow up (#54596)) // dropToBlackhole drop all incoming tables into black hole, // i.e. don't execute checksum, just increase the process anyhow. func dropToBlackhole( diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 9301c1ea88f39..ed6a297c1781f 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -8,7 +8,12 @@ import ( "fmt" "testing" +<<<<<<< HEAD "github.com/golang/protobuf/proto" +======= + "github.com/docker/go-units" + "github.com/gogo/protobuf/proto" +>>>>>>> d662428574e (br: disk space check follow up (#54596)) backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -25,9 +30,13 @@ import ( "google.golang.org/grpc/keepalive" ) +<<<<<<< HEAD func TestRestoreConfigAdjust(t *testing.T) { cfg := &RestoreConfig{} cfg.Adjust() +======= +const pb uint64 = units.PB +>>>>>>> d662428574e (br: disk space check follow up (#54596)) require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency) require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval) @@ -272,3 +281,106 @@ func mockBackupMeta(mockSchemas []*backuppb.Schema, mockFiles []*backuppb.File) Schemas: mockSchemas, } } +<<<<<<< HEAD +======= + +func TestCheckDDLJobByRules(t *testing.T) { + ddlJobs := []*model.Job{ + { + Type: model.ActionSetTiFlashReplica, + }, + { + Type: model.ActionAddPrimaryKey, + }, + { + Type: model.ActionUpdateTiFlashReplicaStatus, + }, + { + Type: model.ActionCreateTable, + }, + { + Type: model.ActionLockTable, + }, + { + Type: model.ActionAddIndex, + }, + { + Type: model.ActionUnlockTable, + }, + { + Type: model.ActionCreateSchema, + }, + { + Type: model.ActionModifyColumn, + }, + { + Type: model.ActionReorganizePartition, + }, + } + + filteredDDlJobs := task.FilterDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule) + + expectedDDLTypes := []model.ActionType{ + model.ActionSetTiFlashReplica, + model.ActionAddPrimaryKey, + model.ActionUpdateTiFlashReplicaStatus, + model.ActionCreateTable, + model.ActionLockTable, + model.ActionUnlockTable, + model.ActionCreateSchema, + } + + require.Equal(t, len(expectedDDLTypes), len(filteredDDlJobs)) + expectedDDLJobs := make([]*model.Job, 0, len(expectedDDLTypes)) + for i, ddlJob := range filteredDDlJobs { + assert.Equal(t, expectedDDLTypes[i], ddlJob.Type) + expectedDDLJobs = append(expectedDDLJobs, ddlJob) + } + + require.NoError(t, task.CheckDDLJobByRules(expectedDDLJobs, task.DDLJobLogIncrementalCompactBlockListRule)) + require.Error(t, task.CheckDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule)) +} + +// NOTICE: Once there is a new backfilled type ddl, BR needs to ensure that it is correctly cover by the rules: +func TestMonitorTheIncrementalUnsupportDDLType(t *testing.T) { + require.Equal(t, int(5), ddl.BackupFillerTypeCount()) +} + +func TestTikvUsage(t *testing.T) { + files := []*backuppb.File{ + {Name: "F1", Size_: 1 * pb}, + {Name: "F2", Size_: 2 * pb}, + {Name: "F3", Size_: 3 * pb}, + {Name: "F4", Size_: 4 * pb}, + {Name: "F5", Size_: 5 * pb}, + } + replica := uint64(3) + storeCnt := uint64(6) + total := uint64(0) + for _, f := range files { + total += f.GetSize_() + } + ret := task.EstimateTikvUsage(files, replica, storeCnt) + require.Equal(t, total*replica/storeCnt, ret) +} + +func TestTiflashUsage(t *testing.T) { + tables := []*metautil.Table{ + {Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 0}}, + Files: []*backuppb.File{{Size_: 1 * pb}}}, + {Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 1}}, + Files: []*backuppb.File{{Size_: 2 * pb}}}, + {Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 2}}, + Files: []*backuppb.File{{Size_: 3 * pb}}}, + } + + var storeCnt uint64 = 3 + ret := task.EstimateTiflashUsage(tables, storeCnt) + require.Equal(t, 8*pb/3, ret) +} + +func TestCheckTikvSpace(t *testing.T) { + store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "500PB"}} + require.NoError(t, task.CheckStoreSpace(400*pb, &store)) +} +>>>>>>> d662428574e (br: disk space check follow up (#54596)) diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index f669b07a5125f..0aa19c9d000c4 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -277,3 +277,47 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { func (bo *pdReqBackoffer) Attempt() int { return bo.attempt } +<<<<<<< HEAD +======= + +type DiskCheckBackoffer struct { + attempt int + delayTime time.Duration + maxDelayTime time.Duration +} + +func NewDiskCheckBackoffer() Backoffer { + return &DiskCheckBackoffer{ + attempt: resetTSRetryTime, + delayTime: resetTSWaitInterval, + maxDelayTime: resetTSMaxWaitInterval, + } +} + +func (bo *DiskCheckBackoffer) NextBackoff(err error) time.Duration { + e := errors.Cause(err) + switch e { // nolint:errorlint + case nil, context.Canceled, context.DeadlineExceeded, berrors.ErrKVDiskFull: + bo.delayTime = 0 + bo.attempt = 0 + case berrors.ErrPDInvalidResponse: + bo.delayTime = 2 * bo.delayTime + bo.attempt-- + default: + bo.delayTime = 2 * bo.delayTime + if bo.attempt > 5 { + bo.attempt = 5 + } + bo.attempt-- + } + + if bo.delayTime > bo.maxDelayTime { + return bo.maxDelayTime + } + return bo.delayTime +} + +func (bo *DiskCheckBackoffer) Attempt() int { + return bo.attempt +} +>>>>>>> d662428574e (br: disk space check follow up (#54596)) diff --git a/errors.toml b/errors.toml index f635ce34de165..9f4289289e4af 100644 --- a/errors.toml +++ b/errors.toml @@ -111,6 +111,11 @@ error = ''' tikv cluster ID mismatch ''' +["BR:KV:ErrKVDiskFull"] +error = ''' +disk is full +''' + ["BR:KV:ErrKVDownloadFailed"] error = ''' download sst failed