Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: split & scatter regions concurrently #27034

Merged
merged 25 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d5cddad
squash merge conc-split
YuJuncen Aug 9, 2021
7a3110b
restore: fix build
YuJuncen Aug 9, 2021
33c9be9
Merge branch 'master' into conc-split
YuJuncen Aug 9, 2021
c2aeba6
restore: decrease the PD concurrency, and increase the batch flush time
YuJuncen Aug 10, 2021
6651020
Merge branch 'conc-split' of https://github.com/yujuncen/tidb into co…
YuJuncen Aug 10, 2021
6755218
debug
YuJuncen Aug 11, 2021
e67b458
debug
YuJuncen Aug 11, 2021
393e22b
restore: retry hole in regions
YuJuncen Aug 12, 2021
8b4856b
Merge branch 'master' of https://github.com/pingcap/tidb into conc-split
YuJuncen Aug 12, 2021
4d7a945
ddl: revert ddl change
YuJuncen Aug 12, 2021
a98ba23
*: remove the complex hole-fine
YuJuncen Aug 13, 2021
7e60bfb
restore: revert retry-relative changes
YuJuncen Aug 16, 2021
9882e22
Merge branch 'master' of https://github.com/pingcap/tidb into conc-split
YuJuncen Aug 16, 2021
c999095
restore: retry scan regions
YuJuncen Aug 16, 2021
b5a4ce7
restore: remove a useless statement
YuJuncen Aug 16, 2021
1715381
restore: mute some verbose logs
YuJuncen Aug 16, 2021
470323b
Merge branch 'master' into conc-split
YuJuncen Aug 16, 2021
e437a8f
Merge branch 'master' of https://github.com/pingcap/tidb into conc-split
YuJuncen Aug 16, 2021
af51ea6
Merge branch 'master' into conc-split
YuJuncen Aug 16, 2021
545b061
*: disable concurrent split by default
YuJuncen Aug 19, 2021
4934af2
Merge branch 'conc-split' of https://github.com/yujuncen/tidb into co…
YuJuncen Aug 19, 2021
f012d4b
Merge branch 'master' into conc-split
YuJuncen Aug 19, 2021
c5fc5b9
Merge branch 'master' into conc-split
YuJuncen Aug 19, 2021
afe9bd3
Apply suggestions from code review
YuJuncen Aug 26, 2021
d7ed687
Merge branch 'master' into conc-split
ti-chi-bot Aug 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncod
for _, peer := range region.GetPeers() {
peers = append(peers, peer.String())
}
enc.AddUint64("ID", region.Id)
enc.AddUint64("ID", region.GetId())
enc.AddString("startKey", redact.Key(region.GetStartKey()))
enc.AddString("endKey", redact.Key(region.GetEndKey()))
enc.AddString("epoch", region.GetRegionEpoch().String())
Expand Down
11 changes: 9 additions & 2 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (importer *FileImporter) Import(
files []*backuppb.File,
rewriteRules *RewriteRules,
) error {
start := time.Now()
log.Debug("import file", logutil.Files(files))
// Rewrite the start key and end key of file to scan regions
var startKey, endKey []byte
Expand Down Expand Up @@ -336,6 +337,8 @@ func (importer *FileImporter) Import(
logutil.Region(info.Region),
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.Key("file-simple-start", files[0].StartKey),
logutil.Key("file-simple-end", files[0].EndKey),
logutil.ShortError(e))
continue regionLoop
}
Expand All @@ -348,7 +351,10 @@ func (importer *FileImporter) Import(
logutil.ShortError(errDownload))
return errors.Trace(errDownload)
}

