diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 892f4a8494658..5e52f04aa02e8 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -106,7 +106,11 @@ go_test( "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", +<<<<<<< HEAD "//br/pkg/lightning/glue", +======= + "//br/pkg/lightning/config", +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) "//br/pkg/lightning/log", "//br/pkg/lightning/mydump", "//br/pkg/membuf", diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 61417ea85e368..7e113d3927b8b 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -106,14 +106,13 @@ type Engine struct { // flush and ingest sst hold the rlock, other operation hold the wlock. mutex sync.RWMutex - ctx context.Context - cancel context.CancelFunc - sstDir string - sstMetasChan chan metaOrFlush - ingestErr common.OnceError - wg sync.WaitGroup - sstIngester sstIngester - finishedRanges syncedRanges + ctx context.Context + cancel context.CancelFunc + sstDir string + sstMetasChan chan metaOrFlush + ingestErr common.OnceError + wg sync.WaitGroup + sstIngester sstIngester // sst seq lock seqLock sync.Mutex @@ -907,72 +906,6 @@ func (e *Engine) loadEngineMeta() error { return nil } -// sortAndMergeRanges sort the ranges and merge range that overlaps with each other into a single range. 
-func sortAndMergeRanges(ranges []Range) []Range { - if len(ranges) == 0 { - return ranges - } - - slices.SortFunc(ranges, func(i, j Range) bool { - return bytes.Compare(i.start, j.start) < 0 - }) - - curEnd := ranges[0].end - i := 0 - for j := 1; j < len(ranges); j++ { - if bytes.Compare(curEnd, ranges[j].start) >= 0 { - if bytes.Compare(curEnd, ranges[j].end) < 0 { - curEnd = ranges[j].end - } - } else { - ranges[i].end = curEnd - i++ - ranges[i].start = ranges[j].start - curEnd = ranges[j].end - } - } - ranges[i].end = curEnd - return ranges[:i+1] -} - -func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { - if len(ranges) == 0 || len(finishedRanges) == 0 { - return ranges - } - - result := make([]Range, 0) - for _, r := range ranges { - start := r.start - end := r.end - for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 { - fr := finishedRanges[0] - if bytes.Compare(fr.start, start) > 0 { - result = append(result, Range{start: start, end: fr.start}) - } - if bytes.Compare(fr.end, start) > 0 { - start = fr.end - } - if bytes.Compare(fr.end, end) > 0 { - break - } - finishedRanges = finishedRanges[1:] - } - if bytes.Compare(start, end) < 0 { - result = append(result, Range{start: start, end: end}) - } - } - return result -} - -func (e *Engine) unfinishedRanges(ranges []Range) []Range { - e.finishedRanges.Lock() - defer e.finishedRanges.Unlock() - - e.finishedRanges.ranges = sortAndMergeRanges(e.finishedRanges.ranges) - - return filterOverlapRange(ranges, e.finishedRanges.ranges) -} - func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter { if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 { newOpts := *opts diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index bb2c9cd79f56e..cc29bc7087db3 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1015,14 +1015,24 @@ func (local *local) 
readAndSplitIntoRange(ctx context.Context, engine *Engine, r return ranges, nil } +<<<<<<< HEAD // prepareAndGenerateUnfinishedJob will read the engine to get unfinished key range, // then split and scatter regions for these range and generate region jobs. func (local *local) prepareAndGenerateUnfinishedJob( +======= +// prepareAndSendJob will read the engine to get estimated key range, +// then split and scatter regions for these range and send region jobs to jobToWorkerCh. +// NOTE when ctx is Done, this function will NOT return error even if it hasn't sent +// all the jobs to jobToWorkerCh. This is because the "first error" can only be +// found by checking the work group LATER, we don't want to return an error to +// seize the "first" error. +func (local *Backend) prepareAndSendJob( +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) ctx context.Context, - engineUUID uuid.UUID, - lf *Engine, + engine *Engine, initialSplitRanges []Range, regionSplitSize, regionSplitKeys int64, +<<<<<<< HEAD ) ([]*regionJob, error) { lfTotalSize := lf.TotalSize.Load() lfLength := lf.Length.Load() @@ -1051,23 +1061,67 @@ func (local *local) prepareAndGenerateUnfinishedJob( log.FromContext(ctx).Error("split & scatter ranges failed", zap.Stringer("uuid", engineUUID), log.ShortError(err)) return nil, err } +======= + jobToWorkerCh chan<- *regionJob, + jobWg *sync.WaitGroup, +) error { + lfTotalSize := engine.TotalSize.Load() + lfLength := engine.Length.Load() + log.FromContext(ctx).Info("import engine ranges", zap.Int("count", len(initialSplitRanges))) + if len(initialSplitRanges) == 0 { + return nil + } + + // if all the kv can fit in one region, skip split regions. TiDB will split one region for + // the table when table is created. 
+ needSplit := len(initialSplitRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys + var err error + // split region by given ranges + failpoint.Inject("failToSplit", func(_ failpoint.Value) { + needSplit = true + }) + for i := 0; i < maxRetryTimes; i++ { + failpoint.Inject("skipSplitAndScatter", func() { + failpoint.Break() + }) + + err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, engine.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges) + if err == nil || common.IsContextCanceledError(err) { + break + } + + log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engine.UUID), + log.ShortError(err), zap.Int("retry", i)) + } + if err != nil { + log.FromContext(ctx).Error("split & scatter ranges failed", zap.Stringer("uuid", engine.UUID), log.ShortError(err)) + return err +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) } - return local.generateJobInRanges( + return local.generateAndSendJob( ctx, - lf, - unfinishedRanges, + engine, + initialSplitRanges, regionSplitSize, regionSplitKeys, + jobToWorkerCh, + jobWg, ) } +<<<<<<< HEAD // generateJobInRanges scans the region in ranges and generate region jobs. func (local *local) generateJobInRanges( +======= +// generateAndSendJob scans the region in ranges and send region jobs to jobToWorkerCh. 
+func (local *Backend) generateAndSendJob( +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) ctx context.Context, engine *Engine, ranges []Range, regionSplitSize, regionSplitKeys int64, +<<<<<<< HEAD ) ([]*regionJob, error) { log.FromContext(ctx).Debug("the ranges Length write to tikv", zap.Int("Length", len(ranges))) @@ -1076,63 +1130,140 @@ func (local *local) generateJobInRanges( for _, r := range ranges { start, end := r.start, r.end pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end) +======= + jobToWorkerCh chan<- *regionJob, + jobWg *sync.WaitGroup, +) error { + logger := log.FromContext(ctx) + + // when use dynamic region feature, the region may be very big, we need + // to split to smaller ranges to increase the concurrency. + if regionSplitSize > 2*int64(config.SplitRegionSize) { + sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter) if err != nil { - return nil, err - } - if pairStart == nil { - log.FromContext(ctx).Info("There is no pairs in range", - logutil.Key("start", start), - logutil.Key("end", end)) - engine.finishedRanges.add(Range{start: start, end: end}) - continue + return errors.Trace(err) } - startKey := codec.EncodeBytes([]byte{}, pairStart) - endKey := codec.EncodeBytes([]byte{}, nextKey(pairEnd)) - regions, err := split.PaginateScanRegion(ctx, local.splitCli, startKey, endKey, scanRegionLimit) + jobRanges = splitRangeBySizeProps( + Range{start: jobRanges[0].start, end: jobRanges[len(jobRanges)-1].end}, + sizeProps, + int64(config.SplitRegionSize), + int64(config.SplitRegionKeys)) + } + logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges))) + + for _, r := range jobRanges { + jobs, err := local.generateJobForRange(ctx, engine, r, regionSplitSize, regionSplitKeys) +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) if err != nil { - log.FromContext(ctx).Error("scan region failed", - log.ShortError(err), 
zap.Int("region_len", len(regions)), - logutil.Key("startKey", startKey), - logutil.Key("endKey", endKey)) - return nil, err + return err } - - for _, region := range regions { - log.FromContext(ctx).Debug("get region", - zap.Binary("startKey", startKey), - zap.Binary("endKey", endKey), - zap.Uint64("id", region.Region.GetId()), - zap.Stringer("epoch", region.Region.GetRegionEpoch()), - zap.Binary("start", region.Region.GetStartKey()), - zap.Binary("end", region.Region.GetEndKey()), - zap.Reflect("peers", region.Region.GetPeers())) - - job := &regionJob{ - keyRange: intersectRange(region.Region, Range{start: start, end: end}), - region: region, - stage: regionScanned, - engine: engine, - regionSplitSize: regionSplitSize, - regionSplitKeys: regionSplitKeys, - metrics: local.metrics, + for _, job := range jobs { + jobWg.Add(1) + select { + case <-ctx.Done(): + // this job is not put into jobToWorkerCh + jobWg.Done() + // if the context is canceled, it means worker has error, the first error can be + // found by worker's error group LATER. if this function returns an error it will + // seize the "first error". + return nil + case jobToWorkerCh <- job: } + } + } + return nil +} + +// fakeRegionJobs is used in tests; the injected job can be found by (startKey, endKey). +var fakeRegionJobs map[[2]string]struct { + jobs []*regionJob + err error +} - ret = append(ret, job) +// generateJobForRange will scan the region in `keyRange` and generate region jobs. +// It will retry internally when scan region meet error. +func (local *Backend) generateJobForRange( + ctx context.Context, + engine *Engine, + keyRange Range, + regionSplitSize, regionSplitKeys int64, +) ([]*regionJob, error) { + failpoint.Inject("fakeRegionJobs", func() { + key := [2]string{string(keyRange.start), string(keyRange.end)} + injected := fakeRegionJobs[key] + // overwrite the stage to regionScanned, because sometimes the same keyRange
+ for _, job := range injected.jobs { + job.stage = regionScanned } + failpoint.Return(injected.jobs, injected.err) + }) + + start, end := keyRange.start, keyRange.end + pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end) + if err != nil { + return nil, err } - return ret, nil + if pairStart == nil { + log.FromContext(ctx).Info("There is no pairs in range", + logutil.Key("start", start), + logutil.Key("end", end)) + return nil, nil + } + + startKey := codec.EncodeBytes([]byte{}, pairStart) + endKey := codec.EncodeBytes([]byte{}, nextKey(pairEnd)) + regions, err := split.PaginateScanRegion(ctx, local.splitCli, startKey, endKey, scanRegionLimit) + if err != nil { + log.FromContext(ctx).Error("scan region failed", + log.ShortError(err), zap.Int("region_len", len(regions)), + logutil.Key("startKey", startKey), + logutil.Key("endKey", endKey)) + return nil, err + } + + jobs := make([]*regionJob, 0, len(regions)) + for _, region := range regions { + log.FromContext(ctx).Debug("get region", + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + zap.Uint64("id", region.Region.GetId()), + zap.Stringer("epoch", region.Region.GetRegionEpoch()), + zap.Binary("start", region.Region.GetStartKey()), + zap.Binary("end", region.Region.GetEndKey()), + zap.Reflect("peers", region.Region.GetPeers())) + + jobs = append(jobs, &regionJob{ + keyRange: intersectRange(region.Region, Range{start: start, end: end}), + region: region, + stage: regionScanned, + engine: engine, + regionSplitSize: regionSplitSize, + regionSplitKeys: regionSplitKeys, + metrics: local.metrics, + }) + } + return jobs, nil } // startWorker creates a worker that reads from the job channel and processes. +<<<<<<< HEAD // startWorker will return nil if it's expected to stop, which means the context // is canceled or channel is closed. It will return not nil error when it actively // stops. 
// this function must send the job back to jobOutCh after read it from jobInCh, // even if any error happens. func (local *local) startWorker( +======= +// startWorker will return nil if it's expected to stop, where the only case is +// the context canceled. It will return not nil error when it actively stops. +// startWorker must Done the jobWg if it does not put the job into jobOutCh. +func (local *Backend) startWorker( +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) ctx context.Context, jobInCh, jobOutCh chan *regionJob, + jobWg *sync.WaitGroup, ) error { for { select { @@ -1140,24 +1271,38 @@ func (local *local) startWorker( return nil case job, ok := <-jobInCh: if !ok { + // In fact we don't use close input channel to notify worker to + // exit, because there's a cycle in workflow. return nil } - now := time.Now() - if now.Before(job.waitUntil) { - duration := job.waitUntil.Sub(now) - log.FromContext(ctx).Debug("need to wait before processing this job", - zap.Duration("wait", duration)) - select { - case <-ctx.Done(): - jobOutCh <- job - return nil - case <-time.After(duration): + err := local.executeJob(ctx, job) + switch job.stage { + case regionScanned, wrote, ingested: + jobOutCh <- job + case needRescan: + jobs, err2 := local.generateJobForRange( + ctx, + job.engine, + job.keyRange, + job.regionSplitSize, + job.regionSplitKeys, + ) + if err2 != nil { + // Don't need to put the job back to retry, because generateJobForRange + // has done the retry internally. Here just done for the "needRescan" + // job and exit directly. + jobWg.Done() + return err2 + } + // 1 "needRescan" job becomes len(jobs) "regionScanned" jobs. 
+ jobWg.Add(len(jobs) - 1) + for _, j := range jobs { + j.retryCount = job.retryCount + jobOutCh <- j } } - err := local.executeJob(ctx, job) - jobOutCh <- job if err != nil { return err } @@ -1179,7 +1324,6 @@ func (local *local) isRetryableImportTiKVError(err error) bool { } // executeJob handles a regionJob and tries to convert it to ingested stage. -// The ingested job will record finished ranges in engine as a checkpoint. // If non-retryable error occurs, it will return the error. // If retryable error occurs, it will return nil and caller should check the stage // of the regionJob to determine what to do with it. @@ -1218,6 +1362,7 @@ func (local *local) executeJob( } } +<<<<<<< HEAD err := job.writeToTiKV(ctx, local.tikvCodec.GetAPIVersion(), local.importClientFactory, @@ -1227,13 +1372,23 @@ func (local *local) executeJob( if err != nil { if !local.isRetryableImportTiKVError(err) { return err +======= + for { + err := local.writeToTiKV(ctx, job) + if err != nil { + if !local.isRetryableImportTiKVError(err) { + return err + } + // if it's retryable error, we retry from scanning region + log.FromContext(ctx).Warn("meet retryable error when writing to TiKV", + log.ShortError(err), zap.Stringer("job stage", job.stage)) + job.convertStageTo(needRescan) + job.lastRetryableErr = err + return nil +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) } - log.FromContext(ctx).Warn("meet retryable error when writing to TiKV", - log.ShortError(err), zap.Stringer("job stage", job.stage)) - job.lastRetryableErr = err - return nil - } +<<<<<<< HEAD err = job.ingest( ctx, local.importClientFactory, @@ -1244,12 +1399,25 @@ func (local *local) executeJob( if err != nil { if !local.isRetryableImportTiKVError(err) { return err +======= + err = local.ingest(ctx, job) + if err != nil { + if !local.isRetryableImportTiKVError(err) { + return err + } + log.FromContext(ctx).Warn("meet retryable error when ingesting", + log.ShortError(err), 
zap.Stringer("job stage", job.stage)) + job.lastRetryableErr = err + return nil +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) } - log.FromContext(ctx).Warn("meet retryable error when ingesting", - log.ShortError(err), zap.Stringer("job stage", job.stage)) - job.lastRetryableErr = err + + if job.writeResult == nil || job.writeResult.remainingStartKey == nil { + return nil + } + job.keyRange.start = job.writeResult.remainingStartKey + job.convertStageTo(regionScanned) } - return nil } func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { @@ -1311,28 +1479,56 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi failpoint.Inject("ReadyForImportEngine", func() {}) + err = local.doImport(ctx, lf, regionRanges, regionSplitSize, regionSplitKeys) + if err == nil { + log.FromContext(ctx).Info("import engine success", zap.Stringer("uuid", engineUUID), + zap.Int64("size", lfTotalSize), zap.Int64("kvs", lfLength), + zap.Int64("importedSize", lf.importedKVSize.Load()), zap.Int64("importedCount", lf.importedKVCount.Load())) + } + return err +} + +func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges []Range, regionSplitSize, regionSplitKeys int64) error { + /* + [prepareAndSendJob]-----jobToWorkerCh--->[workers] + ^ | + | jobFromWorkerCh + | | + | v + [regionJobRetryer]<--[dispatchJobGoroutine]-->done + */ + var ( - ctx2, workerCancel = context.WithCancel(ctx) + ctx2, workerCancel = context.WithCancel(ctx) + // workerCtx.Done() means workflow is canceled by error. It may be caused + // by calling workerCancel() or workers in workGroup meets error. workGroup, workerCtx = errgroup.WithContext(ctx2) - // jobToWorkerCh is unbuffered so when we finished sending all jobs, we can make sure all jobs have been - // received by workers. 
+ firstErr common.OnceError + // jobToWorkerCh and jobFromWorkerCh are unbuffered so jobs will not be + // owned by them. jobToWorkerCh = make(chan *regionJob) jobFromWorkerCh = make(chan *regionJob) - // jobWg is the number of jobs that need to be processed by worker in this round. - jobWg sync.WaitGroup - jobsNeedRetry []*regionJob - retryErr atomic.Error - retryGoroutineDone = make(chan struct{}) + // jobWg tracks the number of jobs in this workflow. + // prepareAndSendJob, workers and regionJobRetryer can own jobs. + // When cancel on error, the goroutine of above three components have + // responsibility to Done jobWg of their owning jobs. + jobWg sync.WaitGroup + dispatchJobGoroutine = make(chan struct{}) ) + defer workerCancel() - // handle processed job from worker, it will only exit when jobFromWorkerCh - // is closed to avoid send to jobFromWorkerCh is blocked. + retryer := startRegionJobRetryer(workerCtx, jobToWorkerCh, &jobWg) + + // dispatchJobGoroutine handles processed job from worker, it will only exit + // when jobFromWorkerCh is closed to avoid worker is blocked on sending to + // jobFromWorkerCh. 
defer func() { + // use defer to close jobFromWorkerCh after all workers are exited close(jobFromWorkerCh) - <-retryGoroutineDone + <-dispatchJobGoroutine }() go func() { - defer close(retryGoroutineDone) + defer close(dispatchJobGoroutine) for { job, ok := <-jobFromWorkerCh if !ok { @@ -1342,7 +1538,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi case regionScanned, wrote: job.retryCount++ if job.retryCount > maxWriteAndIngestRetryTimes { - retryErr.Store(job.lastRetryableErr) + firstErr.Set(job.lastRetryableErr) workerCancel() jobWg.Done() continue @@ -1359,19 +1555,25 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi zap.Stringer("stage", job.stage), zap.Int("retryCount", job.retryCount), zap.Time("waitUntil", job.waitUntil)) - jobsNeedRetry = append(jobsNeedRetry, job) - case ingested, needRescan: + if !retryer.push(job) { + // retryer is closed by worker error + jobWg.Done() + } + case ingested: + jobWg.Done() + case needRescan: + panic("should not reach here") } - jobWg.Done() } }() for i := 0; i < local.workerConcurrency; i++ { workGroup.Go(func() error { - return local.startWorker(workerCtx, jobToWorkerCh, jobFromWorkerCh) + return local.startWorker(workerCtx, jobToWorkerCh, jobFromWorkerCh, &jobWg) }) } +<<<<<<< HEAD var pendingJobs []*regionJob for { pendingJobs = append(pendingJobs, jobsNeedRetry...) 
@@ -1424,13 +1626,28 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi return errors.Trace(groupErr) default: } +======= + err := local.prepareAndSendJob( + workerCtx, + engine, + regionRanges, + regionSplitSize, + regionSplitKeys, + jobToWorkerCh, + &jobWg, + ) + if err != nil { + firstErr.Set(err) + workerCancel() + _ = workGroup.Wait() + return firstErr.Get() +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) } - close(jobToWorkerCh) - log.FromContext(ctx).Info("import engine success", zap.Stringer("uuid", engineUUID), - zap.Int64("size", lfTotalSize), zap.Int64("kvs", lfLength), - zap.Int64("importedSize", lf.importedKVSize.Load()), zap.Int64("importedCount", lf.importedKVCount.Load())) - return workGroup.Wait() + jobWg.Wait() + workerCancel() + firstErr.Set(workGroup.Wait()) + return firstErr.Get() } func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error) { @@ -1614,7 +1831,6 @@ func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error } } localEngine.pendingFileSize.Store(0) - localEngine.finishedRanges.reset() return err } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 4a20d7adc5cc5..d443ead6daacf 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/pdutil" @@ -554,80 +555,6 @@ func makeRanges(input []string) []Range { return ranges } -func TestDedupAndMergeRanges(t *testing.T) { - cases := [][]string{ - // 
empty - {}, - {}, - // without overlap - {"1", "2", "3", "4", "5", "6", "7", "8"}, - {"1", "2", "3", "4", "5", "6", "7", "8"}, - // merge all as one - {"1", "12", "12", "13", "13", "14", "14", "15", "15", "999"}, - {"1", "999"}, - // overlap - {"1", "12", "12", "13", "121", "129", "122", "133", "14", "15", "15", "999"}, - {"1", "133", "14", "999"}, - - // out of order, same as test 3 - {"15", "999", "1", "12", "121", "129", "12", "13", "122", "133", "14", "15"}, - {"1", "133", "14", "999"}, - - // not continuous - {"000", "001", "002", "004", "100", "108", "107", "200", "255", "300"}, - {"000", "001", "002", "004", "100", "200", "255", "300"}, - } - - for i := 0; i < len(cases)-1; i += 2 { - input := makeRanges(cases[i]) - output := makeRanges(cases[i+1]) - - require.Equal(t, output, sortAndMergeRanges(input)) - } -} - -func TestFilterOverlapRange(t *testing.T) { - cases := [][]string{ - // both empty input - {}, - {}, - {}, - - // ranges are empty - {}, - {"0", "1"}, - {}, - - // finished ranges are empty - {"0", "1", "2", "3"}, - {}, - {"0", "1", "2", "3"}, - - // single big finished range - {"00", "10", "20", "30", "40", "50", "60", "70"}, - {"25", "65"}, - {"00", "10", "20", "25", "65", "70"}, - - // single big input - {"10", "99"}, - {"00", "10", "15", "30", "45", "60"}, - {"10", "15", "30", "45", "60", "99"}, - - // multi input and finished - {"00", "05", "05", "10", "10", "20", "30", "45", "50", "70", "70", "90"}, - {"07", "12", "14", "16", "17", "30", "45", "70"}, - {"00", "05", "05", "07", "12", "14", "16", "17", "30", "45", "70", "90"}, - } - - for i := 0; i < len(cases)-2; i += 3 { - input := makeRanges(cases[i]) - finished := makeRanges(cases[i+1]) - output := makeRanges(cases[i+2]) - - require.Equal(t, output, filterOverlapRange(input, finished)) - } -} - func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { opt := &pebble.Options{ MemTableSize: 1024 * 1024, @@ -1113,7 +1040,7 @@ func TestLocalWriteAndIngestPairsFailFast(t 
*testing.T) { jobCh := make(chan *regionJob, 1) jobCh <- &regionJob{} jobOutCh := make(chan *regionJob, 1) - err := bak.startWorker(context.Background(), jobCh, jobOutCh) + err := bak.startWorker(context.Background(), jobCh, jobOutCh, nil) require.Error(t, err) require.Regexp(t, "The available disk of TiKV.*", err.Error()) require.Len(t, jobCh, 0) @@ -1272,7 +1199,7 @@ func TestCheckPeersBusy(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := local.startWorker(ctx, jobCh, jobOutCh) + err := local.startWorker(ctx, jobCh, jobOutCh, nil) require.NoError(t, err) }() @@ -1291,5 +1218,390 @@ func TestCheckPeersBusy(t *testing.T) { // store 12 has a follower busy, so it will break the workflow for region (11, 12, 13) require.Equal(t, []uint64{11, 12, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) // region (11, 12, 13) has key range ["a", "b"), it's not finished. - require.Equal(t, []Range{{start: []byte("b"), end: []byte("")}}, f.finishedRanges.ranges) + require.Equal(t, []byte("a"), retryJob.keyRange.start) + require.Equal(t, []byte("b"), retryJob.keyRange.end) +} +<<<<<<< HEAD +======= + +// mockGetSizeProperties mocks that 50MB * 20 SST file. +func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) { + props := newSizeProperties() + // keys starts with 0 is meta keys, so we start with 1. 
+ for i := byte(1); i <= 10; i++ { + rangeProps := &rangeProperty{ + Key: []byte{i}, + rangeOffsets: rangeOffsets{ + Size: 50 * units.MiB, + Keys: 100_000, + }, + } + props.add(rangeProps) + rangeProps = &rangeProperty{ + Key: []byte{i, 1}, + rangeOffsets: rangeOffsets{ + Size: 50 * units.MiB, + Keys: 100_000, + }, + } + props.add(rangeProps) + } + return props, nil +} + +type panicSplitRegionClient struct{} + +func (p panicSplitRegionClient) BeforeSplitRegion(context.Context, *split.RegionInfo, [][]byte) (*split.RegionInfo, [][]byte) { + panic("should not be called") +} + +func (p panicSplitRegionClient) AfterSplitRegion(context.Context, *split.RegionInfo, [][]byte, []*split.RegionInfo, error) ([]*split.RegionInfo, error) { + panic("should not be called") +} + +func (p panicSplitRegionClient) BeforeScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]byte, []byte, int) { + return key, endKey, limit +} + +func (p panicSplitRegionClient) AfterScanRegions(infos []*split.RegionInfo, err error) ([]*split.RegionInfo, error) { + return infos, err +} + +func TestSplitRangeAgain4BigRegion(t *testing.T) { + backup := getSizePropertiesFn + getSizePropertiesFn = mockGetSizeProperties + t.Cleanup(func() { + getSizePropertiesFn = backup + }) + + local := &Backend{ + splitCli: initTestSplitClient( + [][]byte{{1}, {11}}, // we have one big region + panicSplitRegionClient{}, // make sure no further split region + ), + } + db, tmpPath := makePebbleDB(t, nil) + _, engineUUID := backend.MakeUUID("ww", 0) + ctx := context.Background() + engineCtx, cancel := context.WithCancel(context.Background()) + f := &Engine{ + db: db, + UUID: engineUUID, + sstDir: tmpPath, + ctx: engineCtx, + cancel: cancel, + sstMetasChan: make(chan metaOrFlush, 64), + keyAdapter: noopKeyAdapter{}, + logger: log.L(), + } + // keys starts with 0 is meta keys, so we start with 1. 
+ for i := byte(1); i <= 10; i++ { + err := f.db.Set([]byte{i}, []byte{i}, nil) + require.NoError(t, err) + err = f.db.Set([]byte{i, 1}, []byte{i, 1}, nil) + require.NoError(t, err) + } + + bigRegionRange := []Range{{start: []byte{1}, end: []byte{11}}} + jobCh := make(chan *regionJob, 10) + jobWg := sync.WaitGroup{} + err := local.generateAndSendJob( + ctx, + f, + bigRegionRange, + 10*units.GB, + 1<<30, + jobCh, + &jobWg, + ) + require.NoError(t, err) + require.Len(t, jobCh, 10) + for i := 0; i < 10; i++ { + job := <-jobCh + require.Equal(t, []byte{byte(i + 1)}, job.keyRange.start) + require.Equal(t, []byte{byte(i + 2)}, job.keyRange.end) + jobWg.Done() + } + jobWg.Wait() +} + +func getSuccessInjectedBehaviour() []injectedBehaviour { + return []injectedBehaviour{ + { + write: injectedWriteBehaviour{ + result: &tikvWriteResult{ + remainingStartKey: nil, + }, + }, + }, + { + ingest: injectedIngestBehaviour{ + nextStage: ingested, + }, + }, + } +} + +func getNeedRescanWhenIngestBehaviour() []injectedBehaviour { + return []injectedBehaviour{ + { + write: injectedWriteBehaviour{ + result: &tikvWriteResult{ + remainingStartKey: nil, + }, + }, + }, + { + ingest: injectedIngestBehaviour{ + nextStage: needRescan, + err: common.ErrKVEpochNotMatch, + }, + }, + } +} + +func TestDoImport(t *testing.T) { + backup := maxRetryBackoffSecond + maxRetryBackoffSecond = 1 + t.Cleanup(func() { + maxRetryBackoffSecond = backup + }) + + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()") + t.Cleanup(func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs") + }) + + // test that + // - one job need rescan when ingest + // - one job need retry when write + + initRanges := 
[]Range{ + {start: []byte{'a'}, end: []byte{'b'}}, + {start: []byte{'b'}, end: []byte{'c'}}, + {start: []byte{'c'}, end: []byte{'d'}}, + } + fakeRegionJobs = map[[2]string]struct { + jobs []*regionJob + err error + }{ + {"a", "b"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"b", "c"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + engine: &Engine{}, + injected: []injectedBehaviour{ + { + write: injectedWriteBehaviour{ + result: &tikvWriteResult{ + remainingStartKey: []byte{'b', '2'}, + }, + }, + }, + { + ingest: injectedIngestBehaviour{ + nextStage: ingested, + }, + }, + { + write: injectedWriteBehaviour{ + result: &tikvWriteResult{ + remainingStartKey: nil, + }, + }, + }, + { + ingest: injectedIngestBehaviour{ + nextStage: ingested, + }, + }, + }, + }, + }, + }, + {"c", "d"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + engine: &Engine{}, + injected: getNeedRescanWhenIngestBehaviour(), + }, + { + keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + engine: &Engine{}, + injected: []injectedBehaviour{ + { + write: injectedWriteBehaviour{ + // a retryable error + err: errors.New("is not fully replicated"), + }, + }, + }, + }, + }, + }, + {"c", "c2"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"c2", "d"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + } + + ctx := context.Background() + l := &Backend{ + BackendConfig: BackendConfig{WorkerConcurrency: 2}, + } + e := &Engine{} + err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.NoError(t, err) + 
for _, v := range fakeRegionJobs { + for _, job := range v.jobs { + require.Len(t, job.injected, 0) + } + } + + // test first call to generateJobForRange meet error + + fakeRegionJobs = map[[2]string]struct { + jobs []*regionJob + err error + }{ + {"a", "b"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"b", "c"}: { + err: errors.New("meet error when generateJobForRange"), + }, + } + err = l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "meet error when generateJobForRange") + + // test second call to generateJobForRange (needRescan) meet error + + fakeRegionJobs = map[[2]string]struct { + jobs []*regionJob + err error + }{ + {"a", "b"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'a'}, end: []byte{'a', '2'}}, + engine: &Engine{}, + injected: getNeedRescanWhenIngestBehaviour(), + }, + { + keyRange: Range{start: []byte{'a', '2'}, end: []byte{'b'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"b", "c"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"c", "d"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"a", "a2"}: { + err: errors.New("meet error when generateJobForRange again"), + }, + } + err = l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "meet error when generateJobForRange again") + + // test write meet unretryable error + maxRetryBackoffSecond = 100 + l.WorkerConcurrency = 1 + fakeRegionJobs = map[[2]string]struct { + jobs []*regionJob + err error + }{ + {"a", "b"}: { + jobs: []*regionJob{ + { + 
keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + engine: &Engine{}, + retryCount: maxWriteAndIngestRetryTimes - 1, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"b", "c"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + engine: &Engine{}, + retryCount: maxWriteAndIngestRetryTimes - 1, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"c", "d"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + engine: &Engine{}, + retryCount: maxWriteAndIngestRetryTimes - 2, + injected: []injectedBehaviour{ + { + write: injectedWriteBehaviour{ + // unretryable error + err: errors.New("fatal error"), + }, + }, + }, + }, + }, + }, + } + err = l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "fatal error") + for _, v := range fakeRegionJobs { + for _, job := range v.jobs { + require.Len(t, job.injected, 0) + } + } } +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index 16288eaa44c96..ca854f42e5ad2 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -15,8 +15,10 @@ package local import ( + "container/heap" "context" "strings" + "sync" "time" "github.com/cockroachdb/pebble" @@ -32,7 +34,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/mathutil" @@ -41,12 +42,37 @@ import ( type jobStageTp string -// nil -> regionScanned: create a new region job -// regionScanned -> wrote: write the data to TiKV -// wrote -> ingested: ingest the data to TiKV -// 
ingested -> nil: finish the job -// regionScanned / wrote -> needRescan: need to rescan the data, maybe region is expanded. -// needRescan -> nil: discard the job. caller will create a new job from unfinishedRanges. +/* + + + v + +------+------+ + +->+regionScanned+<------+ + | +------+------+ | + | | | + | | | + | v | + | +--+--+ +-----+----+ + | |wrote+---->+needRescan| + | +--+--+ +-----+----+ + | | ^ + | | | + | v | + | +---+----+ | + +-----+ingested+---------+ + +---+----+ + | + v + +above diagram shows the state transition of a region job, here are some special +cases: + - regionScanned can directly jump to ingested if the keyRange has no data + - regionScanned can only transit to wrote. TODO: check if it should be transited + to needRescan + - if a job only partially writes the data, after it becomes ingested, it will + update its keyRange and transits to regionScanned to continue the remaining + data + - needRescan may output multiple regionScanned jobs when the old region is split +*/ const ( regionScanned jobStageTp = "regionScanned" wrote jobStageTp = "wrote" @@ -58,34 +84,51 @@ func (j jobStageTp) String() string { return string(j) } -// regionJob is dedicated to import the data in [keyRange.start, keyRange.end) to a region. +// regionJob is dedicated to import the data in [keyRange.start, keyRange.end) +// to a region. The keyRange may be changed when processing because of writing +// partial data to TiKV or region split. 
type regionJob struct { keyRange Range // TODO: check the keyRange so that it's always included in region region *split.RegionInfo // stage should be updated only by convertStageTo stage jobStageTp + // writeResult is available only in wrote and ingested stage + writeResult *tikvWriteResult engine *Engine regionSplitSize int64 regionSplitKeys int64 metrics *metric.Metrics - // below fields are available after wrote stage - writeResult *tikvWriteResult retryCount int waitUntil time.Time lastRetryableErr error + + // injected is used in test to set the behaviour + injected []injectedBehaviour } type tikvWriteResult struct { - sstMeta []*sst.SSTMeta - rangeStats rangeStats + sstMeta []*sst.SSTMeta + count int64 + totalBytes int64 + remainingStartKey []byte +} + +type injectedBehaviour struct { + write injectedWriteBehaviour + ingest injectedIngestBehaviour +} + +type injectedWriteBehaviour struct { + result *tikvWriteResult + err error } -type rangeStats struct { - count int64 - totalBytes int64 +type injectedIngestBehaviour struct { + nextStage jobStageTp + err error } func (j *regionJob) convertStageTo(stage jobStageTp) { @@ -94,42 +137,56 @@ func (j *regionJob) convertStageTo(stage jobStageTp) { case regionScanned: j.writeResult = nil case ingested: - j.engine.finishedRanges.add(j.keyRange) - - // when writing is skipped because range is empty + // when writing is skipped because key range is empty if j.writeResult == nil { return } - j.engine.importedKVSize.Add(j.writeResult.rangeStats.totalBytes) - j.engine.importedKVCount.Add(j.writeResult.rangeStats.count) + j.engine.importedKVSize.Add(j.writeResult.totalBytes) + j.engine.importedKVCount.Add(j.writeResult.count) if j.metrics != nil { +<<<<<<< HEAD j.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported). Add(float64(j.writeResult.rangeStats.totalBytes)) +======= + j.metrics.BytesCounter.WithLabelValues(metric.StateImported). 
+ Add(float64(j.writeResult.totalBytes)) +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) } + case needRescan: + j.region = nil } } // writeToTiKV writes the data to TiKV and mark this job as wrote stage. -// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. +// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. TODO: <-check this // if any underlying logic has error, writeToTiKV will return an error. // we don't need to do cleanup for the pairs written to tikv if encounters an error, // tikv will take the responsibility to do so. // TODO: let client-go provide a high-level write interface. -func (j *regionJob) writeToTiKV( - ctx context.Context, - apiVersion kvrpcpb.APIVersion, - clientFactory ImportClientFactory, - kvBatchSize int, - bufferPool *membuf.Pool, - writeLimiter StoreWriteLimiter, -) error { +func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { if j.stage != regionScanned { return nil } + failpoint.Inject("fakeRegionJobs", func() { + front := j.injected[0] + j.injected = j.injected[1:] + j.writeResult = front.write.result + err := front.write.err + if err == nil { + j.convertStageTo(wrote) + } + failpoint.Return(err) + }) + + apiVersion := local.tikvCodec.GetAPIVersion() + clientFactory := local.importClientFactory + kvBatchSize := local.KVWriteBatchSize + bufferPool := local.bufferPool + writeLimiter := local.writeLimiter + begin := time.Now() - stats := rangeStats{} region := j.region.Region firstKey, lastKey, err := j.engine.getFirstAndLastKey(j.keyRange.start, j.keyRange.end) @@ -216,7 +273,6 @@ func (j *regionJob) writeToTiKV( if err := writeLimiter.WaitN(ctx, storeIDs[i], int(size)); err != nil { return errors.Trace(err) } - // TODO: concurrent write? 
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count] if err := clients[i].Send(requests[i]); err != nil { return errors.Trace(err) @@ -230,6 +286,7 @@ func (j *regionJob) writeToTiKV( //nolint: errcheck defer iter.Close() + var remainingStartKey []byte for iter.First(); iter.Valid(); iter.Next() { kvSize := int64(len(iter.Key()) + len(iter.Value())) // here we reuse the `*sst.Pair`s to optimize object allocation @@ -258,17 +315,14 @@ func (j *regionJob) writeToTiKV( } if totalSize >= regionMaxSize || totalCount >= j.regionSplitKeys { // we will shrink the key range of this job to real written range - if iter.Valid() && iter.Next() { - firstKey := append([]byte{}, iter.Key()...) - oldEndKey := j.keyRange.end - j.keyRange.end = firstKey + if iter.Next() { + remainingStartKey = append([]byte{}, iter.Key()...) log.FromContext(ctx).Info("write to tikv partial finish", zap.Int64("count", totalCount), zap.Int64("size", totalSize), logutil.Key("startKey", j.keyRange.start), - logutil.Key("endKey", oldEndKey), - logutil.Key("remainStart", firstKey), - logutil.Key("remainEnd", oldEndKey), + logutil.Key("endKey", j.keyRange.end), + logutil.Key("remainStart", remainingStartKey), logutil.Region(region), logutil.Leader(j.region.Leader)) } @@ -321,11 +375,11 @@ func (j *regionJob) writeToTiKV( zap.Int64("buf_size", bytesBuf.TotalSize()), zap.Stringer("takeTime", time.Since(begin))) - stats.count = totalCount - stats.totalBytes = totalSize j.writeResult = &tikvWriteResult{ - sstMeta: leaderPeerMetas, - rangeStats: stats, + sstMeta: leaderPeerMetas, + count: totalCount, + totalBytes: totalSize, + remainingStartKey: remainingStartKey, } j.convertStageTo(wrote) return nil @@ -336,26 +390,26 @@ func (j *regionJob) writeToTiKV( // set job to a proper stage with nil error returned. // if any underlying logic has error, ingest will return an error to let caller // handle it. 
-func (j *regionJob) ingest( - ctx context.Context, - clientFactory ImportClientFactory, - splitCli split.SplitClient, - supportMultiIngest bool, - shouldCheckWriteStall bool, -) error { - switch j.stage { - case regionScanned, ingested: +func (local *Backend) ingest(ctx context.Context, j *regionJob) error { + splitCli := local.splitCli + if j.stage != wrote { return nil - case wrote: } + failpoint.Inject("fakeRegionJobs", func() { + front := j.injected[0] + j.injected = j.injected[1:] + j.convertStageTo(front.ingest.nextStage) + failpoint.Return(front.ingest.err) + }) + if len(j.writeResult.sstMeta) == 0 { j.convertStageTo(ingested) return nil } for retry := 0; retry < maxRetryTimes; retry++ { - resp, err := j.doIngest(ctx, clientFactory, supportMultiIngest, shouldCheckWriteStall) + resp, err := local.doIngest(ctx, j) if err == nil && resp.GetError() == nil { j.convertStageTo(ingested) return nil @@ -369,7 +423,7 @@ func (j *regionJob) ingest( logutil.Region(j.region.Region), logutil.Leader(j.region.Leader)) continue } - canContinue, err := j.fixIngestError(ctx, resp, splitCli) + canContinue, err := j.convertStageOnIngestError(ctx, resp, splitCli) if common.IsContextCanceledError(err) { return err } @@ -377,25 +431,29 @@ func (j *regionJob) ingest( log.FromContext(ctx).Warn("meet error and handle the job later", zap.Stringer("job stage", j.stage), logutil.ShortError(j.lastRetryableErr), - logutil.Region(j.region.Region), + j.region.ToZapFields(), logutil.Key("start", j.keyRange.start), logutil.Key("end", j.keyRange.end)) return nil } - log.FromContext(ctx).Warn("meet error and will doIngest region, again", + log.FromContext(ctx).Warn("meet error and will doIngest region again", logutil.ShortError(j.lastRetryableErr), - logutil.Region(j.region.Region), + j.region.ToZapFields(), logutil.Key("start", j.keyRange.start), logutil.Key("end", j.keyRange.end)) } return nil } +<<<<<<< HEAD func (j *regionJob) checkWriteStall( +======= +func (local *Backend) 
checkWriteStall( +>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780)) ctx context.Context, region *split.RegionInfo, - clientFactory ImportClientFactory, ) (bool, *sst.IngestResponse, error) { + clientFactory := local.importClientFactory for _, peer := range region.Region.GetPeers() { cli, err := clientFactory.Create(ctx, peer.StoreId) if err != nil { @@ -416,14 +474,12 @@ func (j *regionJob) checkWriteStall( // doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta. // When meet error, it will remove finished sstMetas before return. -func (j *regionJob) doIngest( - ctx context.Context, - clientFactory ImportClientFactory, - supportMultiIngest bool, - shouldCheckWriteStall bool, -) (*sst.IngestResponse, error) { +func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) { + clientFactory := local.importClientFactory + supportMultiIngest := local.supportMultiIngest + shouldCheckWriteStall := local.ShouldCheckWriteStall if shouldCheckWriteStall { - writeStall, resp, err := j.checkWriteStall(ctx, j.region, clientFactory) + writeStall, resp, err := local.checkWriteStall(ctx, j.region) if err != nil { return nil, errors.Trace(err) } @@ -507,11 +563,11 @@ func (j *regionJob) doIngest( return resp, nil } -// fixIngestError will try to fix the error contained in ingest response. +// convertStageOnIngestError will try to fix the error contained in ingest response. // Return (_, error) when another error occurred. // Return (true, nil) when the job can retry ingesting immediately. // Return (false, nil) when the job should be put back to queue. 
-func (j *regionJob) fixIngestError( +func (j *regionJob) convertStageOnIngestError( ctx context.Context, resp *sst.IngestResponse, splitCli split.SplitClient, @@ -619,6 +675,8 @@ func (j *regionJob) fixIngestError( j.convertStageTo(needRescan) return false, nil case errPb.DiskFull != nil: + j.lastRetryableErr = common.ErrKVIngestFailed.GenWithStack(errPb.GetMessage()) + return false, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage()) } // all others doIngest error, such as stale command, etc. we'll retry it again from writeAndIngestByRange @@ -626,3 +684,156 @@ func (j *regionJob) fixIngestError( j.convertStageTo(regionScanned) return false, nil } + +type regionJobRetryHeap []*regionJob + +var _ heap.Interface = (*regionJobRetryHeap)(nil) + +func (h *regionJobRetryHeap) Len() int { + return len(*h) +} + +func (h *regionJobRetryHeap) Less(i, j int) bool { + v := *h + return v[i].waitUntil.Before(v[j].waitUntil) +} + +func (h *regionJobRetryHeap) Swap(i, j int) { + v := *h + v[i], v[j] = v[j], v[i] +} + +func (h *regionJobRetryHeap) Push(x any) { + *h = append(*h, x.(*regionJob)) +} + +func (h *regionJobRetryHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// regionJobRetryer is a concurrent-safe queue holding jobs that need to put +// back later, and put back when the regionJob.waitUntil is reached. It maintains +// a heap of jobs internally based on the regionJob.waitUntil field. 
+type regionJobRetryer struct { + // lock acquiring order: protectedClosed > protectedQueue > protectedToPutBack + protectedClosed struct { + mu sync.Mutex + closed bool + } + protectedQueue struct { + mu sync.Mutex + q regionJobRetryHeap + } + protectedToPutBack struct { + mu sync.Mutex + toPutBack *regionJob + } + putBackCh chan<- *regionJob + reload chan struct{} + jobWg *sync.WaitGroup +} + +// startRegionJobRetryer starts a new regionJobRetryer and it will run in +// background to put the job back to `putBackCh` when job's waitUntil is reached. +// Cancel the `ctx` will stop retryer and `jobWg.Done` will be trigger for jobs +// that are not put back yet. +func startRegionJobRetryer( + ctx context.Context, + putBackCh chan<- *regionJob, + jobWg *sync.WaitGroup, +) *regionJobRetryer { + ret := &regionJobRetryer{ + putBackCh: putBackCh, + reload: make(chan struct{}, 1), + jobWg: jobWg, + } + ret.protectedQueue.q = make(regionJobRetryHeap, 0, 16) + go ret.run(ctx) + return ret +} + +// run is only internally used, caller should not use it. +func (q *regionJobRetryer) run(ctx context.Context) { + defer q.close() + + for { + var front *regionJob + q.protectedQueue.mu.Lock() + if len(q.protectedQueue.q) > 0 { + front = q.protectedQueue.q[0] + } + q.protectedQueue.mu.Unlock() + + switch { + case front != nil: + select { + case <-ctx.Done(): + return + case <-q.reload: + case <-time.After(time.Until(front.waitUntil)): + q.protectedQueue.mu.Lock() + q.protectedToPutBack.mu.Lock() + q.protectedToPutBack.toPutBack = heap.Pop(&q.protectedQueue.q).(*regionJob) + // release the lock of queue to avoid blocking regionJobRetryer.push + q.protectedQueue.mu.Unlock() + + // hold the lock of toPutBack to make sending to putBackCh and + // resetting toPutBack atomic w.r.t. 
regionJobRetryer.close + select { + case <-ctx.Done(): + q.protectedToPutBack.mu.Unlock() + return + case q.putBackCh <- q.protectedToPutBack.toPutBack: + q.protectedToPutBack.toPutBack = nil + q.protectedToPutBack.mu.Unlock() + } + } + default: + // len(q.q) == 0 + select { + case <-ctx.Done(): + return + case <-q.reload: + } + } + } +} + +// close is only internally used, caller should not use it. +func (q *regionJobRetryer) close() { + q.protectedClosed.mu.Lock() + defer q.protectedClosed.mu.Unlock() + q.protectedClosed.closed = true + + count := len(q.protectedQueue.q) + if q.protectedToPutBack.toPutBack != nil { + count++ + } + for count > 0 { + q.jobWg.Done() + count-- + } +} + +// push should not be blocked for long time in any cases. +func (q *regionJobRetryer) push(job *regionJob) bool { + q.protectedClosed.mu.Lock() + defer q.protectedClosed.mu.Unlock() + if q.protectedClosed.closed { + return false + } + + q.protectedQueue.mu.Lock() + heap.Push(&q.protectedQueue.q, job) + q.protectedQueue.mu.Unlock() + + select { + case q.reload <- struct{}{}: + default: + } + return true +} diff --git a/br/pkg/lightning/backend/local/region_job_test.go b/br/pkg/lightning/backend/local/region_job_test.go index 21723ca116148..db555ff20d0fc 100644 --- a/br/pkg/lightning/backend/local/region_job_test.go +++ b/br/pkg/lightning/backend/local/region_job_test.go @@ -16,7 +16,9 @@ package local import ( "context" + "sync" "testing" + "time" "github.com/pingcap/kvproto/pkg/errorpb" sst "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -77,7 +79,7 @@ func TestIsIngestRetryable(t *testing.T) { } clone := job - canContinueIngest, err := (&clone).fixIngestError(ctx, resp, splitCli) + canContinueIngest, err := (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.NoError(t, err) require.True(t, canContinueIngest) require.Equal(t, wrote, clone.stage) @@ -104,7 +106,7 @@ func TestIsIngestRetryable(t *testing.T) { }, } clone = job - canContinueIngest, err = 
(&clone).fixIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, regionScanned, clone.stage) @@ -129,7 +131,7 @@ func TestIsIngestRetryable(t *testing.T) { }, } clone = job - canContinueIngest, err = (&clone).fixIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, needRescan, clone.stage) @@ -139,7 +141,7 @@ func TestIsIngestRetryable(t *testing.T) { resp.Error = &errorpb.Error{Message: "raft: proposal dropped"} clone = job - canContinueIngest, err = (&clone).fixIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, needRescan, clone.stage) @@ -153,7 +155,7 @@ func TestIsIngestRetryable(t *testing.T) { }, } clone = job - canContinueIngest, err = (&clone).fixIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, needRescan, clone.stage) @@ -165,7 +167,7 @@ func TestIsIngestRetryable(t *testing.T) { DiskFull: &errorpb.DiskFull{}, } clone = job - _, err = (&clone).fixIngestError(ctx, resp, splitCli) + _, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.ErrorContains(t, err, "non-retryable error") // a general error is retryable from writing @@ -174,10 +176,88 @@ func TestIsIngestRetryable(t *testing.T) { StaleCommand: &errorpb.StaleCommand{}, } clone = job - canContinueIngest, err = (&clone).fixIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, regionScanned, 
clone.stage) require.Nil(t, clone.writeResult) require.Error(t, clone.lastRetryableErr) } + +func TestRegionJobRetryer(t *testing.T) { + var ( + putBackCh = make(chan *regionJob, 10) + jobWg sync.WaitGroup + ctx, cancel = context.WithCancel(context.Background()) + ) + retryer := startRegionJobRetryer(ctx, putBackCh, &jobWg) + require.Len(t, putBackCh, 0) + + for i := 0; i < 8; i++ { + go func() { + job := &regionJob{ + waitUntil: time.Now().Add(time.Hour), + } + jobWg.Add(1) + ok := retryer.push(job) + require.True(t, ok) + }() + } + select { + case <-putBackCh: + require.Fail(t, "should not put back so soon") + case <-time.After(500 * time.Millisecond): + } + + job := &regionJob{ + keyRange: Range{ + start: []byte("123"), + }, + waitUntil: time.Now().Add(-time.Second), + } + jobWg.Add(1) + ok := retryer.push(job) + require.True(t, ok) + select { + case j := <-putBackCh: + jobWg.Done() + require.Equal(t, job, j) + case <-time.After(5 * time.Second): + require.Fail(t, "should put back very quickly") + } + + cancel() + jobWg.Wait() + ok = retryer.push(job) + require.False(t, ok) + + // test when putBackCh is blocked, retryer.push is not blocked and + // the return value of retryer.close is correct + + ctx, cancel = context.WithCancel(context.Background()) + putBackCh = make(chan *regionJob) + retryer = startRegionJobRetryer(ctx, putBackCh, &jobWg) + + job = &regionJob{ + keyRange: Range{ + start: []byte("123"), + }, + waitUntil: time.Now().Add(-time.Second), + } + jobWg.Add(1) + ok = retryer.push(job) + require.True(t, ok) + time.Sleep(3 * time.Second) + // now retryer is sending to putBackCh, but putBackCh is blocked + job = &regionJob{ + keyRange: Range{ + start: []byte("456"), + }, + waitUntil: time.Now().Add(-time.Second), + } + jobWg.Add(1) + ok = retryer.push(job) + require.True(t, ok) + cancel() + jobWg.Wait() +} diff --git a/br/pkg/restore/split/region.go b/br/pkg/restore/split/region.go index 856c3c95793e2..d8b51db2e799f 100644 --- a/br/pkg/restore/split/region.go +++ 
b/br/pkg/restore/split/region.go @@ -6,6 +6,8 @@ import ( "bytes" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/logutil" + "go.uber.org/zap" ) // RegionInfo includes a region and the leader of the region. @@ -23,3 +25,11 @@ func (region *RegionInfo) ContainsInterior(key []byte) bool { (len(region.Region.GetEndKey()) == 0 || bytes.Compare(key, region.Region.GetEndKey()) < 0) } + +// ToZapFields returns zap fields for the RegionInfo. It can handle nil RegionInfo. +func (region *RegionInfo) ToZapFields() zap.Field { + if region == nil { + return zap.Skip() + } + return logutil.Region(region.Region) +}