br: disk space check follow up #54596

Merged · 8 commits · Jul 23, 2024
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
@@ -97,6 +97,7 @@ var (
ErrKVClusterIDMismatch = errors.Normalize("tikv cluster ID mismatch", errors.RFCCodeText("BR:KV:ErrKVClusterIDMismatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("BR:KV:ErrKVNotLeader"))
ErrKVNotTiKV = errors.Normalize("storage is not tikv", errors.RFCCodeText("BR:KV:ErrNotTiKVStorage"))
+ErrKVDiskFull = errors.Normalize("disk is full", errors.RFCCodeText("BR:KV:ErrKVDiskFull"))

// ErrKVEpochNotMatch is the error raised when ingestion failed with "epoch
// not match". This error is retryable.
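A note on the new sentinel: pingcap/errors keeps the normalized error as the cause even after it is wrapped with Annotatef, which is what lets the backoffer below match on the value instead of on message text. A minimal sketch of that property (not part of this PR; the standalone program is illustrative):

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
)

func main() {
	// Annotatef decorates the message but preserves the sentinel as the
	// cause, so equality checks keep working after wrapping.
	err := errors.Annotatef(berrors.ErrKVDiskFull,
		"store %d has no space left on device", 1)
	fmt.Println(errors.Cause(err) == berrors.ErrKVDiskFull) // true
}
```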
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
@@ -135,6 +135,7 @@ go_test(
"//pkg/testkit",
"//pkg/types",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
32 changes: 18 additions & 14 deletions br/pkg/task/restore.go
@@ -102,7 +102,6 @@ const (
defaultFlagDdlBatchSize = 128
resetSpeedLimitRetryTimes = 3
maxRestoreBatchSizeLimit = 10240
-pb = 1024 * 1024 * 1024 * 1024 * 1024
)

const (
@@ -1240,37 +1239,41 @@ func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err
return stores, nil
}

-func EstimateTikvUsage(files []*backuppb.File, maxReplica uint64, storeCnt int) uint64 {
+func EstimateTikvUsage(files []*backuppb.File, replicaCnt uint64, storeCnt uint64) uint64 {
[Review comment · Contributor] nit, a more common name could be replicationFactor or rf

if storeCnt == 0 {
return 0
}
-var totalSize uint64 = 0
+if replicaCnt > storeCnt {
+replicaCnt = storeCnt
[Review comment · Contributor] @3pointer just to double check, do we have a topology constraint that makes sure no more than 1 replica from the same region is placed on the same store?

[Review comment · Contributor] Yes, our Raft implementation enforces this constraint.
+}
+totalSize := uint64(0)
for _, file := range files {
totalSize += file.GetSize_()
}
-return totalSize * maxReplica / uint64(storeCnt)
+log.Info("estimate tikv usage", zap.Uint64("total size", totalSize), zap.Uint64("replicaCnt", replicaCnt), zap.Uint64("store count", storeCnt))
+return totalSize * replicaCnt / storeCnt
}
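To make the capping concrete, a hypothetical test (not in this PR, reusing the test file's imports) for the new behaviour:

```go
func TestTikvUsageCapsReplicas(t *testing.T) {
	// Hypothetical case: 3 replicas but only 2 TiKV stores. replicaCnt is
	// capped to storeCnt (a store never holds two replicas of the same
	// region), so the estimate is 6PB * 2 / 2 = 6PB, not 6PB * 3 / 2.
	files := []*backuppb.File{{Size_: 6 * units.PB}}
	require.Equal(t, uint64(6*units.PB), task.EstimateTikvUsage(files, 3, 2))
}
```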

-func EstimateTiflashUsage(tables []*metautil.Table, storeCnt int) uint64 {
+func EstimateTiflashUsage(tables []*metautil.Table, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
-var tiflashTotal uint64 = 0
+tiflashTotal := uint64(0)
for _, table := range tables {
-if table.TiFlashReplicas <= 0 {
+if table.Info.TiFlashReplica == nil || table.Info.TiFlashReplica.Count <= 0 {
continue
}
tableBytes := uint64(0)
for _, file := range table.Files {
tableBytes += file.GetSize_()
}
-tiflashTotal += tableBytes * uint64(table.TiFlashReplicas)
+tiflashTotal += tableBytes * table.Info.TiFlashReplica.Count
}
-return tiflashTotal / uint64(storeCnt)
+log.Info("estimate tiflash usage", zap.Uint64("total size", tiflashTotal), zap.Uint64("store count", storeCnt))
+return tiflashTotal / storeCnt
}

func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
// Be careful editing the message, it is used in DiskCheckBackoffer
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
@@ -1279,7 +1282,7 @@ func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if uint64(available) < necessary {
-return errors.Errorf("store %d has no space left on device, available %s, necessary %s",
+return errors.Annotatef(berrors.ErrKVDiskFull, "store %d has no space left on device, available %s, necessary %s",
store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary)))
}
return nil
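For context on the parsing above: PD reports Status.Available as a human-readable string, and go-units' RAMInBytes reads such strings using binary (1024-based) multiples, with BytesSize formatting the other direction. A small sketch of the assumed library behaviour:

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	// RAMInBytes parses human-readable sizes with 1024-based multiples,
	// matching strings such as "2GiB" from PD.
	avail, err := units.RAMInBytes("2GiB")
	fmt.Println(avail, err)                      // 2147483648 <nil>
	fmt.Println(units.BytesSize(float64(avail))) // 2GiB
}
```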
@@ -1295,7 +1298,7 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File,
return errors.Trace(err)
}

-tikvCnt, tiflashCnt := 0, 0
+var tikvCnt, tiflashCnt uint64 = 0, 0
for i := range stores.Stores {
store := &stores.Stores[i]
if engine.IsTiFlashHTTPResp(&store.Store) {
@@ -1307,13 +1310,14 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File,

// We won't need to restore more than 1800 PB data at one time, right?
preserve := func(base uint64, ratio float32) uint64 {
-if base > 1000*pb {
+if base > 1000*units.PB {
return base
}
return base * uint64(ratio*10) / 10
}
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
-tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.1)
+tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
+log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))

err = utils.WithRetry(ctx, func() error {
stores, err = getStores(ctx, mgr)
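For clarity on the headroom math: preserve applies the ratio as integer arithmetic in tenths, and the 1000*units.PB pass-through doubles as an overflow guard, since 1000 PB × 14 ≈ 1.4e19 stays below the uint64 ceiling of ~1.8e19. A standalone sketch of the closure as it reads after this change (the wrapper program is illustrative):

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	// Mirrors the diff's closure: apply a float ratio via integer math in
	// tenths; implausibly large estimates (> 1000 PB) pass through, which
	// also keeps the multiplication below within uint64 range.
	preserve := func(base uint64, ratio float32) uint64 {
		if base > 1000*units.PB {
			return base
		}
		return base * uint64(ratio*10) / 10
	}
	fmt.Println(preserve(100*units.GB, 1.1)) // 110000000000: TiKV headroom
	fmt.Println(preserve(100*units.GB, 1.4)) // 140000000000: TiFlash headroom
}
```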
18 changes: 11 additions & 7 deletions br/pkg/task/restore_test.go
@@ -9,6 +9,7 @@ import (
"strconv"
"testing"

"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
@@ -32,7 +33,7 @@ import (
pdhttp "github.com/tikv/pd/client/http"
)

-const pb uint64 = 1024 * 1024 * 1024 * 1024 * 1024
+const pb uint64 = units.PB
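(Strictly speaking, go-units defines PB as the decimal petabyte, 1000^5 bytes, so this constant shrinks slightly from the old binary 1024^5 value; the tests below only assert ratios built from pb, so either scale works.)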

func TestPreCheckTableTiFlashReplicas(t *testing.T) {
mockStores := []*metapb.Store{
@@ -492,23 +493,26 @@ func TestTikvUsage(t *testing.T) {
{Name: "F5", Size_: 5 * pb},
}
replica := uint64(3)
-storeCnt := 6
+storeCnt := uint64(6)
total := uint64(0)
for _, f := range files {
total += f.GetSize_()
}
ret := task.EstimateTikvUsage(files, replica, storeCnt)
-require.Equal(t, total*replica/uint64(storeCnt), ret)
+require.Equal(t, total*replica/storeCnt, ret)
}

func TestTiflashUsage(t *testing.T) {
tables := []*metautil.Table{
-{TiFlashReplicas: 0, Files: []*backuppb.File{{Size_: 1 * pb}}},
-{TiFlashReplicas: 1, Files: []*backuppb.File{{Size_: 2 * pb}}},
-{TiFlashReplicas: 2, Files: []*backuppb.File{{Size_: 3 * pb}}},
+{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 0}},
+Files: []*backuppb.File{{Size_: 1 * pb}}},
+{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 1}},
+Files: []*backuppb.File{{Size_: 2 * pb}}},
+{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 2}},
+Files: []*backuppb.File{{Size_: 3 * pb}}},
}

-storeCnt := 3
+var storeCnt uint64 = 3
ret := task.EstimateTiflashUsage(tables, storeCnt)
require.Equal(t, 8*pb/3, ret)
}
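(The expected value follows from size × replica count per table: 1 pb·0 + 2 pb·1 + 3 pb·2 = 8 pb, divided across 3 stores, hence 8*pb/3.)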
15 changes: 5 additions & 10 deletions br/pkg/utils/backoff.go
@@ -298,23 +298,18 @@ func NewDiskCheckBackoffer() Backoffer {
func (bo *DiskCheckBackoffer) NextBackoff(err error) time.Duration {
e := errors.Cause(err)
switch e { // nolint:errorlint
-case nil, context.Canceled, context.DeadlineExceeded:
+case nil, context.Canceled, context.DeadlineExceeded, berrors.ErrKVDiskFull:
bo.delayTime = 0
bo.attempt = 0
case berrors.ErrPDInvalidResponse:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
-if strings.Contains(e.Error(), "no space left on device") {
-bo.delayTime = 0
-bo.attempt = 0
-} else {
-bo.delayTime = 2 * bo.delayTime
-if bo.attempt > 5 {
-bo.attempt = 5
-}
-bo.attempt--
+bo.delayTime = 2 * bo.delayTime
+if bo.attempt > 5 {
+bo.attempt = 5
+}
+bo.attempt--
}

if bo.delayTime > bo.maxDelayTime {
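The net effect of this change, sketched below under the assumption that the Backoffer returned by NewDiskCheckBackoffer exposes Attempt(): a disk-full error is now recognized by its cause rather than by substring match, and it zeroes both the delay and the remaining attempts, so utils.WithRetry gives up immediately instead of waiting for space that will not appear on its own.

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/utils"
)

func main() {
	bo := utils.NewDiskCheckBackoffer()
	// The annotation added in CheckStoreSpace does not hide the cause.
	err := errors.Annotatef(berrors.ErrKVDiskFull,
		"store 1 has no space left on device")
	fmt.Println(bo.NextBackoff(err), bo.Attempt()) // 0s 0
}
```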
5 changes: 5 additions & 0 deletions errors.toml
@@ -111,6 +111,11 @@ error = '''
tikv cluster ID mismatch
'''

["BR:KV:ErrKVDiskFull"]
error = '''
disk is full
'''

["BR:KV:ErrKVDownloadFailed"]
error = '''
download sst failed
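This entry mirrors the errors.Normalize registration with RFCCodeText("BR:KV:ErrKVDiskFull") in br/pkg/errors/errors.go above; errors.toml serves as the repository's registry of error codes and is kept in sync with those definitions.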