Commit

This is an automated cherry-pick of pingcap#54596
Signed-off-by: ti-chi-bot <[email protected]>
RidRisR authored and ti-chi-bot committed Jul 29, 2024
1 parent 42b624c commit 18da8a4
Showing 6 changed files with 353 additions and 0 deletions.
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
@@ -97,6 +97,7 @@ var (
ErrKVClusterIDMismatch = errors.Normalize("tikv cluster ID mismatch", errors.RFCCodeText("BR:KV:ErrKVClusterIDMismatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("BR:KV:ErrKVNotLeader"))
ErrKVNotTiKV = errors.Normalize("storage is not tikv", errors.RFCCodeText("BR:KV:ErrNotTiKVStorage"))
ErrKVDiskFull = errors.Normalize("disk is full", errors.RFCCodeText("BR:KV:ErrKVDiskFull"))

// ErrKVEpochNotMatch is the error raised when ingestion failed with "epoch
// not match". This error is retryable.
5 changes: 5 additions & 0 deletions br/pkg/task/BUILD.bazel
@@ -114,6 +114,11 @@ go_test(
"//pkg/statistics/handle/util",
"//pkg/tablecodec",
"//pkg/util/table-filter",
<<<<<<< HEAD
=======
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
>>>>>>> d662428574e (br: disk space check follow up (#54596))
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
186 changes: 186 additions & 0 deletions br/pkg/task/restore.go
@@ -1130,6 +1130,192 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

<<<<<<< HEAD
=======
func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) {
var resp map[string]any
err = utils.WithRetry(ctx, func() error {
resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx)
return err
}, utils.NewPDReqBackoffer())
if err != nil {
return 0, errors.Trace(err)
}

key := "max-replicas"
val, ok := resp[key]
if !ok {
return 0, errors.Errorf("key %s not found in response %v", key, resp)
}
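// encoding/json decodes numbers in a map[string]any as float64,
// hence the float64 assertion before truncating to uint64.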
return uint64(val.(float64)), nil
}

func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err error) {
err = utils.WithRetry(ctx, func() error {
stores, err = mgr.GetPDHTTPClient().GetStores(ctx)
return err
}, utils.NewPDReqBackoffer())
if err != nil {
return nil, errors.Trace(err)
}
return stores, nil
}

func EstimateTikvUsage(files []*backuppb.File, replicaCnt uint64, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
if replicaCnt > storeCnt {
replicaCnt = storeCnt
}
totalSize := uint64(0)
for _, file := range files {
totalSize += file.GetSize_()
}
log.Info("estimate tikv usage", zap.Uint64("total size", totalSize), zap.Uint64("replicaCnt", replicaCnt), zap.Uint64("store count", storeCnt))
return totalSize * replicaCnt / storeCnt
}
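
For intuition, a minimal sketch of the arithmetic (hypothetical file names and sizes; units is docker/go-units, already used by the tests below): 15 PB of backup data with 3 replicas spread across 6 TiKV stores should land roughly 7.5 PB on each store.

files := []*backuppb.File{
	{Name: "f1", Size_: 5 * units.PB},
	{Name: "f2", Size_: 10 * units.PB},
}
perStore := EstimateTikvUsage(files, 3, 6) // (5+10) PB * 3 / 6 = 7.5 PB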

func EstimateTiflashUsage(tables []*metautil.Table, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
tiflashTotal := uint64(0)
for _, table := range tables {
if table.Info.TiFlashReplica == nil || table.Info.TiFlashReplica.Count <= 0 {
continue
}
tableBytes := uint64(0)
for _, file := range table.Files {
tableBytes += file.GetSize_()
}
tiflashTotal += tableBytes * table.Info.TiFlashReplica.Count
}
log.Info("estimate tiflash usage", zap.Uint64("total size", tiflashTotal), zap.Uint64("store count", storeCnt))
return tiflashTotal / storeCnt
}
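
Worked example: a 2 PB table with one TiFlash replica plus a 3 PB table with two replicas, spread over 3 TiFlash stores, reserves (2*1 + 3*2) / 3 ≈ 2.67 PB per store; tables without TiFlash replicas contribute nothing.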

func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if available <= 0 {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if uint64(available) < necessary {
return errors.Annotatef(berrors.ErrKVDiskFull, "store %d has no space left on device, available %s, necessary %s",
store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary)))
}
return nil
}
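
A sketch of the parsing assumption: PD reports Status.Available as a human-readable size string, and go-units' RAMInBytes converts it back to bytes (note it uses 1024-based multiples), so garbage input surfaces as an error or a non-positive value.

avail, err := units.RAMInBytes("500PB") // ≈ 500 * 2^50 bytes
if err != nil || avail <= 0 {
	// CheckStoreSpace maps either case to berrors.ErrPDInvalidResponse.
}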

func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, tables []*metautil.Table) error {
maxReplica, err := getMaxReplica(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
stores, err := getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}

var tikvCnt, tiflashCnt uint64 = 0, 0
for i := range stores.Stores {
store := &stores.Stores[i]
if engine.IsTiFlashHTTPResp(&store.Store) {
tiflashCnt += 1
continue
}
tikvCnt += 1
}

// We won't need to restore more than 1800 PB of data at one time, right?
// (Skipping the multiplication for such huge bases also keeps
// base*uint64(ratio*10) from overflowing uint64.)
preserve := func(base uint64, ratio float32) uint64 {
if base > 1000*units.PB {
return base
}
return base * uint64(ratio*10) / 10
}
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))

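// Re-fetch the store list on each retry so the check sees up-to-date available space.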
err = utils.WithRetry(ctx, func() error {
stores, err = getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores.Stores {
if engine.IsTiFlashHTTPResp(&store.Store) {
if err := CheckStoreSpace(tiflashUsage, &store); err != nil {
return errors.Trace(err)
}
continue
}
if err := CheckStoreSpace(tikvUsage, &store); err != nil {
return errors.Trace(err)
}
}
return nil
}, utils.NewDiskCheckBackoffer())
if err != nil {
return errors.Trace(err)
}
return nil
}

// Exhaust drains all remaining errors in the channel, into a slice of errors.
func Exhaust(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh will NEVER be closed (it has multiple senders),
// so we just consume the current backlog of this channel, then return.
return out
}
}
}
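
A minimal usage sketch (hypothetical buffered channel; real callers receive it from worker goroutines): once all senders have stopped, drain whatever backlog remains.

errCh := make(chan error, 4)
errCh <- errors.New("example worker error")
for _, err := range Exhaust(errCh) {
	log.Warn("restore worker error", zap.Error(err))
}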

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backuppb.File) int {
result := 0
for _, f := range files {
if strings.HasSuffix(f.GetName(), "_write.sst") {
result++
}
}
return result
}
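
Counting only files with the _write.sst suffix works because each backed-up range produces a write-CF SST, while default-CF SSTs appear only for ranges whose values are too large to inline, so the write-CF file count tracks the range count.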

// MapTableToFiles makes a map mapping table ID to its backup files.
// Be aware that one file can contain data of one and only one table.
func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File {
result := map[int64][]*backuppb.File{}
for _, file := range files {
tableID := tablecodec.DecodeTableID(file.GetStartKey())
tableEndID := tablecodec.DecodeTableID(file.GetEndKey())
if tableID != tableEndID {
log.Panic("key range spread between many files.",
zap.String("file name", file.Name),
logutil.Key("startKey", file.StartKey),
logutil.Key("endKey", file.EndKey))
}
if tableID == 0 {
log.Panic("invalid table key of file",
zap.String("file name", file.Name),
logutil.Key("startKey", file.StartKey),
logutil.Key("endKey", file.EndKey))
}
result[tableID] = append(result[tableID], file)
}
return result
}
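
A hypothetical sketch of the invariant (table ID 100 and the key construction are made up; tablecodec.EncodeTablePrefix builds the table key prefix): both endpoints of a file must decode to the same table ID, otherwise the function panics.

prefix := tablecodec.EncodeTablePrefix(100)
file := &backuppb.File{
	Name:     "100_write.sst",
	StartKey: prefix,
	EndKey:   append(append([]byte{}, prefix...), 0xff),
}
grouped := MapTableToFiles([]*backuppb.File{file})
// grouped[100] now holds the single file above.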

>>>>>>> d662428574e (br: disk space check follow up (#54596))
// dropToBlackhole drops all incoming tables into a black hole,
// i.e. it doesn't execute checksums, just advances the progress anyhow.
func dropToBlackhole(
112 changes: 112 additions & 0 deletions br/pkg/task/restore_test.go
@@ -8,7 +8,12 @@ import (
"fmt"
"testing"

<<<<<<< HEAD
"github.com/golang/protobuf/proto"
=======
"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
>>>>>>> d662428574e (br: disk space check follow up (#54596))
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -25,9 +25,13 @@ import (
"google.golang.org/grpc/keepalive"
)

<<<<<<< HEAD
func TestRestoreConfigAdjust(t *testing.T) {
cfg := &RestoreConfig{}
cfg.Adjust()
=======
const pb uint64 = units.PB
>>>>>>> d662428574e (br: disk space check follow up (#54596))

require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency)
require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval)
@@ -272,3 +281,106 @@ func mockBackupMeta(mockSchemas []*backuppb.Schema, mockFiles []*backuppb.File)
Schemas: mockSchemas,
}
}
<<<<<<< HEAD
=======

func TestCheckDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
{
Type: model.ActionReorganizePartition,
},
}

filteredDDlJobs := task.FilterDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule)

expectedDDLTypes := []model.ActionType{
model.ActionSetTiFlashReplica,
model.ActionAddPrimaryKey,
model.ActionUpdateTiFlashReplicaStatus,
model.ActionCreateTable,
model.ActionLockTable,
model.ActionUnlockTable,
model.ActionCreateSchema,
}

require.Equal(t, len(expectedDDLTypes), len(filteredDDlJobs))
expectedDDLJobs := make([]*model.Job, 0, len(expectedDDLTypes))
for i, ddlJob := range filteredDDlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
expectedDDLJobs = append(expectedDDLJobs, ddlJob)
}

require.NoError(t, task.CheckDDLJobByRules(expectedDDLJobs, task.DDLJobLogIncrementalCompactBlockListRule))
require.Error(t, task.CheckDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule))
}

// NOTICE: Once there is a new backfilled DDL type, BR needs to ensure that it is correctly covered by the rules:
func TestMonitorTheIncrementalUnsupportDDLType(t *testing.T) {
require.Equal(t, int(5), ddl.BackupFillerTypeCount())
}

func TestTikvUsage(t *testing.T) {
files := []*backuppb.File{
{Name: "F1", Size_: 1 * pb},
{Name: "F2", Size_: 2 * pb},
{Name: "F3", Size_: 3 * pb},
{Name: "F4", Size_: 4 * pb},
{Name: "F5", Size_: 5 * pb},
}
replica := uint64(3)
storeCnt := uint64(6)
total := uint64(0)
for _, f := range files {
total += f.GetSize_()
}
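// 15 PB total * 3 replicas / 6 stores = 7.5 PB expected per store.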
ret := task.EstimateTikvUsage(files, replica, storeCnt)
require.Equal(t, total*replica/storeCnt, ret)
}

func TestTiflashUsage(t *testing.T) {
tables := []*metautil.Table{
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 0}},
Files: []*backuppb.File{{Size_: 1 * pb}}},
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 1}},
Files: []*backuppb.File{{Size_: 2 * pb}}},
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 2}},
Files: []*backuppb.File{{Size_: 3 * pb}}},
}

var storeCnt uint64 = 3
ret := task.EstimateTiflashUsage(tables, storeCnt)
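// (2 PB*1 + 3 PB*2) / 3 stores = 8 PB/3 per store; the replica-count-0 table is skipped.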
require.Equal(t, 8*pb/3, ret)
}

func TestCheckTikvSpace(t *testing.T) {
store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "500PB"}}
require.NoError(t, task.CheckStoreSpace(400*pb, &store))
}
>>>>>>> d662428574e (br: disk space check follow up (#54596))
44 changes: 44 additions & 0 deletions br/pkg/utils/backoff.go
@@ -277,3 +277,47 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}
<<<<<<< HEAD
=======

type DiskCheckBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
}

func NewDiskCheckBackoffer() Backoffer {
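// Reuse the resetTS retry budget and wait intervals for the disk space check.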
return &DiskCheckBackoffer{
attempt: resetTSRetryTime,
delayTime: resetTSWaitInterval,
maxDelayTime: resetTSMaxWaitInterval,
}
}

func (bo *DiskCheckBackoffer) NextBackoff(err error) time.Duration {
e := errors.Cause(err)
switch e { // nolint:errorlint
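// nil, context errors and ErrKVDiskFull are not retryable: zeroing attempt
// makes the caller give up immediately. ErrPDInvalidResponse doubles the
// delay and burns one attempt; any other error is capped at 5 attempts left.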
case nil, context.Canceled, context.DeadlineExceeded, berrors.ErrKVDiskFull:
bo.delayTime = 0
bo.attempt = 0
case berrors.ErrPDInvalidResponse:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
bo.delayTime = 2 * bo.delayTime
if bo.attempt > 5 {
bo.attempt = 5
}
bo.attempt--
}

if bo.delayTime > bo.maxDelayTime {
return bo.maxDelayTime
}
return bo.delayTime
}

func (bo *DiskCheckBackoffer) Attempt() int {
return bo.attempt
}
>>>>>>> d662428574e (br: disk space check follow up (#54596))