diff --git a/br/pkg/logutil/logging.go b/br/pkg/logutil/logging.go
index 6df032e69e421..6434cc8a06791 100644
--- a/br/pkg/logutil/logging.go
+++ b/br/pkg/logutil/logging.go
@@ -115,7 +115,7 @@ func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncod
 	for _, peer := range region.GetPeers() {
 		peers = append(peers, peer.String())
 	}
-	enc.AddUint64("ID", region.Id)
+	enc.AddUint64("ID", region.GetId())
 	enc.AddString("startKey", redact.Key(region.GetStartKey()))
 	enc.AddString("endKey", redact.Key(region.GetEndKey()))
 	enc.AddString("epoch", region.GetRegionEpoch().String())
diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go
index 83ec6c59571d2..a48a5cffd94f9 100644
--- a/br/pkg/restore/import.go
+++ b/br/pkg/restore/import.go
@@ -259,6 +259,7 @@ func (importer *FileImporter) Import(
 	files []*backuppb.File,
 	rewriteRules *RewriteRules,
 ) error {
+	start := time.Now()
 	log.Debug("import file", logutil.Files(files))
 	// Rewrite the start key and end key of file to scan regions
 	var startKey, endKey []byte
@@ -340,6 +341,8 @@ func (importer *FileImporter) Import(
 					logutil.Region(info.Region),
 					logutil.Key("startKey", startKey),
 					logutil.Key("endKey", endKey),
+					logutil.Key("file-simple-start", files[0].StartKey),
+					logutil.Key("file-simple-end", files[0].EndKey),
 					logutil.ShortError(e))
 				continue regionLoop
 			}
@@ -352,7 +355,10 @@ func (importer *FileImporter) Import(
 					logutil.ShortError(errDownload))
 				return errors.Trace(errDownload)
 			}
-
+			log.Debug("download file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)),
+				logutil.Key("start", files[0].StartKey),
+				logutil.Key("end", files[0].EndKey),
+			)
 			ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info)
 		ingestRetry:
 			for errIngest == nil {
@@ -418,6 +424,7 @@ func (importer *FileImporter) Import(
 				return errors.Trace(errIngest)
 			}
 		}
+		log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
 		for _, f := range files {
 			summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
 			summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
@@ -451,7 +458,7 @@ func (importer *FileImporter) downloadSST(
 	}
 	regionRule := matchNewPrefix(key, rewriteRules)
 	if regionRule == nil {
-		return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound)
+		return nil, errors.Annotate(berrors.ErrKVRewriteRuleNotFound, "failed to find rewrite rule")
 	}
 	rule := import_sstpb.RewriteRule{
 		OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
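Side note on the downloadSST change above: swapping errors.Trace for errors.Annotate attaches a human-readable hint while keeping the classified error as the cause, so callers can still match it (the same property that the berrors.ErrPDBatchScanRegion.Equal check in split.go further down relies on). A minimal sketch under that assumption; the findRule helper below is hypothetical and only for illustration:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
)

// findRule is a toy stand-in for the rewrite-rule lookup in downloadSST.
func findRule(found bool) error {
	if !found {
		// Annotate adds context to the message but keeps ErrKVRewriteRuleNotFound as the cause.
		return errors.Annotate(berrors.ErrKVRewriteRuleNotFound, "failed to find rewrite rule")
	}
	return nil
}

func main() {
	err := findRule(false)
	fmt.Println(err) // "failed to find rewrite rule: ..." with the original error appended
	// The error class survives the annotation, so it can still be matched.
	fmt.Println(errors.Cause(err) == berrors.ErrKVRewriteRuleNotFound) // true
}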
diff --git a/br/pkg/restore/pipeline_items.go b/br/pkg/restore/pipeline_items.go
index 075e601e80025..ad31307ced7f9 100644
--- a/br/pkg/restore/pipeline_items.go
+++ b/br/pkg/restore/pipeline_items.go
@@ -12,7 +12,9 @@ import (
 	"github.com/pingcap/tidb/br/pkg/glue"
 	"github.com/pingcap/tidb/br/pkg/metautil"
 	"github.com/pingcap/tidb/br/pkg/rtree"
+	"github.com/pingcap/tidb/br/pkg/utils"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -73,6 +75,7 @@ type brContextManager struct {
 	// This 'set' of table ID allow us to handle each table just once.
 	hasTable map[int64]CreatedTable
+	mu       sync.Mutex
 }
 
 func (manager *brContextManager) Close(ctx context.Context) {
@@ -85,6 +88,8 @@ func (manager *brContextManager) Close(ctx context.Context) {
 
 func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
 	placementRuleTables := make([]*model.TableInfo, 0, len(tables))
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
 
 	for _, tbl := range tables {
 		if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
@@ -97,6 +102,8 @@ func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTabl
 }
 
 func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
 	placementRuleTables := make([]*model.TableInfo, 0, len(tables))
 
 	for _, table := range tables {
@@ -182,6 +189,8 @@ type tikvSender struct {
 	inCh chan<- DrainResult
 	wg   *sync.WaitGroup
+
+	tableWaiters *sync.Map
 }
 
 func (b *tikvSender) PutSink(sink TableSink) {
@@ -191,6 +200,7 @@ func (b *tikvSender) PutSink(sink TableSink) {
 }
 
 func (b *tikvSender) RestoreBatch(ranges DrainResult) {
+	log.Info("restore batch: waiting ranges", zap.Int("range", len(b.inCh)))
 	b.inCh <- ranges
 }
 
@@ -199,29 +209,52 @@ func NewTiKVSender(
 	ctx context.Context,
 	cli *Client,
 	updateCh glue.Progress,
+	splitConcurrency uint,
 ) (BatchSender, error) {
 	inCh := make(chan DrainResult, defaultChannelSize)
-	midCh := make(chan DrainResult, defaultChannelSize)
+	midCh := make(chan drainResultAndDone, defaultChannelSize)
 
 	sender := &tikvSender{
-		client:   cli,
-		updateCh: updateCh,
-		inCh:     inCh,
-		wg:       new(sync.WaitGroup),
+		client:       cli,
+		updateCh:     updateCh,
+		inCh:         inCh,
+		wg:           new(sync.WaitGroup),
+		tableWaiters: new(sync.Map),
 	}
 
 	sender.wg.Add(2)
-	go sender.splitWorker(ctx, inCh, midCh)
+	go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
 	go sender.restoreWorker(ctx, midCh)
 	return sender, nil
 }
 
-func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
+func (b *tikvSender) Close() {
+	close(b.inCh)
+	b.wg.Wait()
+	log.Debug("tikv sender closed")
+}
+
+type drainResultAndDone struct {
+	result DrainResult
+	done   func()
+}
+
+func (b *tikvSender) splitWorker(ctx context.Context,
+	ranges <-chan DrainResult,
+	next chan<- drainResultAndDone,
+	concurrency uint,
+) {
 	defer log.Debug("split worker closed")
+	eg, ectx := errgroup.WithContext(ctx)
 	defer func() {
 		b.wg.Done()
+		if err := eg.Wait(); err != nil {
+			b.sink.EmitError(err)
+			return
+		}
 		close(next)
 	}()
+	pool := utils.NewWorkerPool(concurrency, "split")
 	for {
 		select {
 		case <-ctx.Done():
@@ -230,19 +263,77 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult,
 			if !ok {
 				return
 			}
-			if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
-				log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
-				b.sink.EmitError(err)
-				return
-			}
-			next <- result
+			// When the batcher has sent all ranges from a table, it marks the table
+			// 'all done' (BlankTablesAfterSend), and then we can send it to checksum.
+			//
+			// When there is a sole worker sequentially running those batch tasks, everything is fine; however,
+			// in the context of multiple workers, this becomes buggy. For example:
+			//   |------table 1, ranges 1------|------table 1, ranges 2------|
+			// The batcher sends batches: [
+			//	{Ranges: ranges 1},
+			//	{Ranges: ranges 2, BlankTablesAfterSend: table 1}
+			// ]
+			// And two workers run concurrently:
+			//	worker 1: {Ranges: ranges 1}
+			//	worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1}
+			// If worker 2 finishes its job before worker 1 is done, the table hasn't been fully restored yet,
+			// hence the checksum would fail.
+			done := b.registerTableIsRestoring(result.TablesToSend)
+			pool.ApplyOnErrorGroup(eg, func() error {
+				err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
+				if err != nil {
+					log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
+					return err
+				}
+				next <- drainResultAndDone{
+					result: result,
+					done:   done,
+				}
+				return nil
+			})
+		}
+	}
+}
+
+// registerTableIsRestoring marks some tables as 'currently restoring'.
+// It returns a function that marks the restore of these tables as done.
+func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() {
+	wgs := make([]*sync.WaitGroup, 0, len(ts))
+	for _, t := range ts {
+		i, _ := b.tableWaiters.LoadOrStore(t.Table.ID, new(sync.WaitGroup))
+		wg := i.(*sync.WaitGroup)
+		wg.Add(1)
+		wgs = append(wgs, wg)
+	}
+	return func() {
+		for _, wg := range wgs {
+			wg.Done()
+		}
+	}
+}
+
+// waitTablesDone blocks the current goroutine
+// until all provided tables are no longer 'currently restoring'.
+func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
+	for _, t := range ts {
+		wg, ok := b.tableWaiters.LoadAndDelete(t.Table.ID)
+		if !ok {
+			log.Panic("bug! table done before register!",
+				zap.Any("wait-table-map", b.tableWaiters),
+				zap.Stringer("table", t.Table.Name))
 		}
+		wg.(*sync.WaitGroup).Wait()
 	}
 }
 
-func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
+func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
+	eg, ectx := errgroup.WithContext(ctx)
 	defer func() {
 		log.Debug("restore worker closed")
+		if err := eg.Wait(); err != nil {
+			b.sink.EmitError(err)
+			return
+		}
 		b.wg.Done()
 		b.sink.Close()
 	}()
@@ -250,24 +341,24 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul
 		select {
 		case <-ctx.Done():
 			return
-		case result, ok := <-ranges:
+		case r, ok := <-ranges:
 			if !ok {
 				return
 			}
-			files := result.Files()
-			if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
-				b.sink.EmitError(err)
-				return
-			}
-
-			log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
-			b.sink.EmitTables(result.BlankTablesAfterSend...)
+			files := r.result.Files()
+			// There is already a worker pool inside the `RestoreFiles` procedure,
+			// so spawning a raw goroutine here won't make too many requests to TiKV.
+			eg.Go(func() error {
+				e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
+				if e != nil {
+					return e
+				}
+				log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
+				r.done()
+				b.waitTablesDone(r.result.BlankTablesAfterSend)
+				b.sink.EmitTables(r.result.BlankTablesAfterSend...)
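The registerTableIsRestoring/waitTablesDone pair above is what closes the race described in the splitWorker comment: a table is handed to checksum (EmitTables) only after every batch touching it has called its done callback. A minimal standalone sketch of the same register/wait pattern; the tracker type and names are hypothetical, but the mechanism mirrors tableWaiters:

package main

import (
	"fmt"
	"sync"
)

// tracker maps a table ID to a WaitGroup counting its in-flight batches.
type tracker struct {
	tableWaiters sync.Map // int64 -> *sync.WaitGroup
}

// register marks the tables as "currently restoring" and returns a callback
// that marks this particular batch as done.
func (t *tracker) register(tableIDs []int64) (done func()) {
	wgs := make([]*sync.WaitGroup, 0, len(tableIDs))
	for _, id := range tableIDs {
		i, _ := t.tableWaiters.LoadOrStore(id, new(sync.WaitGroup))
		wg := i.(*sync.WaitGroup)
		wg.Add(1)
		wgs = append(wgs, wg)
	}
	return func() {
		for _, wg := range wgs {
			wg.Done()
		}
	}
}

// wait blocks until every registered batch of the table has called done().
func (t *tracker) wait(tableID int64) {
	if wg, ok := t.tableWaiters.LoadAndDelete(tableID); ok {
		wg.(*sync.WaitGroup).Wait()
	}
}

func main() {
	var tr tracker
	done1 := tr.register([]int64{1}) // batch "ranges 1" of table 1
	done2 := tr.register([]int64{1}) // batch "ranges 2" of table 1 (BlankTablesAfterSend)

	go func() { done2(); done1() }() // batches may finish in any order
	tr.wait(1)                       // checksum may start only after both are done
	fmt.Println("table 1 fully restored")
}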
+				return nil
+			})
 		}
 	}
 }
-
-func (b *tikvSender) Close() {
-	close(b.inCh)
-	b.wg.Wait()
-	log.Debug("tikv sender closed")
-}
diff --git a/br/pkg/restore/range.go b/br/pkg/restore/range.go
index 1e8076f69d28b..cc81ba8423d94 100644
--- a/br/pkg/restore/range.go
+++ b/br/pkg/restore/range.go
@@ -72,12 +72,14 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
 				"rewrite start key",
 				logutil.Key("key", rg.StartKey),
 				logutil.RewriteRule(rule))
 		}
+		oldKey := rg.EndKey
 		rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
 		if rule == nil {
 			log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
 		} else {
 			log.Debug(
 				"rewrite end key",
+				logutil.Key("origin-key", oldKey),
 				logutil.Key("key", rg.EndKey),
 				logutil.RewriteRule(rule))
 		}
diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go
index d36e85c559219..5a9f70e10c7bd 100644
--- a/br/pkg/restore/split.go
+++ b/br/pkg/restore/split.go
@@ -105,12 +105,13 @@ SplitRegions:
 	for i := 0; i < SplitRetryTimes; i++ {
 		regions, errScan := PaginateScanRegion(ctx, rs.client, minKey, maxKey, ScanRegionPaginationLimit)
 		if errScan != nil {
+			if berrors.ErrPDBatchScanRegion.Equal(errScan) {
+				log.Warn("got inconsistent region info, retrying", logutil.ShortError(errScan))
+				time.Sleep(time.Second)
+				continue SplitRegions
+			}
 			return errors.Trace(errScan)
 		}
-		if len(regions) == 0 {
-			log.Warn("split regions cannot scan any region")
-			return nil
-		}
 		splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions)
 		regionMap := make(map[uint64]*RegionInfo)
 		for _, region := range regions {
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 1c45562071172..e68556dbb328b 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -35,9 +35,15 @@ const (
 	FlagMergeRegionSizeBytes = "merge-region-size-bytes"
 	// FlagMergeRegionKeyCount is the flag name of merge small regions by key count
 	FlagMergeRegionKeyCount = "merge-region-key-count"
+	// FlagPDConcurrency is the flag name of the concurrency for PD-related operations such as split & scatter.
+	FlagPDConcurrency = "pd-concurrency"
+	// FlagBatchFlushInterval is the flag name of the interval after which a restore batch is automatically sent.
+	FlagBatchFlushInterval = "batch-flush-interval"
 
 	defaultRestoreConcurrency = 128
 	maxRestoreBatchSizeLimit  = 10240
+	defaultPDConcurrency      = 1
+	defaultBatchFlushInterval = 16 * time.Second
 	defaultDDLConcurrency     = 16
 )
 
@@ -71,9 +77,15 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
 	flags.Uint64(FlagMergeRegionSizeBytes, restore.DefaultMergeRegionSizeBytes,
 		"the threshold of merging small regions (Default 96MB, region split size)")
 	flags.Uint64(FlagMergeRegionKeyCount, restore.DefaultMergeRegionKeyCount,
-		"the threshold of merging smalle regions (Default 960_000, region split key count)")
+		"the threshold of merging small regions (Default 960_000, region split key count)")
+	flags.Uint(FlagPDConcurrency, defaultPDConcurrency,
+		"the concurrency of PD-related operations such as split & scatter")
+	flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval,
+		"how long a restore batch waits before it is automatically sent")
 	_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
 	_ = flags.MarkHidden(FlagMergeRegionKeyCount)
+	_ = flags.MarkHidden(FlagPDConcurrency)
+	_ = flags.MarkHidden(FlagBatchFlushInterval)
 }
 
 // ParseFromFlags parses the config from the flag set.
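For reference, a small self-contained sketch of how the two new hidden flags behave with spf13/pflag; the flag names and defaults are taken from the constants above, while the standalone main and the sample values are only illustrative:

package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	flags := pflag.NewFlagSet("restore", pflag.ContinueOnError)
	flags.Uint("pd-concurrency", 1, "the concurrency of PD-related operations such as split & scatter")
	flags.Duration("batch-flush-interval", 16*time.Second, "how long a restore batch waits before it is automatically sent")
	// Hidden flags still parse normally; they are just omitted from --help.
	_ = flags.MarkHidden("pd-concurrency")
	_ = flags.MarkHidden("batch-flush-interval")

	_ = flags.Parse([]string{"--pd-concurrency=4", "--batch-flush-interval=30s"})

	pdConcurrency, _ := flags.GetUint("pd-concurrency")
	flushInterval, _ := flags.GetDuration("batch-flush-interval")
	fmt.Println(pdConcurrency, flushInterval) // 4 30s
}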
@@ -99,7 +111,9 @@ type RestoreConfig struct {
 	Config
 	RestoreCommonConfig
 
-	NoSchema bool `json:"no-schema" toml:"no-schema"`
+	NoSchema           bool          `json:"no-schema" toml:"no-schema"`
+	PDConcurrency      uint          `json:"pd-concurrency" toml:"pd-concurrency"`
+	BatchFlushInterval time.Duration `json:"batch-flush-interval" toml:"batch-flush-interval"`
 }
 
 // DefineRestoreFlags defines common flags for the restore tidb command.
@@ -130,6 +144,14 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if cfg.Config.Concurrency == 0 {
 		cfg.Config.Concurrency = defaultRestoreConcurrency
 	}
+	cfg.PDConcurrency, err = flags.GetUint(FlagPDConcurrency)
+	if err != nil {
+		return errors.Annotatef(err, "failed to get flag %s", FlagPDConcurrency)
+	}
+	cfg.BatchFlushInterval, err = flags.GetDuration(FlagBatchFlushInterval)
+	if err != nil {
+		return errors.Annotatef(err, "failed to get flag %s", FlagBatchFlushInterval)
+	}
 	return nil
 }
 
@@ -147,6 +169,12 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
 	if cfg.Config.SwitchModeInterval == 0 {
 		cfg.Config.SwitchModeInterval = defaultSwitchInterval
 	}
+	if cfg.PDConcurrency == 0 {
+		cfg.PDConcurrency = defaultPDConcurrency
+	}
+	if cfg.BatchFlushInterval == 0 {
+		cfg.BatchFlushInterval = defaultBatchFlushInterval
+	}
 }
 
 // CheckRestoreDBAndTable is used to check whether the restore dbs or tables have been backup
@@ -400,14 +428,14 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		int64(rangeSize+len(files)+len(tables)),
 		!cfg.LogProgress)
 	defer updateCh.Close()
 
-	sender, err := restore.NewTiKVSender(ctx, client, updateCh)
+	sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.PDConcurrency)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	manager := restore.NewBRContextManager(client)
 	batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh)
 	batcher.SetThreshold(batchSize)
-	batcher.EnableAutoCommit(ctx, time.Second)
+	batcher.EnableAutoCommit(ctx, cfg.BatchFlushInterval)
 	go restoreTableStream(ctx, rangeStream, batcher, errCh)
 
 	var finish <-chan struct{}
diff --git a/br/pkg/utils/worker.go b/br/pkg/utils/worker.go
index df87fcb683707..773cfd41a64da 100644
--- a/br/pkg/utils/worker.go
+++ b/br/pkg/utils/worker.go
@@ -37,6 +37,16 @@ func NewWorkerPool(limit uint, name string) *WorkerPool {
 	}
 }
 
+// IdleCount counts how many idle workers are in the pool.
+func (pool *WorkerPool) IdleCount() int {
+	return len(pool.workers)
+}
+
+// Limit returns the maximum number of concurrent workers in the pool.
+func (pool *WorkerPool) Limit() int {
+	return int(pool.limit)
+}
+
 // Apply executes a task.
 func (pool *WorkerPool) Apply(fn taskFunc) {
 	worker := pool.ApplyWorker()
@@ -95,5 +105,5 @@ func (pool *WorkerPool) RecycleWorker(worker *Worker) {
 // HasWorker checks if the pool has unallocated workers.
 func (pool *WorkerPool) HasWorker() bool {
-	return len(pool.workers) > 0
+	return pool.IdleCount() > 0
 }
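Finally, a short sketch of the concurrency pattern splitWorker adopts: a br/pkg/utils worker pool throttling tasks that report errors through an errgroup. It assumes the br package is importable from your module; the utilization log using the new IdleCount/Limit helpers is an assumption about how they could be used, not something this patch itself does:

package main

import (
	"context"
	"log"

	"github.com/pingcap/tidb/br/pkg/utils"
	"golang.org/x/sync/errgroup"
)

func main() {
	pool := utils.NewWorkerPool(4, "split")
	eg, _ := errgroup.WithContext(context.Background())

	for i := 0; i < 16; i++ {
		i := i
		// ApplyOnErrorGroup blocks until a worker is free, so at most
		// Limit() tasks run on the error group concurrently.
		pool.ApplyOnErrorGroup(eg, func() error {
			// ... split one batch of ranges here ...
			_ = i
			return nil
		})
		// IdleCount and Limit give a cheap view of pool utilization.
		log.Printf("split workers busy: %d/%d", pool.Limit()-pool.IdleCount(), pool.Limit())
	}
	if err := eg.Wait(); err != nil {
		log.Fatal(err)
	}
}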