From 0a43d2a28aa1a2fd0a4504192981a2a7c58652f0 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 22 Jan 2020 14:28:16 +0800 Subject: [PATCH] restore: merge tidb-tools/pkg/restore-util (#146) * restore-util: Implement split/scatter (#274) * implement split/scatter Signed-off-by: 5kbpers * init test Signed-off-by: 5kbpers * redesign output/input of the lib Signed-off-by: 5kbpers * update dependency Signed-off-by: 5kbpers * add commments and more tests Signed-off-by: 5kbpers * add ScanRegions interface to Client Signed-off-by: 5kbpers * fix potential data race Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * Apply suggestions from code review Co-Authored-By: kennytm * Update pkg/restore-util/client.go Co-Authored-By: kennytm * address comments Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * update dependency Signed-off-by: 5kbpers * resolve conflicts Signed-off-by: 5kbpers * fix prefix rewrite Signed-off-by: 5kbpers * add RewriteRule/skip failed scatter region/retry the SplitRegion Signed-off-by: 5kbpers * fix test Signed-off-by: 5kbpers * check if region has peer Signed-off-by: 5kbpers * more logs Signed-off-by: 5kbpers * restore-util: add split retry interval (#277) * reset dependencies to release-3.1 * add split retry interval Signed-off-by: 5kbpers * fix go.sum Signed-off-by: 5kbpers * restore-util: wait for scatter region sequentially (#279) * wait for scatter region sequentially Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * restore-util: add on split hook (#281) * restore-util: add on split hook Signed-off-by: Neil Shen * Nil check onSplit Co-Authored-By: kennytm * restore-util: fix returned new region is nil (#283) * restore-util: fix returned new region is nil Signed-off-by: 5kbpers * more logs Signed-off-by: 5kbpers * *: gofmt Signed-off-by: 5kbpers * Apply suggestions from code review Co-Authored-By: kennytm * fix log Signed-off-by: 5kbpers * restore-util: call onSplit on splitByRewriteRules (#285) Signed-off-by: Neil Shen * restore-util: fix overlapped error message (#293) * restore-util: fix overlapped error message Signed-off-by: 5kbpers * fix log message Signed-off-by: 5kbpers * reduce error trace Signed-off-by: 5kbpers * fix test Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * restore-util: log warning when cannot find matched rewrite rule (#299) * restore-util: add method to set placement rules and store labels (#301) * restore-util: add method to set placement rules and store labels Signed-off-by: disksing * minor fix Signed-off-by: disksing * address comment Signed-off-by: disksing * add GetPlacementRules Signed-off-by: disksing * fix test Signed-off-by: disksing * restore-util: support batch split (#300) * restore-util: support batch split Signed-off-by: 5kbpers * go fmt Signed-off-by: 5kbpers * Apply suggestions from code review Co-Authored-By: kennytm * address commits Signed-off-by: 5kbpers * Update pkg/restore-util/split.go Co-Authored-By: kennytm * add onSplit callback Signed-off-by: 5kbpers * fix test Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * restore-util: add upper bound time for waiting for scatter (#305) * restore: fix scatter regions failed Signed-off-by: 5kbpers * add log Signed-off-by: 5kbpers * stop waiting for scatter after 3min Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * restore-util: fix wrong url (#306) 
Signed-off-by: disksing * restore-util: add warning about unmatched table id (#313) * restore-util: support table partition Signed-off-by: 5kbpers * fix log Signed-off-by: 5kbpers * warn table id does not match Signed-off-by: 5kbpers * add unit tests Signed-off-by: 5kbpers * Apply suggestions from code review Co-Authored-By: Neil Shen * fix compile error Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * address comments Signed-off-by: 5kbpers * fix test Signed-off-by: 5kbpers Co-authored-by: Ian Co-authored-by: Neil Shen * *: prune tidb-tools Signed-off-by: Neil Shen * restore: address linters suggestions Signed-off-by: Neil Shen * restore: merge restoreutil into restore Signed-off-by: Neil Shen * address comment Signed-off-by: Neil Shen Co-authored-by: 5kbpers <20279863+5kbpers@users.noreply.github.com> Co-authored-by: kennytm Co-authored-by: disksing Co-authored-by: Ian --- .golangci.yml | 7 +- cmd/validate.go | 9 +- go.mod | 1 - go.sum | 3 +- pkg/restore/client.go | 13 +- pkg/restore/import.go | 33 ++-- pkg/restore/range.go | 148 +++++++++++++++ pkg/restore/range_test.go | 75 ++++++++ pkg/restore/split.go | 305 +++++++++++++++++++++++++++++++ pkg/restore/split_client.go | 353 ++++++++++++++++++++++++++++++++++++ pkg/restore/split_test.go | 301 ++++++++++++++++++++++++++++++ pkg/restore/util.go | 29 ++- pkg/restore/util_test.go | 3 +- 13 files changed, 1228 insertions(+), 52 deletions(-) create mode 100644 pkg/restore/range.go create mode 100644 pkg/restore/range_test.go create mode 100644 pkg/restore/split.go create mode 100644 pkg/restore/split_client.go create mode 100644 pkg/restore/split_test.go diff --git a/.golangci.yml b/.golangci.yml index 969cac759..1b025678e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -9,7 +9,8 @@ issues: text: "Potential HTTP request made with variable url" linters: - gosec - - path: .go - text: "Use of weak random number generator" + # TODO Remove it. + - path: split_client.go + text: "SA1019:" linters: - - gosec + - staticcheck diff --git a/cmd/validate.go b/cmd/validate.go index dd1e11fb0..8ba72b372 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -15,7 +15,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/pd/pkg/mock/mockid" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/spf13/cobra" "go.uber.org/zap" @@ -187,15 +186,15 @@ func newBackupMetaCommand() *cobra.Command { tables = append(tables, db.Tables...) 
} // Check if the ranges of files overlapped - rangeTree := restore_util.NewRangeTree() + rangeTree := restore.NewRangeTree() for _, file := range files { - if out := rangeTree.InsertRange(restore_util.Range{ + if out := rangeTree.InsertRange(restore.Range{ StartKey: file.GetStartKey(), EndKey: file.GetEndKey(), }); out != nil { log.Error( "file ranges overlapped", - zap.Stringer("out", out.(*restore_util.Range)), + zap.Stringer("out", out.(*restore.Range)), zap.Stringer("file", file), ) } @@ -206,7 +205,7 @@ func newBackupMetaCommand() *cobra.Command { for offset := uint64(0); offset < tableIDOffset; offset++ { _, _ = tableIDAllocator.Alloc() // Ignore error } - rewriteRules := &restore_util.RewriteRules{ + rewriteRules := &restore.RewriteRules{ Table: make([]*import_sstpb.RewriteRule, 0), Data: make([]*import_sstpb.RewriteRule, 0), } diff --git a/go.mod b/go.mod index 8e50bbf35..9951c2922 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01 github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5 github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 - github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 github.com/prometheus/client_golang v1.0.0 github.com/sirupsen/logrus v1.4.2 diff --git a/go.sum b/go.sum index 696ccee81..085e00355 100644 --- a/go.sum +++ b/go.sum @@ -283,9 +283,8 @@ github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3 h1:HCNif3lukL83gNC github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3/go.mod h1:Futrrmuw98pEsbEmoPsjw8aKLCmixwHEmT2rF+AsXGw= github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 h1:eNf7bDY39moIzzcs5+PhLLW0BM2D2yrzFbjW/X42y0s= github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834/go.mod h1:VWx47QOXISBHHtZeWrDQlBOdbvth9TE9gei6QpoqJ4g= +github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic= github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible h1:GxWxXVqA2aAZIgS+bEpasJkkspu9Jom1/oB2NmP7t/o= -github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 h1:cTSaVv1hue17BCPqt+sURADTFSMpSD26ZuvKRyYIjJs= github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 9714edc2a..3030ba857 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -13,7 +13,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" pd "github.com/pingcap/pd/client" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -108,7 +107,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. 
rc.databases = databases rc.backupMeta = backupMeta - metaClient := restore_util.NewClient(rc.pdClient) + metaClient := NewSplitClient(rc.pdClient) importClient := NewImportClient(metaClient) rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit) return nil @@ -189,8 +188,8 @@ func (rc *Client) CreateTables( dom *domain.Domain, tables []*utils.Table, newTS uint64, -) (*restore_util.RewriteRules, []*model.TableInfo, error) { - rewriteRules := &restore_util.RewriteRules{ +) (*RewriteRules, []*model.TableInfo, error) { + rewriteRules := &RewriteRules{ Table: make([]*import_sstpb.RewriteRule, 0), Data: make([]*import_sstpb.RewriteRule, 0), } @@ -232,7 +231,7 @@ func (rc *Client) setSpeedLimit() error { // RestoreTable tries to restore the data of a table. func (rc *Client) RestoreTable( table *utils.Table, - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { start := time.Now() @@ -300,7 +299,7 @@ func (rc *Client) RestoreTable( // RestoreDatabase tries to restore the data of a database func (rc *Client) RestoreDatabase( db *utils.Database, - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { start := time.Now() @@ -336,7 +335,7 @@ func (rc *Client) RestoreDatabase( // RestoreAll tries to restore all the data of backup files. func (rc *Client) RestoreAll( - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { start := time.Now() diff --git a/pkg/restore/import.go b/pkg/restore/import.go index fc09b7b16..77273ebab 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -12,7 +12,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" "github.com/pingcap/pd/pkg/codec" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "go.uber.org/zap" "google.golang.org/grpc" @@ -60,12 +59,12 @@ type ImporterClient interface { type importClient struct { mu sync.Mutex - metaClient restore_util.Client + metaClient SplitClient clients map[uint64]import_sstpb.ImportSSTClient } // NewImportClient returns a new ImporterClient -func NewImportClient(metaClient restore_util.Client) ImporterClient { +func NewImportClient(metaClient SplitClient) ImporterClient { return &importClient{ metaClient: metaClient, clients: make(map[uint64]import_sstpb.ImportSSTClient), @@ -133,7 +132,7 @@ func (ic *importClient) getImportClient( // FileImporter used to import a file to TiKV. type FileImporter struct { - metaClient restore_util.Client + metaClient SplitClient importClient ImporterClient backend *backup.StorageBackend rateLimit uint64 @@ -145,7 +144,7 @@ type FileImporter struct { // NewFileImporter returns a new file importClient. func NewFileImporter( ctx context.Context, - metaClient restore_util.Client, + metaClient SplitClient, importClient ImporterClient, backend *backup.StorageBackend, rateLimit uint64, @@ -163,7 +162,7 @@ func NewFileImporter( // Import tries to import a file. // All rules must contain encoded keys. 
-func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error { +func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error { log.Debug("import file", zap.Stringer("file", file)) // Rewrite the start key and end key of file to scan regions startKey, endKey, err := rewriteFileKeys(file, rewriteRules) @@ -179,9 +178,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime) defer cancel() // Scan regions covered by the file range - regionInfos, err := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) - if err != nil { - return errors.Trace(err) + regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) + if err1 != nil { + return errors.Trace(err1) } log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region @@ -190,20 +189,20 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut info := regionInfo // Try to download file. err = withRetry(func() error { - var err error + var err2 error var isEmpty bool - downloadMeta, isEmpty, err = importer.downloadSST(info, file, rewriteRules) - if err != nil { + downloadMeta, isEmpty, err2 = importer.downloadSST(info, file, rewriteRules) + if err2 != nil { if err != errRewriteRuleNotFound { log.Warn("download file failed", zap.Stringer("file", file), zap.Stringer("region", info.Region), zap.Binary("startKey", startKey), zap.Binary("endKey", endKey), - zap.Error(err), + zap.Error(err2), ) } - return err + return err2 } if isEmpty { log.Info( @@ -255,9 +254,9 @@ func (importer *FileImporter) setDownloadSpeedLimit(storeID uint64) error { } func (importer *FileImporter) downloadSST( - regionInfo *restore_util.RegionInfo, + regionInfo *RegionInfo, file *backup.File, - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, ) (*import_sstpb.SSTMeta, bool, error) { id, err := uuid.New().MarshalBinary() if err != nil { @@ -312,7 +311,7 @@ func (importer *FileImporter) downloadSST( func (importer *FileImporter) ingestSST( sstMeta *import_sstpb.SSTMeta, - regionInfo *restore_util.RegionInfo, + regionInfo *RegionInfo, ) error { leader := regionInfo.Leader if leader == nil { diff --git a/pkg/restore/range.go b/pkg/restore/range.go new file mode 100644 index 000000000..f3914539e --- /dev/null +++ b/pkg/restore/range.go @@ -0,0 +1,148 @@ +package restore + +import ( + "bytes" + "fmt" + + "github.com/google/btree" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/tablecodec" + "go.uber.org/zap" +) + +// Range represents a range of keys. +type Range struct { + StartKey []byte + EndKey []byte +} + +// String formats a range to a string +func (r *Range) String() string { + return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey) +} + +// Less compares a range with a btree.Item +func (r *Range) Less(than btree.Item) bool { + t := than.(*Range) + return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0 +} + +// contains returns if a key is included in the range. 
+func (r *Range) contains(key []byte) bool { + start, end := r.StartKey, r.EndKey + return bytes.Compare(key, start) >= 0 && + (len(end) == 0 || bytes.Compare(key, end) < 0) +} + +// sortRanges checks if the range overlapped and sort them +func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) { + rangeTree := NewRangeTree() + for _, rg := range ranges { + if rewriteRules != nil { + startID := tablecodec.DecodeTableID(rg.StartKey) + endID := tablecodec.DecodeTableID(rg.EndKey) + var rule *import_sstpb.RewriteRule + if startID == endID { + rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules) + if rule == nil { + log.Warn("cannot find rewrite rule", zap.Binary("key", rg.StartKey)) + } else { + log.Debug( + "rewrite start key", + zap.Binary("key", rg.StartKey), + zap.Stringer("rule", rule)) + } + rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules) + if rule == nil { + log.Warn("cannot find rewrite rule", zap.Binary("key", rg.EndKey)) + } else { + log.Debug( + "rewrite end key", + zap.Binary("key", rg.EndKey), + zap.Stringer("rule", rule)) + } + } else { + log.Warn("table id does not match", + zap.Binary("startKey", rg.StartKey), + zap.Binary("endKey", rg.EndKey), + zap.Int64("startID", startID), + zap.Int64("endID", endID)) + return nil, errors.New("table id does not match") + } + } + if out := rangeTree.InsertRange(rg); out != nil { + return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg) + } + } + sortedRanges := make([]Range, 0, len(ranges)) + rangeTree.Ascend(func(rg *Range) bool { + if rg == nil { + return false + } + sortedRanges = append(sortedRanges, *rg) + return true + }) + return sortedRanges, nil +} + +// RangeTree stores the ranges in an orderly manner. +// All the ranges it stored do not overlap. +type RangeTree struct { + tree *btree.BTree +} + +// NewRangeTree returns a new RangeTree. +func NewRangeTree() *RangeTree { + return &RangeTree{tree: btree.New(32)} +} + +// Find returns nil or a range in the range tree +func (rt *RangeTree) Find(key []byte) *Range { + var ret *Range + r := &Range{ + StartKey: key, + } + rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool { + ret = i.(*Range) + return false + }) + if ret == nil || !ret.contains(key) { + return nil + } + return ret +} + +// InsertRange inserts ranges into the range tree. +// it returns true if all ranges inserted successfully. +// it returns false if there are some overlapped ranges. +func (rt *RangeTree) InsertRange(rg Range) btree.Item { + return rt.tree.ReplaceOrInsert(&rg) +} + +// RangeIterator allows callers of Ascend to iterate in-order over portions of +// the tree. When this function returns false, iteration will stop and the +// associated Ascend function will immediately return. +type RangeIterator func(rg *Range) bool + +// Ascend calls the iterator for every value in the tree within [first, last], +// until the iterator returns false. +func (rt *RangeTree) Ascend(iterator RangeIterator) { + rt.tree.Ascend(func(i btree.Item) bool { + return iterator(i.(*Range)) + }) +} + +// RegionInfo includes a region and the leader of the region. +type RegionInfo struct { + Region *metapb.Region + Leader *metapb.Peer +} + +// RewriteRules contains rules for rewriting keys of tables. 
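The range tree above keys ranges by Range.Less, so btree.ReplaceOrInsert only reports a collision when two ranges overlap; that is what sortRanges relies on to reject overlapping file ranges. A minimal usage sketch, assuming it sits in the restore package (the function name and sample keys are illustrative):

package restore

import "fmt"

// exampleRangeTree is illustrative: it inserts two adjacent ranges, probes the
// tree with Find, and walks it in order with Ascend. InsertRange returns a
// non-nil btree.Item only when the new range overlaps an existing one (the
// overlapped range is replaced and handed back).
func exampleRangeTree() {
	rt := NewRangeTree()
	for _, rg := range []Range{
		{StartKey: []byte("a"), EndKey: []byte("c")},
		{StartKey: []byte("c"), EndKey: []byte("f")},
	} {
		if out := rt.InsertRange(rg); out != nil {
			fmt.Printf("%s overlaps %s\n", &rg, out.(*Range))
		}
	}
	if found := rt.Find([]byte("b")); found != nil {
		fmt.Printf("key b falls into %s\n", found) // prints [61 63], i.e. [a, c)
	}
	rt.Ascend(func(rg *Range) bool {
		fmt.Println(rg) // ranges are visited in key order
		return true
	})
}
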
+type RewriteRules struct { + Table []*import_sstpb.RewriteRule + Data []*import_sstpb.RewriteRule +} diff --git a/pkg/restore/range_test.go b/pkg/restore/range_test.go new file mode 100644 index 000000000..a9edc5b82 --- /dev/null +++ b/pkg/restore/range_test.go @@ -0,0 +1,75 @@ +package restore + +import ( + "bytes" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/tablecodec" +) + +type testRangeSuite struct{} + +var _ = Suite(&testRangeSuite{}) + +type rangeEquals struct { + *CheckerInfo +} + +var RangeEquals Checker = &rangeEquals{ + &CheckerInfo{Name: "RangeEquals", Params: []string{"obtained", "expected"}}, +} + +func (checker *rangeEquals) Check(params []interface{}, names []string) (result bool, error string) { + obtained := params[0].([]Range) + expected := params[1].([]Range) + if len(obtained) != len(expected) { + return false, "" + } + for i := range obtained { + if !bytes.Equal(obtained[i].StartKey, expected[i].StartKey) || + !bytes.Equal(obtained[i].EndKey, expected[i].EndKey) { + return false, "" + } + } + return true, "" +} + +func (s *testRangeSuite) TestSortRange(c *C) { + dataRules := []*import_sstpb.RewriteRule{ + {OldKeyPrefix: tablecodec.GenTableRecordPrefix(1), NewKeyPrefix: tablecodec.GenTableRecordPrefix(4)}, + {OldKeyPrefix: tablecodec.GenTableRecordPrefix(2), NewKeyPrefix: tablecodec.GenTableRecordPrefix(5)}, + } + rewriteRules := &RewriteRules{ + Table: make([]*import_sstpb.RewriteRule, 0), + Data: dataRules, + } + ranges1 := []Range{ + {append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), + append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...)}, + } + rs1, err := sortRanges(ranges1, rewriteRules) + c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err)) + c.Assert(rs1, RangeEquals, []Range{ + {append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...), + append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...)}, + }) + + ranges2 := []Range{ + {append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), + append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...)}, + } + _, err = sortRanges(ranges2, rewriteRules) + c.Assert(err, ErrorMatches, ".*table id does not match.*") + + ranges3 := initRanges() + rewriteRules1 := initRewriteRules() + rs3, err := sortRanges(ranges3, rewriteRules1) + c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err)) + c.Assert(rs3, RangeEquals, []Range{ + {[]byte("bbd"), []byte("bbf")}, + {[]byte("bbf"), []byte("bbj")}, + {[]byte("xxa"), []byte("xxe")}, + {[]byte("xxe"), []byte("xxz")}, + }) +} diff --git a/pkg/restore/split.go b/pkg/restore/split.go new file mode 100644 index 000000000..31b23a60f --- /dev/null +++ b/pkg/restore/split.go @@ -0,0 +1,305 @@ +package restore + +import ( + "bytes" + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/util/codec" + "go.uber.org/zap" +) + +// Constants for split retry machinery. +const ( + SplitRetryTimes = 32 + SplitRetryInterval = 50 * time.Millisecond + SplitMaxRetryInterval = time.Second + + SplitCheckMaxRetryTimes = 64 + SplitCheckInterval = 8 * time.Millisecond + SplitMaxCheckInterval = time.Second + + ScatterWaitMaxRetryTimes = 64 + ScatterWaitInterval = 50 * time.Millisecond + ScatterMaxWaitInterval = time.Second + + ScatterWaitUpperInterval = 180 * time.Second +) + +// RegionSplitter is a executor of region split by rules. 
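The retry constants above all follow the same pattern: start from the base interval, double it after every failed attempt, and cap it at the corresponding maximum. A generic sketch of that backoff loop, with an illustrative helper name (the real code inlines the loop in Split, waitForSplit and waitForScatterRegion below):

package restore

import "time"

// backoff is an illustrative helper mirroring the retry loops in split.go:
// run fn up to attempts times, doubling the sleep between attempts and
// capping it at max.
func backoff(attempts int, interval, max time.Duration, fn func() bool) bool {
	for i := 0; i < attempts; i++ {
		if fn() {
			return true
		}
		interval = 2 * interval
		if interval > max {
			interval = max
		}
		time.Sleep(interval)
	}
	return false
}

// For instance, the waitForSplit loop corresponds roughly to:
//   backoff(SplitCheckMaxRetryTimes, SplitCheckInterval, SplitMaxCheckInterval, check)
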
+type RegionSplitter struct { + client SplitClient +} + +// NewRegionSplitter returns a new RegionSplitter. +func NewRegionSplitter(client SplitClient) *RegionSplitter { + return &RegionSplitter{ + client: client, + } +} + +// OnSplitFunc is called before split a range. +type OnSplitFunc func(key [][]byte) + +// Split executes a region split. It will split regions by the rewrite rules, +// then it will split regions by the end key of each range. +// tableRules includes the prefix of a table, since some ranges may have +// a prefix with record sequence or index sequence. +// note: all ranges and rewrite rules must have raw key. +func (rs *RegionSplitter) Split( + ctx context.Context, + ranges []Range, + rewriteRules *RewriteRules, + onSplit OnSplitFunc, +) error { + if len(ranges) == 0 { + return nil + } + startTime := time.Now() + // Sort the range for getting the min and max key of the ranges + sortedRanges, err := sortRanges(ranges, rewriteRules) + if err != nil { + return errors.Trace(err) + } + minKey := codec.EncodeBytes([]byte{}, sortedRanges[0].StartKey) + maxKey := codec.EncodeBytes([]byte{}, sortedRanges[len(sortedRanges)-1].EndKey) + for _, rule := range rewriteRules.Table { + if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 { + minKey = rule.GetNewKeyPrefix() + } + if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 { + maxKey = rule.GetNewKeyPrefix() + } + } + for _, rule := range rewriteRules.Data { + if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 { + minKey = rule.GetNewKeyPrefix() + } + if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 { + maxKey = rule.GetNewKeyPrefix() + } + } + interval := SplitRetryInterval + scatterRegions := make([]*RegionInfo, 0) +SplitRegions: + for i := 0; i < SplitRetryTimes; i++ { + var regions []*RegionInfo + regions, err = rs.client.ScanRegions(ctx, minKey, maxKey, 0) + if err != nil { + return errors.Trace(err) + } + if len(regions) == 0 { + log.Warn("cannot scan any region") + return nil + } + splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions) + regionMap := make(map[uint64]*RegionInfo) + for _, region := range regions { + regionMap[region.Region.GetId()] = region + } + for regionID, keys := range splitKeyMap { + var newRegions []*RegionInfo + newRegions, err = rs.splitAndScatterRegions(ctx, regionMap[regionID], keys) + if err != nil { + interval = 2 * interval + if interval > SplitMaxRetryInterval { + interval = SplitMaxRetryInterval + } + time.Sleep(interval) + if i > 3 { + log.Warn("splitting regions failed, retry it", zap.Error(err)) + } + continue SplitRegions + } + scatterRegions = append(scatterRegions, newRegions...) 
+ onSplit(keys) + } + break + } + if err != nil { + return errors.Trace(err) + } + log.Info("splitting regions done, wait for scattering regions", + zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) + startTime = time.Now() + scatterCount := 0 + for _, region := range scatterRegions { + rs.waitForScatterRegion(ctx, region) + if time.Since(startTime) > ScatterWaitUpperInterval { + break + } + scatterCount++ + } + if scatterCount == len(scatterRegions) { + log.Info("waiting for scattering regions done", + zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) + } else { + log.Warn("waiting for scattering regions timeout", + zap.Int("scatterCount", scatterCount), + zap.Int("regions", len(scatterRegions)), + zap.Duration("take", time.Since(startTime))) + } + return nil +} + +func (rs *RegionSplitter) hasRegion(ctx context.Context, regionID uint64) (bool, error) { + regionInfo, err := rs.client.GetRegionByID(ctx, regionID) + if err != nil { + return false, err + } + return regionInfo != nil, nil +} + +func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) { + resp, err := rs.client.GetOperator(ctx, regionID) + if err != nil { + return false, err + } + // Heartbeat may not be sent to PD + if respErr := resp.GetHeader().GetError(); respErr != nil { + if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { + return true, nil + } + return false, errors.Errorf("get operator error: %s", respErr.GetType()) + } + retryTimes := ctx.Value(retryTimes).(int) + if retryTimes > 3 { + log.Warn("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp)) + } + // If the current operator of the region is not 'scatter-region', we could assume + // that 'scatter-operator' has finished or timeout + ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING + return ok, nil +} + +func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) { + interval := SplitCheckInterval + for i := 0; i < SplitCheckMaxRetryTimes; i++ { + ok, err := rs.hasRegion(ctx, regionID) + if err != nil { + log.Warn("wait for split failed", zap.Error(err)) + return + } + if ok { + break + } + interval = 2 * interval + if interval > SplitMaxCheckInterval { + interval = SplitMaxCheckInterval + } + time.Sleep(interval) + } +} + +type retryTimeKey struct{} + +var retryTimes = new(retryTimeKey) + +func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *RegionInfo) { + interval := ScatterWaitInterval + regionID := regionInfo.Region.GetId() + for i := 0; i < ScatterWaitMaxRetryTimes; i++ { + ctx1 := context.WithValue(ctx, retryTimes, i) + ok, err := rs.isScatterRegionFinished(ctx1, regionID) + if err != nil { + log.Warn("scatter region failed: do not have the region", + zap.Stringer("region", regionInfo.Region)) + return + } + if ok { + break + } + interval = 2 * interval + if interval > ScatterMaxWaitInterval { + interval = ScatterMaxWaitInterval + } + time.Sleep(interval) + } +} + +func (rs *RegionSplitter) splitAndScatterRegions( + ctx context.Context, regionInfo *RegionInfo, keys [][]byte, +) ([]*RegionInfo, error) { + newRegions, err := rs.client.BatchSplitRegions(ctx, regionInfo, keys) + if err != nil { + return nil, err + } + for _, region := range newRegions { + // Wait for a while until the regions successfully splits. 
+ rs.waitForSplit(ctx, region.Region.Id) + if err = rs.client.ScatterRegion(ctx, region); err != nil { + log.Warn("scatter region failed", zap.Stringer("region", region.Region), zap.Error(err)) + } + } + return newRegions, nil +} + +// getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of +// the ranges, groups the split keys by region id +func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionInfo) map[uint64][][]byte { + splitKeyMap := make(map[uint64][][]byte) + checkKeys := make([][]byte, 0) + for _, rule := range rewriteRules.Table { + checkKeys = append(checkKeys, rule.GetNewKeyPrefix()) + } + for _, rule := range rewriteRules.Data { + checkKeys = append(checkKeys, rule.GetNewKeyPrefix()) + } + for _, rg := range ranges { + checkKeys = append(checkKeys, rg.EndKey) + } + for _, key := range checkKeys { + if region := needSplit(key, regions); region != nil { + splitKeys, ok := splitKeyMap[region.Region.GetId()] + if !ok { + splitKeys = make([][]byte, 0, 1) + } + splitKeyMap[region.Region.GetId()] = append(splitKeys, key) + } + } + return splitKeyMap +} + +// needSplit checks whether a key is necessary to split, if true returns the split region +func needSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo { + // If splitKey is the max key. + if len(splitKey) == 0 { + return nil + } + splitKey = codec.EncodeBytes([]byte{}, splitKey) + for _, region := range regions { + // If splitKey is the boundary of the region + if bytes.Equal(splitKey, region.Region.GetStartKey()) { + return nil + } + // If splitKey is in a region + if bytes.Compare(splitKey, region.Region.GetStartKey()) > 0 && beforeEnd(splitKey, region.Region.GetEndKey()) { + return region + } + } + return nil +} + +func beforeEnd(key []byte, end []byte) bool { + return bytes.Compare(key, end) < 0 || len(end) == 0 +} + +func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) { + // We should search the dataRules firstly. + for _, rule := range rewriteRules.Data { + if bytes.HasPrefix(s, rule.GetOldKeyPrefix()) { + return append(append([]byte{}, rule.GetNewKeyPrefix()...), s[len(rule.GetOldKeyPrefix()):]...), rule + } + } + for _, rule := range rewriteRules.Table { + if bytes.HasPrefix(s, rule.GetOldKeyPrefix()) { + return append(append([]byte{}, rule.GetNewKeyPrefix()...), s[len(rule.GetOldKeyPrefix()):]...), rule + } + } + + return s, nil +} diff --git a/pkg/restore/split_client.go b/pkg/restore/split_client.go new file mode 100644 index 000000000..8a618a191 --- /dev/null +++ b/pkg/restore/split_client.go @@ -0,0 +1,353 @@ +package restore + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "path" + "strconv" + "strings" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/tikvpb" + pd "github.com/pingcap/pd/client" + "github.com/pingcap/pd/server/schedule/placement" + "google.golang.org/grpc" +) + +// SplitClient is an external client used by RegionSplitter. +type SplitClient interface { + // GetStore gets a store by a store id. + GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) + // GetRegion gets a region which includes a specified key. + GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) + // GetRegionByID gets a region by a region id. 
+ GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) + // SplitRegion splits a region from a key, if key is not included in the region, it will return nil. + // note: the key should not be encoded + SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error) + // BatchSplitRegions splits a region from a batch of keys. + // note: the keys should not be encoded + BatchSplitRegions(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) ([]*RegionInfo, error) + // ScatterRegion scatters a specified region. + ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error + // GetOperator gets the status of operator of the specified region. + GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) + // ScanRegion gets a list of regions, starts from the region that contains key. + // Limit limits the maximum number of regions returned. + ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) + // GetPlacementRule loads a placement rule from PD. + GetPlacementRule(ctx context.Context, groupID, ruleID string) (placement.Rule, error) + // SetPlacementRule insert or update a placement rule to PD. + SetPlacementRule(ctx context.Context, rule placement.Rule) error + // DeletePlacementRule removes a placement rule from PD. + DeletePlacementRule(ctx context.Context, groupID, ruleID string) error + // SetStoreLabel add or update specified label of stores. If labelValue + // is empty, it clears the label. + SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error +} + +// pdClient is a wrapper of pd client, can be used by RegionSplitter. +type pdClient struct { + mu sync.Mutex + client pd.Client + storeCache map[uint64]*metapb.Store +} + +// NewSplitClient returns a client used by RegionSplitter. 
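RegionSplitter only depends on this SplitClient interface, which is how the tests further down substitute a fake client. A minimal sketch of the production wiring, assuming a pd.Client named pdCli is at hand (the helper function is illustrative; SplitRanges in util.go follows the same shape):

package restore

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	pd "github.com/pingcap/pd/client"
	"go.uber.org/zap"
)

// splitExample is an illustrative helper: wrap a PD client in a SplitClient,
// hand it to RegionSplitter, and track progress through the onSplit hook.
func splitExample(ctx context.Context, pdCli pd.Client, ranges []Range, rules *RewriteRules) error {
	splitter := NewRegionSplitter(NewSplitClient(pdCli))
	err := splitter.Split(ctx, ranges, rules, func(keys [][]byte) {
		log.Info("split a region", zap.Int("keys", len(keys)))
	})
	return errors.Trace(err)
}
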
+func NewSplitClient(client pd.Client) SplitClient { + return &pdClient{ + client: client, + storeCache: make(map[uint64]*metapb.Store), + } +} + +func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { + c.mu.Lock() + defer c.mu.Unlock() + store, ok := c.storeCache[storeID] + if ok { + return store, nil + } + store, err := c.client.GetStore(ctx, storeID) + if err != nil { + return nil, err + } + c.storeCache[storeID] = store + return store, nil + +} + +func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) { + region, leader, err := c.client.GetRegion(ctx, key) + if err != nil { + return nil, err + } + if region == nil { + return nil, nil + } + return &RegionInfo{ + Region: region, + Leader: leader, + }, nil +} + +func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + region, leader, err := c.client.GetRegionByID(ctx, regionID) + if err != nil { + return nil, err + } + if region == nil { + return nil, nil + } + return &RegionInfo{ + Region: region, + Leader: leader, + }, nil +} + +func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error) { + var peer *metapb.Peer + if regionInfo.Leader != nil { + peer = regionInfo.Leader + } else { + if len(regionInfo.Region.Peers) == 0 { + return nil, errors.New("region does not have peer") + } + peer = regionInfo.Region.Peers[0] + } + storeID := peer.GetStoreId() + store, err := c.GetStore(ctx, storeID) + if err != nil { + return nil, err + } + conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure()) + if err != nil { + return nil, err + } + defer conn.Close() + + client := tikvpb.NewTikvClient(conn) + resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{ + Context: &kvrpcpb.Context{ + RegionId: regionInfo.Region.Id, + RegionEpoch: regionInfo.Region.RegionEpoch, + Peer: peer, + }, + SplitKey: key, + }) + if err != nil { + return nil, err + } + if resp.RegionError != nil { + return nil, errors.Errorf("split region failed: region=%v, key=%x, err=%v", regionInfo.Region, key, resp.RegionError) + } + + // BUG: Left is deprecated, it may be nil even if split is succeed! + // Assume the new region is the left one. + newRegion := resp.GetLeft() + if newRegion == nil { + regions := resp.GetRegions() + for _, r := range regions { + if bytes.Equal(r.GetStartKey(), regionInfo.Region.GetStartKey()) { + newRegion = r + break + } + } + } + if newRegion == nil { + return nil, errors.New("split region failed: new region is nil") + } + var leader *metapb.Peer + // Assume the leaders will be at the same store. 
+ if regionInfo.Leader != nil { + for _, p := range newRegion.GetPeers() { + if p.GetStoreId() == regionInfo.Leader.GetStoreId() { + leader = p + break + } + } + } + return &RegionInfo{ + Region: newRegion, + Leader: leader, + }, nil +} + +func (c *pdClient) BatchSplitRegions( + ctx context.Context, regionInfo *RegionInfo, keys [][]byte, +) ([]*RegionInfo, error) { + var peer *metapb.Peer + if regionInfo.Leader != nil { + peer = regionInfo.Leader + } else { + if len(regionInfo.Region.Peers) == 0 { + return nil, errors.New("region does not have peer") + } + peer = regionInfo.Region.Peers[0] + } + + storeID := peer.GetStoreId() + store, err := c.GetStore(ctx, storeID) + if err != nil { + return nil, err + } + conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure()) + if err != nil { + return nil, err + } + defer conn.Close() + client := tikvpb.NewTikvClient(conn) + resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{ + Context: &kvrpcpb.Context{ + RegionId: regionInfo.Region.Id, + RegionEpoch: regionInfo.Region.RegionEpoch, + Peer: peer, + }, + SplitKeys: keys, + }) + if err != nil { + return nil, err + } + if resp.RegionError != nil { + return nil, errors.Errorf("split region failed: region=%v, err=%v", regionInfo.Region, resp.RegionError) + } + + regions := resp.GetRegions() + newRegionInfos := make([]*RegionInfo, 0, len(regions)) + for _, region := range regions { + // Skip the original region + if region.GetId() == regionInfo.Region.GetId() { + continue + } + var leader *metapb.Peer + // Assume the leaders will be at the same store. + if regionInfo.Leader != nil { + for _, p := range region.GetPeers() { + if p.GetStoreId() == regionInfo.Leader.GetStoreId() { + leader = p + break + } + } + } + newRegionInfos = append(newRegionInfos, &RegionInfo{ + Region: region, + Leader: leader, + }) + } + return newRegionInfos, nil +} + +func (c *pdClient) ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error { + return c.client.ScatterRegion(ctx, regionInfo.Region.GetId()) +} + +func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + return c.client.GetOperator(ctx, regionID) +} + +func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) { + regions, leaders, err := c.client.ScanRegions(ctx, key, endKey, limit) + if err != nil { + return nil, err + } + regionInfos := make([]*RegionInfo, 0, len(regions)) + for i := range regions { + regionInfos = append(regionInfos, &RegionInfo{ + Region: regions[i], + Leader: leaders[i], + }) + } + return regionInfos, nil +} + +func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (placement.Rule, error) { + var rule placement.Rule + addr := c.getPDAPIAddr() + if addr == "" { + return rule, errors.New("failed to add stores labels: no leader") + } + req, _ := http.NewRequestWithContext(ctx, "GET", addr+path.Join("/pd/api/v1/config/rule", groupID, ruleID), nil) + res, err := http.DefaultClient.Do(req) + if err != nil { + return rule, errors.WithStack(err) + } + b, err := ioutil.ReadAll(res.Body) + if err != nil { + return rule, errors.WithStack(err) + } + res.Body.Close() + err = json.Unmarshal(b, &rule) + if err != nil { + return rule, errors.WithStack(err) + } + return rule, nil +} + +func (c *pdClient) SetPlacementRule(ctx context.Context, rule placement.Rule) error { + addr := c.getPDAPIAddr() + if addr == "" { + return errors.New("failed to add stores labels: no leader") + } + m, _ := json.Marshal(rule) + 
req, _ := http.NewRequestWithContext(ctx, "POST", addr+path.Join("/pd/api/v1/config/rule"), bytes.NewReader(m)) + res, err := http.DefaultClient.Do(req) + if err != nil { + return errors.WithStack(err) + } + return errors.Trace(res.Body.Close()) +} + +func (c *pdClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { + addr := c.getPDAPIAddr() + if addr == "" { + return errors.New("failed to add stores labels: no leader") + } + req, _ := http.NewRequestWithContext(ctx, "DELETE", addr+path.Join("/pd/api/v1/config/rule", groupID, ruleID), nil) + res, err := http.DefaultClient.Do(req) + if err != nil { + return errors.WithStack(err) + } + return errors.Trace(res.Body.Close()) +} + +func (c *pdClient) SetStoresLabel( + ctx context.Context, stores []uint64, labelKey, labelValue string, +) error { + b := []byte(fmt.Sprintf(`{"%s": "%s"}`, labelKey, labelValue)) + addr := c.getPDAPIAddr() + if addr == "" { + return errors.New("failed to add stores labels: no leader") + } + for _, id := range stores { + req, _ := http.NewRequestWithContext( + ctx, "POST", + addr+path.Join("/pd/api/v1/store", strconv.FormatUint(id, 10), "label"), + bytes.NewReader(b), + ) + res, err := http.DefaultClient.Do(req) + if err != nil { + return errors.WithStack(err) + } + err = res.Body.Close() + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func (c *pdClient) getPDAPIAddr() string { + addr := c.client.GetLeaderAddr() + if addr != "" && !strings.HasPrefix(addr, "http") { + addr = "http://" + addr + } + return strings.TrimRight(addr, "/") +} diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go new file mode 100644 index 000000000..509c4cfa0 --- /dev/null +++ b/pkg/restore/split_test.go @@ -0,0 +1,301 @@ +package restore + +import ( + "bytes" + "context" + "sync" + + . 
"github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/schedule/placement" + "github.com/pingcap/tidb/util/codec" +) + +type testClient struct { + mu sync.RWMutex + stores map[uint64]*metapb.Store + regions map[uint64]*RegionInfo + nextRegionID uint64 +} + +func newTestClient(stores map[uint64]*metapb.Store, regions map[uint64]*RegionInfo, nextRegionID uint64) *testClient { + return &testClient{ + stores: stores, + regions: regions, + nextRegionID: nextRegionID, + } +} + +func (c *testClient) GetAllRegions() map[uint64]*RegionInfo { + c.mu.RLock() + defer c.mu.RUnlock() + return c.regions +} + +func (c *testClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { + c.mu.RLock() + defer c.mu.RUnlock() + store, ok := c.stores[storeID] + if !ok { + return nil, errors.Errorf("store not found") + } + return store, nil +} + +func (c *testClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + for _, region := range c.regions { + if bytes.Compare(key, region.Region.StartKey) >= 0 && + (len(region.Region.EndKey) == 0 || bytes.Compare(key, region.Region.EndKey) < 0) { + return region, nil + } + } + return nil, errors.Errorf("region not found: key=%s", string(key)) +} + +func (c *testClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + region, ok := c.regions[regionID] + if !ok { + return nil, errors.Errorf("region not found: id=%d", regionID) + } + return region, nil +} + +func (c *testClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + var target *RegionInfo + splitKey := codec.EncodeBytes([]byte{}, key) + for _, region := range c.regions { + if bytes.Compare(splitKey, region.Region.StartKey) >= 0 && + (len(region.Region.EndKey) == 0 || bytes.Compare(splitKey, region.Region.EndKey) < 0) { + target = region + } + } + if target == nil { + return nil, errors.Errorf("region not found: key=%s", string(key)) + } + newRegion := &RegionInfo{ + Region: &metapb.Region{ + Peers: target.Region.Peers, + Id: c.nextRegionID, + StartKey: target.Region.StartKey, + EndKey: splitKey, + }, + } + c.regions[c.nextRegionID] = newRegion + c.nextRegionID++ + target.Region.StartKey = splitKey + c.regions[target.Region.Id] = target + return newRegion, nil +} + +func (c *testClient) BatchSplitRegions( + ctx context.Context, regionInfo *RegionInfo, keys [][]byte, +) ([]*RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + newRegions := make([]*RegionInfo, 0) + for _, key := range keys { + var target *RegionInfo + splitKey := codec.EncodeBytes([]byte{}, key) + for _, region := range c.regions { + if bytes.Compare(splitKey, region.Region.GetStartKey()) > 0 && + beforeEnd(splitKey, region.Region.GetEndKey()) { + target = region + } + } + if target == nil { + continue + } + newRegion := &RegionInfo{ + Region: &metapb.Region{ + Peers: target.Region.Peers, + Id: c.nextRegionID, + StartKey: target.Region.StartKey, + EndKey: splitKey, + }, + } + c.regions[c.nextRegionID] = newRegion + c.nextRegionID++ + target.Region.StartKey = splitKey + c.regions[target.Region.Id] = target + newRegions = append(newRegions, newRegion) + } + return newRegions, nil +} + +func (c *testClient) ScatterRegion(ctx context.Context, regionInfo *RegionInfo) 
error { + return nil +} + +func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + return &pdpb.GetOperatorResponse{ + Header: new(pdpb.ResponseHeader), + }, nil +} + +func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) { + regions := make([]*RegionInfo, 0) + for _, region := range c.regions { + if limit > 0 && len(regions) >= limit { + break + } + if (len(region.Region.GetEndKey()) != 0 && bytes.Compare(region.Region.GetEndKey(), key) <= 0) || + bytes.Compare(region.Region.GetStartKey(), endKey) > 0 { + continue + } + regions = append(regions, region) + } + return regions, nil +} + +func (c *testClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r placement.Rule, err error) { + return +} + +func (c *testClient) SetPlacementRule(ctx context.Context, rule placement.Rule) error { + return nil +} + +func (c *testClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { + return nil +} + +func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { + return nil +} + +// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) +// rewrite rules: aa -> xx, cc -> bb +// expected regions after split: +// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), +// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, ) +func (s *testRestoreUtilSuite) TestSplit(c *C) { + client := initTestClient() + ranges := initRanges() + rewriteRules := initRewriteRules() + regionSplitter := NewRegionSplitter(client) + + ctx := context.Background() + err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {}) + if err != nil { + c.Assert(err, IsNil, Commentf("split regions failed: %v", err)) + } + regions := client.GetAllRegions() + if !validateRegions(regions) { + for _, region := range regions { + c.Logf("region: %v\n", region.Region) + } + c.Log("get wrong result") + c.Fail() + } +} + +// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +func initTestClient() *testClient { + peers := make([]*metapb.Peer, 1) + peers[0] = &metapb.Peer{ + Id: 1, + StoreId: 1, + } + keys := [6]string{"", "aay", "bba", "bbh", "cca", ""} + regions := make(map[uint64]*RegionInfo) + for i := uint64(1); i < 6; i++ { + startKey := []byte(keys[i-1]) + if len(startKey) != 0 { + startKey = codec.EncodeBytes([]byte{}, startKey) + } + endKey := []byte(keys[i]) + if len(endKey) != 0 { + endKey = codec.EncodeBytes([]byte{}, endKey) + } + regions[i] = &RegionInfo{ + Region: &metapb.Region{ + Id: i, + Peers: peers, + StartKey: startKey, + EndKey: endKey, + }, + } + } + stores := make(map[uint64]*metapb.Store) + stores[1] = &metapb.Store{ + Id: 1, + } + return newTestClient(stores, regions, 6) +} + +// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) +func initRanges() []Range { + var ranges [4]Range + ranges[0] = Range{ + StartKey: []byte("aaa"), + EndKey: []byte("aae"), + } + ranges[1] = Range{ + StartKey: []byte("aae"), + EndKey: []byte("aaz"), + } + ranges[2] = Range{ + StartKey: []byte("ccd"), + EndKey: []byte("ccf"), + } + ranges[3] = Range{ + StartKey: []byte("ccf"), + EndKey: []byte("ccj"), + } + return ranges[:] +} + +func initRewriteRules() *RewriteRules { + var rules [2]*import_sstpb.RewriteRule + rules[0] = &import_sstpb.RewriteRule{ + OldKeyPrefix: []byte("aa"), + NewKeyPrefix: []byte("xx"), + } + rules[1] = 
&import_sstpb.RewriteRule{ + OldKeyPrefix: []byte("cc"), + NewKeyPrefix: []byte("bb"), + } + return &RewriteRules{ + Table: rules[:], + Data: rules[:], + } +} + +// expected regions after split: +// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), +// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, ) +func validateRegions(regions map[uint64]*RegionInfo) bool { + keys := [12]string{"", "aay", "bb", "bba", "bbf", "bbh", "bbj", "cca", "xx", "xxe", "xxz", ""} + if len(regions) != 11 { + return false + } +FindRegion: + for i := 1; i < 12; i++ { + for _, region := range regions { + startKey := []byte(keys[i-1]) + if len(startKey) != 0 { + startKey = codec.EncodeBytes([]byte{}, startKey) + } + endKey := []byte(keys[i]) + if len(endKey) != 0 { + endKey = codec.EncodeBytes([]byte{}, endKey) + } + if bytes.Equal(region.Region.GetStartKey(), startKey) && + bytes.Equal(region.Region.GetEndKey(), endKey) { + continue FindRegion + } + } + return false + } + return true +} diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 126e864fd..a2e9e3e38 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -13,7 +13,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/parser/model" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" @@ -76,7 +75,7 @@ func GetRewriteRules( newTable *model.TableInfo, oldTable *model.TableInfo, newTimeStamp uint64, -) *restore_util.RewriteRules { +) *RewriteRules { tableIDs := make(map[int64]int64) tableIDs[oldTable.ID] = newTable.ID if oldTable.Partition != nil { @@ -119,7 +118,7 @@ func GetRewriteRules( } } - return &restore_util.RewriteRules{ + return &RewriteRules{ Table: tableRules, Data: dataRules, } @@ -196,9 +195,9 @@ func withRetry( // ValidateFileRanges checks and returns the ranges of the files. 
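The GetRewriteRules hunks above switch the return type to the local *RewriteRules. The value it returns maps old prefixes to new ones: Table rules rewrite the encoded table prefix, Data rules rewrite the record prefix. An illustrative value for a table whose id changes from 1 to 4 (the ids and the helper are made up; the prefixes follow range_test.go and util_test.go):

package restore

import (
	"github.com/pingcap/kvproto/pkg/import_sstpb"
	"github.com/pingcap/tidb/tablecodec"
)

// exampleRewriteRules is illustrative: table id 1 in the backup is restored as
// table id 4, so the table prefix and the record prefix are both remapped.
func exampleRewriteRules() *RewriteRules {
	return &RewriteRules{
		Table: []*import_sstpb.RewriteRule{{
			OldKeyPrefix: tablecodec.EncodeTablePrefix(1),
			NewKeyPrefix: tablecodec.EncodeTablePrefix(4),
		}},
		Data: []*import_sstpb.RewriteRule{{
			OldKeyPrefix: tablecodec.GenTableRecordPrefix(1),
			NewKeyPrefix: tablecodec.GenTableRecordPrefix(4),
		}},
	}
}
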
func ValidateFileRanges( files []*backup.File, - rewriteRules *restore_util.RewriteRules, -) ([]restore_util.Range, error) { - ranges := make([]restore_util.Range, 0, len(files)) + rewriteRules *RewriteRules, +) ([]Range, error) { + ranges := make([]Range, 0, len(files)) fileAppended := make(map[string]bool) for _, file := range files { @@ -217,7 +216,7 @@ func ValidateFileRanges( zap.Stringer("file", file)) return nil, errors.New("table ids dont match") } - ranges = append(ranges, restore_util.Range{ + ranges = append(ranges, Range{ StartKey: file.GetStartKey(), EndKey: file.GetEndKey(), }) @@ -228,7 +227,7 @@ func ValidateFileRanges( } // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file -func ValidateFileRewriteRule(file *backup.File, rewriteRules *restore_util.RewriteRules) error { +func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) error { // Check if the start key has a matched rewrite key _, startRule := rewriteRawKey(file.GetStartKey(), rewriteRules) if rewriteRules != nil && startRule == nil { @@ -269,7 +268,7 @@ func ValidateFileRewriteRule(file *backup.File, rewriteRules *restore_util.Rewri } // Rewrites a raw key and returns a encoded key -func rewriteRawKey(key []byte, rewriteRules *restore_util.RewriteRules) ([]byte, *import_sstpb.RewriteRule) { +func rewriteRawKey(key []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) { if rewriteRules == nil { return codec.EncodeBytes([]byte{}, key), nil } @@ -281,7 +280,7 @@ func rewriteRawKey(key []byte, rewriteRules *restore_util.RewriteRules) ([]byte, return nil, nil } -func matchOldPrefix(key []byte, rewriteRules *restore_util.RewriteRules) *import_sstpb.RewriteRule { +func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule { for _, rule := range rewriteRules.Data { if bytes.HasPrefix(key, rule.GetOldKeyPrefix()) { return rule @@ -295,7 +294,7 @@ func matchOldPrefix(key []byte, rewriteRules *restore_util.RewriteRules) *import return nil } -func matchNewPrefix(key []byte, rewriteRules *restore_util.RewriteRules) *import_sstpb.RewriteRule { +func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule { for _, rule := range rewriteRules.Data { if bytes.HasPrefix(key, rule.GetNewKeyPrefix()) { return rule @@ -319,8 +318,8 @@ func truncateTS(key []byte) []byte { func SplitRanges( ctx context.Context, client *Client, - ranges []restore_util.Range, - rewriteRules *restore_util.RewriteRules, + ranges []Range, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) error { start := time.Now() @@ -328,7 +327,7 @@ func SplitRanges( elapsed := time.Since(start) summary.CollectDuration("split region", elapsed) }() - splitter := restore_util.NewRegionSplitter(restore_util.NewClient(client.GetPDClient())) + splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient())) return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) { for range keys { updateCh <- struct{}{} @@ -336,7 +335,7 @@ func SplitRanges( }) } -func rewriteFileKeys(file *backup.File, rewriteRules *restore_util.RewriteRules) (startKey, endKey []byte, err error) { +func rewriteFileKeys(file *backup.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) { startID := tablecodec.DecodeTableID(file.GetStartKey()) endID := tablecodec.DecodeTableID(file.GetEndKey()) var rule *import_sstpb.RewriteRule diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go index 5da5c9ab7..bc4da9168 100644 --- a/pkg/restore/util_test.go +++ 
b/pkg/restore/util_test.go @@ -5,7 +5,6 @@ import ( "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/pingcap/tidb/tablecodec" ) @@ -34,7 +33,7 @@ func (s *testRestoreUtilSuite) TestGetSSTMetaFromFile(c *C) { } func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { - rules := &restore_util.RewriteRules{ + rules := &RewriteRules{ Table: []*import_sstpb.RewriteRule{&import_sstpb.RewriteRule{ OldKeyPrefix: []byte(tablecodec.EncodeTablePrefix(1)), NewKeyPrefix: []byte(tablecodec.EncodeTablePrefix(2)),
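One recurring detail in this patch: Split, needSplit and rewriteRawKey take raw keys but compare them with region boundaries only after codec.EncodeBytes, because PD returns region start/end keys in encoded form. A small sketch of that distinction (the sample keys are illustrative and match the test fixtures above):

package restore

import (
	"bytes"
	"fmt"

	"github.com/pingcap/tidb/util/codec"
)

// encodedKeyExample is illustrative: needSplit and rewriteRawKey encode raw
// keys with codec.EncodeBytes before comparing them with region boundaries.
func encodedKeyExample() {
	raw := []byte("bbf") // raw end key of a Range
	encoded := codec.EncodeBytes([]byte{}, raw)
	regionStart := codec.EncodeBytes([]byte{}, []byte("bba")) // boundary as PD reports it
	fmt.Println(bytes.Compare(encoded, regionStart) > 0)      // true: the encoded key sorts after the region start
}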