log.Debug("download file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)),
logutil.Key("start", files[0].StartKey),
logutil.Key("end", files[0].EndKey),
)
ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info)
ingestRetry:
for errIngest == nil {
Expand Down Expand Up @@ -414,6 +420,7 @@ func (importer *FileImporter) Import(
return errors.Trace(errIngest)
}
}
log.Debug("ingtest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
for _, f := range files {
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
Expand Down Expand Up @@ -447,7 +454,7 @@ func (importer *FileImporter) downloadSST(
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound)
return nil, errors.Annotatef(berrors.ErrKVRewriteRuleNotFound, "failed to find rewrite rule.")
}
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
Expand Down
149 changes: 120 additions & 29 deletions br/pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -73,6 +75,7 @@ type brContextManager struct {

// This 'set' of table ID allow us to handle each table just once.
hasTable map[int64]CreatedTable
mu sync.Mutex
}

func (manager *brContextManager) Close(ctx context.Context) {
Expand All @@ -85,6 +88,8 @@ func (manager *brContextManager) Close(ctx context.Context) {

func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
placementRuleTables := make([]*model.TableInfo, 0, len(tables))
manager.mu.Lock()
defer manager.mu.Unlock()

for _, tbl := range tables {
if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
Expand All @@ -97,6 +102,8 @@ func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTabl
}

func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
manager.mu.Lock()
defer manager.mu.Unlock()
placementRuleTables := make([]*model.TableInfo, 0, len(tables))

for _, table := range tables {
Expand Down Expand Up @@ -182,6 +189,8 @@ type tikvSender struct {
inCh chan<- DrainResult

wg *sync.WaitGroup

tableWaiters *sync.Map
}

func (b *tikvSender) PutSink(sink TableSink) {
Expand All @@ -191,6 +200,7 @@ func (b *tikvSender) PutSink(sink TableSink) {
}

// RestoreBatch enqueues one drained batch of ranges into the sender's
// pipeline. The log line reports how many batches are already queued
// ahead of this one, which is useful for spotting back-pressure.
func (b *tikvSender) RestoreBatch(ranges DrainResult) {
	queued := len(b.inCh)
	log.Info("restore batch: waiting ranges", zap.Int("range", queued))
	b.inCh <- ranges
}

Expand All @@ -199,29 +209,52 @@ func NewTiKVSender(
ctx context.Context,
cli *Client,
updateCh glue.Progress,
splitConcurrency uint,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)

sender := &tikvSender{
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
tableWaiters: new(sync.Map),
}

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.restoreWorker(ctx, midCh)
return sender, nil
}

func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
// Close shuts the sender down: it stops accepting new batches by closing
// the input channel, then blocks until the pipeline workers have finished
// (b.wg is released by the split and restore workers on exit).
func (b *tikvSender) Close() {
	close(b.inCh)
	b.wg.Wait()
	log.Debug("tikv sender closed")
}

// drainResultAndDone pairs a drained batch of ranges with a completion
// callback. The callback marks the batch's tables as no longer
// 'current restoring' (the counterpart of registerTableIsRestoring), and
// must be invoked once the batch has been restored.
type drainResultAndDone struct {
	result DrainResult
	done func()
}

func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
concurrency uint,
) {
defer log.Debug("split worker closed")
eg, ectx := errgroup.WithContext(ctx)
defer func() {
b.wg.Done()
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
close(next)
}()
pool := utils.NewWorkerPool(concurrency, "split")
for {
select {
case <-ctx.Done():
Expand All @@ -230,44 +263,102 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult,
if !ok {
return
}
if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
b.sink.EmitError(err)
return
}
next <- result
// When the batcher has sent all ranges from a table, it would
// mark this table 'all done'(BlankTablesAfterSend), and then we can send it to checksum.
//
// When there is a sole worker sequentially running those batch tasks, everything is fine; however,
// in the context of multi-workers, that become buggy, for example:
// |------table 1, ranges 1------|------table 1, ranges 2------|
// The batcher send batches: [
// {Ranges: ranges 1},
// {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// ]
// And there are two workers runs concurrently:
// worker 1: {Ranges: ranges 1}
// worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// And worker 2 finished its job before worker 1 done. Note the table wasn't restored fully,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
}
next <- drainResultAndDone{
result: result,
done: done,
}
return nil
})
}
}
}

// registerTableIsRestoring flags every table in ts as 'current restoring'
// by incrementing one WaitGroup per table. The per-table WaitGroup is
// created lazily in tableWaiters on first use. The returned closure undoes
// exactly this registration and must be called when the restore of these
// ranges is finished.
func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() {
	waiters := make([]*sync.WaitGroup, 0, len(ts))
	for _, tbl := range ts {
		raw, _ := b.tableWaiters.LoadOrStore(tbl.Table.ID, new(sync.WaitGroup))
		group := raw.(*sync.WaitGroup)
		group.Add(1)
		waiters = append(waiters, group)
	}
	return func() {
		for _, group := range waiters {
			group.Done()
		}
	}
}

// waitTablesDone blocks the calling goroutine until every table in ts has
// left the 'current restoring' state (i.e. all in-flight batches for it
// called their done() callback), removing each table's waiter entry as it
// goes. A missing entry means done() ran before registration — a bug.
func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
	for _, tbl := range ts {
		waiter, ok := b.tableWaiters.LoadAndDelete(tbl.Table.ID)
		if !ok {
			log.Panic("bug! table done before register!",
				zap.Any("wait-table-map", b.tableWaiters),
				zap.Stringer("table", tbl.Table.Name))
		}
		waiter.(*sync.WaitGroup).Wait()
	}
}

func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Debug("restore worker closed")
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
b.wg.Done()
b.sink.Close()
}()
for {
select {
case <-ctx.Done():
return
case result, ok := <-ranges:
case r, ok := <-ranges:
if !ok {
return
}
files := result.Files()
if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
b.sink.EmitError(err)
return
}

log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
b.sink.EmitTables(result.BlankTablesAfterSend...)
files := r.result.Files()
// There has been a worker in the `RestoreFiles` procedure.
// Spawning a raw goroutine won't make too many requests to TiKV.
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
return e
}
log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
r.done()
b.waitTablesDone(r.result.BlankTablesAfterSend)
b.sink.EmitTables(r.result.BlankTablesAfterSend...)
return nil
})
}
}
}

func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}
2 changes: 2 additions & 0 deletions br/pkg/restore/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
"rewrite start key",
logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule))
}
oldKey := rg.EndKey
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
} else {
log.Debug(
"rewrite end key",
logutil.Key("origin-key", oldKey),
logutil.Key("key", rg.EndKey),
logutil.RewriteRule(rule))
}
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, errScan := PaginateScanRegion(ctx, rs.client, minKey, maxKey, ScanRegionPaginationLimit)
if errScan != nil {
if berrors.ErrPDBatchScanRegion.Equal(errScan) {
log.Warn("inconsistent region info get.", logutil.ShortError(errScan))
time.Sleep(time.Second)
continue SplitRegions
}
return errors.Trace(errScan)
}
if len(regions) == 0 {
log.Warn("split regions cannot scan any region")
return nil
}
splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
Expand Down
Loading