br: disk space check follow up (#54596) #54996

Open · wants to merge 1 commit into base: release-8.1
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
@@ -97,6 +97,7 @@ var (
ErrKVClusterIDMismatch = errors.Normalize("tikv cluster ID mismatch", errors.RFCCodeText("BR:KV:ErrKVClusterIDMismatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("BR:KV:ErrKVNotLeader"))
ErrKVNotTiKV = errors.Normalize("storage is not tikv", errors.RFCCodeText("BR:KV:ErrNotTiKVStorage"))
ErrKVDiskFull = errors.Normalize("disk is full", errors.RFCCodeText("BR:KV:ErrKVDiskFull"))

// ErrKVEpochNotMatch is the error raised when ingestion failed with "epoch
// not match". This error is retryable.
5 changes: 5 additions & 0 deletions br/pkg/task/BUILD.bazel
@@ -114,6 +114,11 @@ go_test(
"//pkg/statistics/handle/util",
"//pkg/tablecodec",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
186 changes: 186 additions & 0 deletions br/pkg/task/restore.go
@@ -1130,6 +1130,192 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

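// getMaxReplica reads the "max-replicas" value from PD's replication config,
// retrying transient PD errors with the standard PD request backoffer.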
func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) {
var resp map[string]any
err = utils.WithRetry(ctx, func() error {
resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx)
return err
}, utils.NewPDReqBackoffer())
if err != nil {
return 0, errors.Trace(err)
}

key := "max-replicas"
val, ok := resp[key]
if !ok {
return 0, errors.Errorf("key %s not found in response %v", key, resp)
}
fval, ok := val.(float64)
if !ok {
return 0, errors.Errorf("value of key %s is not a number in response %v", key, resp)
}
return uint64(fval), nil
}

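// getStores lists all stores currently registered in PD, retrying transient
// PD errors with the standard PD request backoffer.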
func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err error) {
err = utils.WithRetry(ctx, func() error {
stores, err = mgr.GetPDHTTPClient().GetStores(ctx)
return err
}, utils.NewPDReqBackoffer())
if err != nil {
return nil, errors.Trace(err)
}
return stores, nil
}

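// EstimateTikvUsage estimates the disk space each TiKV store needs for the
// restore: total SST size * replica count / store count, with the replica
// count capped at the store count. For example, 15 PB of files with 3 replicas
// spread over 6 stores comes to 15 * 3 / 6 = 7.5 PB per store.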
func EstimateTikvUsage(files []*backuppb.File, replicaCnt uint64, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
if replicaCnt > storeCnt {
replicaCnt = storeCnt
}
totalSize := uint64(0)
for _, file := range files {
totalSize += file.GetSize_()
}
log.Info("estimate tikv usage", zap.Uint64("total size", totalSize), zap.Uint64("replicaCnt", replicaCnt), zap.Uint64("store count", storeCnt))
return totalSize * replicaCnt / storeCnt
}

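// EstimateTiflashUsage estimates the disk space each TiFlash store needs for
// the restore: every table's SST size is weighted by its TiFlash replica count
// and the sum is averaged over the TiFlash store count. Tables without TiFlash
// replicas are skipped.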
func EstimateTiflashUsage(tables []*metautil.Table, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
tiflashTotal := uint64(0)
for _, table := range tables {
if table.Info.TiFlashReplica == nil || table.Info.TiFlashReplica.Count <= 0 {
continue
}
tableBytes := uint64(0)
for _, file := range table.Files {
tableBytes += file.GetSize_()
}
tiflashTotal += tableBytes * table.Info.TiFlashReplica.Count
}
log.Info("estimate tiflash usage", zap.Uint64("total size", tiflashTotal), zap.Uint64("store count", storeCnt))
return tiflashTotal / storeCnt
}

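// CheckStoreSpace parses the store's human-readable available space (e.g.
// "500PB") and fails with ErrKVDiskFull if it is below the necessary amount.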
func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
available, err := units.RAMInBytes(store.Status.Available)
if err != nil || available <= 0 {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if uint64(available) < necessary {
return errors.Annotatef(berrors.ErrKVDiskFull, "store %d has no space left on device, available %s, necessary %s",
store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary)))
}
return nil
}

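// checkDiskSpace estimates the per-store disk usage of the restore, adds a
// safety margin, and verifies that every TiKV and TiFlash store has enough
// space left, retrying the verification with the disk-check backoffer.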
func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, tables []*metautil.Table) error {
maxReplica, err := getMaxReplica(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
stores, err := getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}

var tikvCnt, tiflashCnt uint64 = 0, 0
for i := range stores.Stores {
store := &stores.Stores[i]
if engine.IsTiFlashHTTPResp(&store.Store) {
tiflashCnt += 1
continue
}
tikvCnt += 1
}

// Scale the estimate by a safety ratio using integer math (only one decimal
// digit of the ratio is kept). Bases above 1000 PB are returned unscaled so
// the multiplication cannot overflow uint64; we won't need to restore that
// much data at one time anyway.
preserve := func(base uint64, ratio float32) uint64 {
if base > 1000*units.PB {
return base
}
return base * uint64(ratio*10) / 10
}
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))

err = utils.WithRetry(ctx, func() error {
stores, err = getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores.Stores {
if engine.IsTiFlashHTTPResp(&store.Store) {
if err := CheckStoreSpace(tiflashUsage, &store); err != nil {
return errors.Trace(err)
}
continue
}
if err := CheckStoreSpace(tikvUsage, &store); err != nil {
return errors.Trace(err)
}
}
return nil
}, utils.NewDiskCheckBackoffer())
if err != nil {
return errors.Trace(err)
}
return nil
}

// Exhaust drains all errors currently buffered in the channel into a slice.
func Exhaust(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh is never closed (it has multiple senders), so we only
// consume the current backlog of the channel and then return.
return out
}
}
}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backuppb.File) int {
result := 0
for _, f := range files {
if strings.HasSuffix(f.GetName(), "_write.sst") {
result++
}
}
return result
}

// MapTableToFiles builds a map from table ID to the backup files that belong to it.
// Note that each backup file contains data for exactly one table.
func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File {
result := map[int64][]*backuppb.File{}
for _, file := range files {
tableID := tablecodec.DecodeTableID(file.GetStartKey())
tableEndID := tablecodec.DecodeTableID(file.GetEndKey())
if tableID != tableEndID {
log.Panic("key range spread between many files.",
zap.String("file name", file.Name),
logutil.Key("startKey", file.StartKey),
logutil.Key("endKey", file.EndKey))
}
if tableID == 0 {
log.Panic("invalid table key of file",
zap.String("file name", file.Name),
logutil.Key("startKey", file.StartKey),
logutil.Key("endKey", file.EndKey))
}
result[tableID] = append(result[tableID], file)
}
return result
}

// dropToBlackhole drops all incoming tables into a black hole,
// i.e. skips the checksum and simply advances the progress.
func dropToBlackhole(
112 changes: 112 additions & 0 deletions br/pkg/task/restore_test.go
@@ -8,7 +8,12 @@
"fmt"
"testing"

"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -25,9 +30,13 @@
"google.golang.org/grpc/keepalive"
)

const pb uint64 = units.PB

func TestRestoreConfigAdjust(t *testing.T) {
cfg := &RestoreConfig{}
cfg.Adjust()

require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency)
require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval)
@@ -272,3 +281,106 @@
Schemas: mockSchemas,
}
}

func TestCheckDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
{
Type: model.ActionReorganizePartition,
},
}

filteredDDlJobs := task.FilterDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule)

expectedDDLTypes := []model.ActionType{
model.ActionSetTiFlashReplica,
model.ActionAddPrimaryKey,
model.ActionUpdateTiFlashReplicaStatus,
model.ActionCreateTable,
model.ActionLockTable,
model.ActionUnlockTable,
model.ActionCreateSchema,
}

require.Equal(t, len(expectedDDLTypes), len(filteredDDlJobs))
expectedDDLJobs := make([]*model.Job, 0, len(expectedDDLTypes))
for i, ddlJob := range filteredDDlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
expectedDDLJobs = append(expectedDDLJobs, ddlJob)
}

require.NoError(t, task.CheckDDLJobByRules(expectedDDLJobs, task.DDLJobLogIncrementalCompactBlockListRule))
require.Error(t, task.CheckDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule))
}

// NOTICE: once a new backfill-type DDL is added, BR needs to ensure it is correctly covered by these rules:
func TestMonitorTheIncrementalUnsupportDDLType(t *testing.T) {
require.Equal(t, int(5), ddl.BackupFillerTypeCount())
}

func TestTikvUsage(t *testing.T) {
files := []*backuppb.File{
{Name: "F1", Size_: 1 * pb},
{Name: "F2", Size_: 2 * pb},
{Name: "F3", Size_: 3 * pb},
{Name: "F4", Size_: 4 * pb},
{Name: "F5", Size_: 5 * pb},
}
replica := uint64(3)
storeCnt := uint64(6)
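// 1+2+3+4+5 = 15 PB of files, 3 replicas, 6 stores: expect 15 PB * 3 / 6 = 7.5 PB per store.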
total := uint64(0)
for _, f := range files {
total += f.GetSize_()
}
ret := task.EstimateTikvUsage(files, replica, storeCnt)
require.Equal(t, total*replica/storeCnt, ret)
}

func TestTiflashUsage(t *testing.T) {
tables := []*metautil.Table{
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 0}},
Files: []*backuppb.File{{Size_: 1 * pb}}},
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 1}},
Files: []*backuppb.File{{Size_: 2 * pb}}},
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 2}},
Files: []*backuppb.File{{Size_: 3 * pb}}},
}

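// The zero-replica table is skipped: (2 PB * 1 + 3 PB * 2) / 3 stores = 8 PB / 3.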
var storeCnt uint64 = 3
ret := task.EstimateTiflashUsage(tables, storeCnt)
require.Equal(t, 8*pb/3, ret)
}

func TestCheckTikvSpace(t *testing.T) {
store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "500PB"}}
require.NoError(t, task.CheckStoreSpace(400*pb, &store))
}
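
// A hypothetical companion test for the failure path (not part of this PR): a
// store advertising less space than the estimate needs should fail the check.
func TestCheckTikvSpaceFull(t *testing.T) {
store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "100PB"}}
require.Error(t, task.CheckStoreSpace(400*pb, &store))
}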
44 changes: 44 additions & 0 deletions br/pkg/utils/backoff.go
@@ -277,3 +277,47 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}

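// DiskCheckBackoffer backs off the disk space verification loop in
// checkDiskSpace; it reuses the resetTS retry count and wait intervals
// as its defaults.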
type DiskCheckBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
}

func NewDiskCheckBackoffer() Backoffer {
return &DiskCheckBackoffer{
attempt: resetTSRetryTime,
delayTime: resetTSWaitInterval,
maxDelayTime: resetTSMaxWaitInterval,
}
}

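// NextBackoff gives up immediately (zero delay, attempts reset to 0) when there
// is nothing left to retry: success, a cancelled or expired context, or a
// definitive disk-full error. An invalid PD response doubles the delay; any
// other error doubles the delay and caps the remaining attempts at 5.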
func (bo *DiskCheckBackoffer) NextBackoff(err error) time.Duration {
e := errors.Cause(err)
switch e { // nolint:errorlint
case nil, context.Canceled, context.DeadlineExceeded, berrors.ErrKVDiskFull:
bo.delayTime = 0
bo.attempt = 0
case berrors.ErrPDInvalidResponse:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
bo.delayTime = 2 * bo.delayTime
if bo.attempt > 5 {
bo.attempt = 5
}
bo.attempt--
}

if bo.delayTime > bo.maxDelayTime {
return bo.maxDelayTime
}
return bo.delayTime
}

func (bo *DiskCheckBackoffer) Attempt() int {
return bo.attempt
}