Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

[cherry-pick] br: less split during restoring(tidb#27240) #1432

Merged
merged 2 commits into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -362,18 +361,14 @@ func (importer *FileImporter) downloadSST(
) (*import_sstpb.SSTMeta, error) {
uid := uuid.New()
id := uid[:]
// Assume one region reflects to one rewrite rule
_, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey())
if err != nil {
return nil, errors.Trace(err)
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
// Get the rewrite rule for the file.
fileRule := findMatchedRewriteRule(file, rewriteRules)
if fileRule == nil {
return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound)
}
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()),
OldKeyPrefix: encodeKeyPrefix(fileRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(fileRule.GetNewKeyPrefix()),
}
sstMeta := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule)

Expand All @@ -390,6 +385,7 @@ func (importer *FileImporter) downloadSST(
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
var err error
resp, err = importer.importClient.DownloadSST(ctx, peer.GetStoreId(), req)
if err != nil {
return nil, errors.Trace(err)
Expand Down
58 changes: 29 additions & 29 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
Expand Down Expand Up @@ -78,24 +78,8 @@ func (rs *RegionSplitter) Split(
if errSplit != nil {
return errors.Trace(errSplit)
}
minKey := codec.EncodeBytes([]byte{}, sortedRanges[0].StartKey)
maxKey := codec.EncodeBytes([]byte{}, sortedRanges[len(sortedRanges)-1].EndKey)
for _, rule := range rewriteRules.Table {
if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 {
minKey = rule.GetNewKeyPrefix()
}
if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 {
maxKey = rule.GetNewKeyPrefix()
}
}
for _, rule := range rewriteRules.Data {
if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 {
minKey = rule.GetNewKeyPrefix()
}
if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 {
maxKey = rule.GetNewKeyPrefix()
}
}
minKey := codec.EncodeBytes(sortedRanges[0].StartKey)
maxKey := codec.EncodeBytes(sortedRanges[len(sortedRanges)-1].EndKey)
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
SplitRegions:
Expand Down Expand Up @@ -127,7 +111,7 @@ SplitRegions:
log.Error("split regions no valid key",
logutil.Key("startKey", region.Region.StartKey),
logutil.Key("endKey", region.Region.EndKey),
logutil.Key("key", codec.EncodeBytes([]byte{}, key)),
logutil.Key("key", codec.EncodeBytes(key)),
rtree.ZapRanges(ranges))
}
return errors.Trace(errSplit)
Expand Down Expand Up @@ -258,31 +242,47 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
if len(keys) == 0 {
return []*RegionInfo{regionInfo}, nil
}

newRegions, err := rs.client.BatchSplitRegions(ctx, regionInfo, keys)
if err != nil {
return nil, errors.Trace(err)
}
// There would be some regions be scattered twice, e.g.:
// |--1-|--2-+----|-3--|
// | +(t1)|
// +(t1_r4) |
// +(t2_r42)
// When spliting at `t1_r4`, we would scatter region 1, 2.
// When spliting at `t2_r42`, we would scatter region 2, 3.
// Because we don't split at t1 anymore.
// The trick here is a pinky promise: never scatter regions you haven't imported any data.
// In this scenario, it is the last region after spliting (applying to >= 5.0).
if bytes.Equal(newRegions[len(newRegions)-1].Region.StartKey, keys[len(keys)-1]) {
newRegions = newRegions[:len(newRegions)-1]
}
rs.ScatterRegions(ctx, newRegions)
return newRegions, nil
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
if err = rs.client.ScatterRegion(ctx, region); err != nil {
if err := rs.client.ScatterRegion(ctx, region); err != nil {
log.Warn("scatter region failed", logutil.Region(region.Region), zap.Error(err))
}
}
return newRegions, nil
}

// GetSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of
// the ranges, groups the split keys by region id.
func GetSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
checkKeys := make([][]byte, 0)
for _, rule := range rewriteRules.Table {
checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
}
for _, rule := range rewriteRules.Data {
checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
}
for _, rg := range ranges {
checkKeys = append(checkKeys, truncateRowKey(rg.EndKey))
}
Expand All @@ -308,7 +308,7 @@ func NeedSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
if len(splitKey) == 0 {
return nil
}
splitKey = codec.EncodeBytes([]byte{}, splitKey)
splitKey = codec.EncodeBytes(splitKey)
for _, region := range regions {
// If splitKey is the boundary of the region
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
// rewrite rules: aa -> xx, cc -> bb
// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
func (s *testRestoreUtilSuite) TestSplit(c *C) {
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func (s *testRangeSuite) TestSplit(c *C) {
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
Expand Down Expand Up @@ -300,11 +300,11 @@ func initRewriteRules() *restore.RewriteRules {
}

// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func validateRegions(regions map[uint64]*restore.RegionInfo) bool {
keys := [12]string{"", "aay", "bb", "bba", "bbf", "bbh", "bbj", "cca", "xx", "xxe", "xxz", ""}
if len(regions) != 11 {
keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""}
if len(regions) != len(keys)-1 {
return false
}
FindRegion:
Expand Down
44 changes: 17 additions & 27 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
)
return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule")
}
// the new prefix of the start rule must equal or less than the new prefix of the end rule
if bytes.Compare(startRule.GetNewKeyPrefix(), endRule.GetNewKeyPrefix()) > 0 {
// the rewrite rule of the start key and the end key should be equaled.
// i.e. there should only one rewrite rule for one file, a file should only be imported into one region.
if !bytes.Equal(startRule.GetNewKeyPrefix(), endRule.GetNewKeyPrefix()) {
startTableID := tablecodec.DecodeTableID(file.GetStartKey())
endTableID := tablecodec.DecodeTableID(file.GetEndKey())
log.Error(
Expand All @@ -307,17 +308,10 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
zap.Stringer("endRule", endRule),
logutil.File(file),
)
return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "unexpected rewrite rules")
}

startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
if startID != endID {
log.Error("table ids mismatch",
zap.Int64("startID", startID),
zap.Int64("endID", endID),
logutil.File(file))
return errors.Annotate(berrors.ErrRestoreTableIDMismatch, "file start_key end_key table ids mismatch")
return errors.Annotatef(berrors.ErrRestoreInvalidRewrite,
"rewrite rule mismatch, the backup data may be dirty or from incompatible versions of BR, startKey rule: %X => %X, endKey rule: %X => %X",
startRule.OldKeyPrefix, startRule.NewKeyPrefix, endRule.OldKeyPrefix, endRule.NewKeyPrefix,
)
}
return nil
}
Expand Down Expand Up @@ -349,20 +343,6 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
return nil
}

