Skip to content

Commit

Permalink
lightning: fix generating region job is serialized (#43123)
Browse files Browse the repository at this point in the history
close #42456
  • Loading branch information
lance6716 authored Apr 19, 2023
1 parent 124abf6 commit 0c3b88c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
47 changes: 31 additions & 16 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ type BackendConfig struct {
MaxConnPerStore int
// compression type used when writing to or ingesting into TiKV
ConnCompressType config.CompressionType
// concurrency of generateJobForRange.
RangeConcurrency int
// number of import (write & ingest) workers
WorkerConcurrency int
KVWriteBatchSize int
Expand Down Expand Up @@ -425,6 +427,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
RangeConcurrency: cfg.TikvImporter.RangeConcurrency,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: cfg.TikvImporter.SendKVPairs,
CheckpointEnabled: cfg.Checkpoint.Enable,
Expand Down Expand Up @@ -1129,26 +1132,38 @@ func (local *Backend) generateAndSendJob(
}
logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))

for _, r := range jobRanges {
jobs, err := local.generateJobForRange(ctx, engine, r, regionSplitSize, regionSplitKeys)
if err != nil {
return err
}
for _, job := range jobs {
jobWg.Add(1)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(local.RangeConcurrency)
for _, jobRange := range jobRanges {
r := jobRange
eg.Go(func() error {
select {
case <-ctx.Done():
// this job is not put into jobToWorkerCh
jobWg.Done()
// If the context is canceled, it means a worker has hit an error. That first error
// will be found LATER by the worker's error group; if this function returned an
// error here, it would seize the "first error" instead.
case <-egCtx.Done():
return nil
case jobToWorkerCh <- job:
default:
}
}

jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
if err != nil {
return err
}
for _, job := range jobs {
jobWg.Add(1)
select {
case <-egCtx.Done():
// this job is not put into jobToWorkerCh
jobWg.Done()
// If the context is canceled, it means a worker has hit an error. That first error
// will be found LATER by the worker's error group; if this function returned an
// error here, it would seize the "first error" instead.
return nil
case jobToWorkerCh <- job:
}
}
return nil
})
}
return nil
return eg.Wait()
}

// fakeRegionJobs is used in tests; the injected job can be found by (startKey, endKey).
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
panicSplitRegionClient{}, // make sure no further split region
),
}
local.BackendConfig.RangeConcurrency = 1
db, tmpPath := makePebbleDB(t, nil)
_, engineUUID := backend.MakeUUID("ww", 0)
ctx := context.Background()
Expand Down Expand Up @@ -1596,7 +1597,10 @@ func TestDoImport(t *testing.T) {

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{WorkerConcurrency: 2},
BackendConfig: BackendConfig{
RangeConcurrency: 1,
WorkerConcurrency: 2,
},
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
Expand Down
1 change: 1 addition & 0 deletions executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm
LocalStoreDir: dir,
MaxConnPerStore: config.DefaultRangeConcurrency,
ConnCompressType: config.CompressionNone,
RangeConcurrency: config.DefaultRangeConcurrency,
WorkerConcurrency: config.DefaultRangeConcurrency * 2,
KVWriteBatchSize: config.KVWriteBatchSize,
// TODO: the local backend reports an error when the sort-dir already exists and checkpoint is disabled.
Expand Down

0 comments on commit 0c3b88c

Please sign in to comment.