Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
[cherry-pick] br: less split during restoring(tidb#27240) (#1432)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Sep 6, 2021
1 parent 5db3972 commit 0391331
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 75 deletions.
16 changes: 6 additions & 10 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -362,18 +361,14 @@ func (importer *FileImporter) downloadSST(
) (*import_sstpb.SSTMeta, error) {
uid := uuid.New()
id := uid[:]
// Assume one region reflects to one rewrite rule
_, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey())
if err != nil {
return nil, errors.Trace(err)
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
// Get the rewrite rule for the file.
fileRule := findMatchedRewriteRule(file, rewriteRules)
if fileRule == nil {
return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound)
}
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()),
OldKeyPrefix: encodeKeyPrefix(fileRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(fileRule.GetNewKeyPrefix()),
}
sstMeta := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule)

Expand All @@ -390,6 +385,7 @@ func (importer *FileImporter) downloadSST(
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
var err error
resp, err = importer.importClient.DownloadSST(ctx, peer.GetStoreId(), req)
if err != nil {
return nil, errors.Trace(err)
Expand Down
58 changes: 29 additions & 29 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
Expand Down Expand Up @@ -78,24 +78,8 @@ func (rs *RegionSplitter) Split(
if errSplit != nil {
return errors.Trace(errSplit)
}
minKey := codec.EncodeBytes([]byte{}, sortedRanges[0].StartKey)
maxKey := codec.EncodeBytes([]byte{}, sortedRanges[len(sortedRanges)-1].EndKey)
for _, rule := range rewriteRules.Table {
if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 {
minKey = rule.GetNewKeyPrefix()
}
if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 {
maxKey = rule.GetNewKeyPrefix()
}
}
for _, rule := range rewriteRules.Data {
if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 {
minKey = rule.GetNewKeyPrefix()
}
if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 {
maxKey = rule.GetNewKeyPrefix()
}
}
minKey := codec.EncodeBytes(sortedRanges[0].StartKey)
maxKey := codec.EncodeBytes(sortedRanges[len(sortedRanges)-1].EndKey)
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
SplitRegions:
Expand Down Expand Up @@ -127,7 +111,7 @@ SplitRegions:
log.Error("split regions no valid key",
logutil.Key("startKey", region.Region.StartKey),
logutil.Key("endKey", region.Region.EndKey),
logutil.Key("key", codec.EncodeBytes([]byte{}, key)),
logutil.Key("key", codec.EncodeBytes(key)),
rtree.ZapRanges(ranges))
}
return errors.Trace(errSplit)
Expand Down Expand Up @@ -258,31 +242,47 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
// splitAndScatterRegions splits regionInfo at the given keys and then hands
// the resulting regions to rs.ScatterRegions.
//
// When keys is empty no split is requested and a slice holding only
// regionInfo is returned. An error from BatchSplitRegions is returned traced,
// with a nil slice. The returned slice is the set of regions that were passed
// to ScatterRegions, i.e. the split result minus the trailing region when that
// region starts exactly at the last split key.
func (rs *RegionSplitter) splitAndScatterRegions(
	ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
	if len(keys) == 0 {
		return []*RegionInfo{regionInfo}, nil
	}

	newRegions, err := rs.client.BatchSplitRegions(ctx, regionInfo, keys)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Some regions could otherwise be scattered twice, e.g.:
	// |--1-|--2-+----|-3--|
	// | +(t1)|
	// +(t1_r4) |
	// +(t2_r42)
	// When splitting at `t1_r4`, we would scatter region 1, 2.
	// When splitting at `t2_r42`, we would scatter region 2, 3.
	// Because we don't split at t1 anymore.
	// The trick here is a pinky promise: never scatter regions you haven't imported any data.
	// In this scenario, it is the last region after splitting (applying to >= 5.0).
	//
	// The length guard keeps an empty split result from panicking on the
	// index expression below.
	if n := len(newRegions); n > 0 && bytes.Equal(newRegions[n-1].Region.StartKey, keys[len(keys)-1]) {
		newRegions = newRegions[:n-1]
	}
	rs.ScatterRegions(ctx, newRegions)
	return newRegions, nil
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
if err = rs.client.ScatterRegion(ctx, region); err != nil {
if err := rs.client.ScatterRegion(ctx, region); err != nil {
log.Warn("scatter region failed", logutil.Region(region.Region), zap.Error(err))
}
}
return newRegions, nil
}

// GetSplitKeys checks whether the regions should be split by the new prefixes of the rewrite rules and the end keys of
// the ranges, and groups the split keys by region id.
func GetSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
checkKeys := make([][]byte, 0)
for _, rule := range rewriteRules.Table {
checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
}
for _, rule := range rewriteRules.Data {
checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
}
for _, rg := range ranges {
checkKeys = append(checkKeys, truncateRowKey(rg.EndKey))
}
Expand All @@ -308,7 +308,7 @@ func NeedSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
if len(splitKey) == 0 {
return nil
}
splitKey = codec.EncodeBytes([]byte{}, splitKey)
splitKey = codec.EncodeBytes(splitKey)
for _, region := range regions {
// If splitKey is the boundary of the region
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
// rewrite rules: aa -> xx, cc -> bb
// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
func (s *testRestoreUtilSuite) TestSplit(c *C) {
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func (s *testRangeSuite) TestSplit(c *C) {
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
Expand Down Expand Up @@ -300,11 +300,11 @@ func initRewriteRules() *restore.RewriteRules {
}

// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func validateRegions(regions map[uint64]*restore.RegionInfo) bool {
keys := [12]string{"", "aay", "bb", "bba", "bbf", "bbh", "bbj", "cca", "xx", "xxe", "xxz", ""}
if len(regions) != 11 {
keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""}
if len(regions) != len(keys)-1 {
return false
}
FindRegion:
Expand Down
44 changes: 17 additions & 27 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
)
return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule")
}
// the new prefix of the start rule must equal or less than the new prefix of the end rule
if bytes.Compare(startRule.GetNewKeyPrefix(), endRule.GetNewKeyPrefix()) > 0 {
// the rewrite rules of the start key and the end key should be equal.
// i.e. there should be only one rewrite rule for one file; a file should only be imported into one region.
if !bytes.Equal(startRule.GetNewKeyPrefix(), endRule.GetNewKeyPrefix()) {
startTableID := tablecodec.DecodeTableID(file.GetStartKey())
endTableID := tablecodec.DecodeTableID(file.GetEndKey())
log.Error(
Expand All @@ -307,17 +308,10 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
zap.Stringer("endRule", endRule),
logutil.File(file),
)
return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "unexpected rewrite rules")
}

startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
if startID != endID {
log.Error("table ids mismatch",
zap.Int64("startID", startID),
zap.Int64("endID", endID),
logutil.File(file))
return errors.Annotate(berrors.ErrRestoreTableIDMismatch, "file start_key end_key table ids mismatch")
return errors.Annotatef(berrors.ErrRestoreInvalidRewrite,
"rewrite rule mismatch, the backup data may be dirty or from incompatible versions of BR, startKey rule: %X => %X, endKey rule: %X => %X",
startRule.OldKeyPrefix, startRule.NewKeyPrefix, endRule.OldKeyPrefix, endRule.NewKeyPrefix,
)
}
return nil
}
Expand Down Expand Up @@ -349,20 +343,6 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
return nil
}

// matchNewPrefix returns the first rewrite rule whose new key prefix is a
// prefix of key. Data rules are consulted before Table rules; nil is returned
// when no rule matches.
func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule {
	// Keep the lookup order of the two rule sets: Data first, then Table.
	ruleSets := [][]*import_sstpb.RewriteRule{rewriteRules.Data, rewriteRules.Table}
	for _, candidates := range ruleSets {
		for _, candidate := range candidates {
			if !bytes.HasPrefix(key, candidate.GetNewKeyPrefix()) {
				continue
			}
			return candidate
		}
	}
	return nil
}

func truncateTS(key []byte) []byte {
if len(key) == 0 {
return nil
Expand Down Expand Up @@ -394,6 +374,16 @@ func SplitRanges(
})
}

// findMatchedRewriteRule returns the rewrite rule that applies to file.
//
// It returns nil when the table IDs decoded from the file's start key and end
// key differ. Otherwise it returns the rule that rewriteRawKey reports for the
// file's start key.
func findMatchedRewriteRule(file *backuppb.File, rules *RewriteRules) *import_sstpb.RewriteRule {
	startKey := file.GetStartKey()
	startID := tablecodec.DecodeTableID(startKey)
	endID := tablecodec.DecodeTableID(file.GetEndKey())
	if startID != endID {
		return nil
	}
	// Use the key obtained through the getter here as well, so the file is
	// accessed the same way throughout the function.
	_, rule := rewriteRawKey(startKey, rules)
	return rule
}

func rewriteFileKeys(file *backuppb.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
Expand Down
5 changes: 3 additions & 2 deletions pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) {
},
rules,
)
c.Assert(err, ErrorMatches, ".*restore table ID mismatch")
c.Assert(err, ErrorMatches, ".*rewrite rule mismatch.*")

// Add a bad rule for end key, after rewrite start key > end key.
rules.Table = append(rules.Table[:1], &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(2),
Expand All @@ -162,7 +163,7 @@ func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) {
},
rules,
)
c.Assert(err, ErrorMatches, ".*unexpected rewrite rules.*")
c.Assert(err, ErrorMatches, ".*rewrite rule mismatch.*")
}

func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) {
Expand Down

0 comments on commit 0391331

Please sign in to comment.