func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule {
for _, rule := range rewriteRules.Data {
if bytes.HasPrefix(key, rule.GetNewKeyPrefix()) {
return rule
}
}
for _, rule := range rewriteRules.Table {
if bytes.HasPrefix(key, rule.GetNewKeyPrefix()) {
return rule
}
}
return nil
}

func truncateTS(key []byte) []byte {
if len(key) == 0 {
return nil
Expand Down Expand Up @@ -394,6 +374,16 @@ func SplitRanges(
})
}

func findMatchedRewriteRule(file *backuppb.File, rules *RewriteRules) *import_sstpb.RewriteRule {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
if startID != endID {
return nil
}
_, rule := rewriteRawKey(file.StartKey, rules)
return rule
}

func rewriteFileKeys(file *backuppb.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
Expand Down
5 changes: 3 additions & 2 deletions pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) {
},
rules,
)
c.Assert(err, ErrorMatches, ".*restore table ID mismatch")
c.Assert(err, ErrorMatches, ".*rewrite rule mismatch.*")

// Add a bad rule for end key, after rewrite start key > end key.
rules.Table = append(rules.Table[:1], &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(2),
Expand All @@ -162,7 +163,7 @@ func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) {
},
rules,
)
c.Assert(err, ErrorMatches, ".*unexpected rewrite rules.*")
c.Assert(err, ErrorMatches, ".*rewrite rule mismatch.*")
}

func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) {
Expand Down