diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
index bffa2482329e..c28c40850831 100644
--- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
+++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -51,7 +51,17 @@ type generativeSplitAndScatterProcessor struct {
   spec    execinfrapb.GenerativeSplitAndScatterSpec
   output  execinfra.RowReceiver
 
-  scatterer splitAndScatterer
+  // chunkSplitAndScatterers contain the splitAndScatterers for the group of
+  // split and scatter workers that's responsible for splitting and scattering
+  // the import span chunks. Each worker needs its own scatterer as one cannot
+  // be used concurrently.
+  chunkSplitAndScatterers []splitAndScatterer
+  // chunkEntrySplitAndScatterers contain the splitAndScatterers for the group of
+  // split workers that's responsible for making splits at each import span
+  // entry. These scatterers only create splits for the start key of each import
+  // span and do not perform any scatters.
+  chunkEntrySplitAndScatterers []splitAndScatterer
+
   // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
   // it to finish.
   cancelScatterAndWaitForWorker func()
@@ -72,24 +82,51 @@ func newGenerativeSplitAndScatterProcessor(
   post *execinfrapb.PostProcessSpec,
   output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
   db := flowCtx.Cfg.DB
-  kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys,
-    false /* restoreTenantFromStream */)
-  if err != nil {
-    return nil, err
+  numChunkSplitAndScatterWorkers := int(spec.NumNodes)
+  // numEntrySplitWorkers is set to be 2 * numChunkSplitAndScatterWorkers in
+  // order to keep up with the rate at which chunks are split and scattered.
+  // TODO(rui): This tries to cover for a bad scatter by having 2 * the number
+  // of nodes in the cluster. Does this knob need to be re-tuned?
+  numEntrySplitWorkers := 2 * numChunkSplitAndScatterWorkers
+
+  mkSplitAndScatterer := func() (splitAndScatterer, error) {
+    if spec.ValidateOnly {
+      nodeID, _ := flowCtx.NodeID.OptionalNodeID()
+      return noopSplitAndScatterer{nodeID}, nil
+    }
+    kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys,
+      false /* restoreTenantFromStream */)
+    if err != nil {
+      return nil, err
+    }
+    return makeSplitAndScatterer(db.KV(), kr), nil
   }
-  scatterer := makeSplitAndScatterer(db.KV(), kr)
-  if spec.ValidateOnly {
-    nodeID, _ := flowCtx.NodeID.OptionalNodeID()
-    scatterer = noopSplitAndScatterer{nodeID}
+
+  var chunkSplitAndScatterers []splitAndScatterer
+  for i := 0; i < numChunkSplitAndScatterWorkers; i++ {
+    scatterer, err := mkSplitAndScatterer()
+    if err != nil {
+      return nil, err
+    }
+    chunkSplitAndScatterers = append(chunkSplitAndScatterers, scatterer)
   }
+
+  var chunkEntrySplitAndScatterers []splitAndScatterer
+  for i := 0; i < numEntrySplitWorkers; i++ {
+    scatterer, err := mkSplitAndScatterer()
+    if err != nil {
+      return nil, err
+    }
+    chunkEntrySplitAndScatterers = append(chunkEntrySplitAndScatterers, scatterer)
+  }
+
   ssp := &generativeSplitAndScatterProcessor{
-    flowCtx:   flowCtx,
-    spec:      spec,
-    output:    output,
-    scatterer: scatterer,
+    flowCtx:                      flowCtx,
+    spec:                         spec,
+    output:                       output,
+    chunkSplitAndScatterers:      chunkSplitAndScatterers,
+    chunkEntrySplitAndScatterers: chunkEntrySplitAndScatterers,
     // Large enough so that it never blocks.
     doneScatterCh:     make(chan entryNode, spec.NumEntries),
     routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
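The constructor change above boils down to a factory closure plus one scatterer per worker, because a splitAndScatterer cannot be shared across goroutines. Below is a minimal, self-contained sketch of that construction pattern; the `scatterer` type, `mkScatterer`, `build`, and the worker counts are illustrative stand-ins, not the real types from the package.

```go
package main

import "fmt"

// scatterer stands in for splitAndScatterer. A single instance is not safe for
// concurrent use, which is why the constructor above builds one per worker.
type scatterer struct{ id int }

func main() {
	numChunkWorkers := 4                   // e.g. int(spec.NumNodes)
	numEntryWorkers := 2 * numChunkWorkers // 2x, as in the TODO(rui) comment above

	// Factory closure, the same shape as mkSplitAndScatterer above: every call
	// returns a fresh instance, so no two workers ever share one.
	next := 0
	mkScatterer := func() (*scatterer, error) {
		next++
		return &scatterer{id: next}, nil
	}

	// Build one scatterer per worker in a pool.
	build := func(n int) ([]*scatterer, error) {
		out := make([]*scatterer, 0, n)
		for i := 0; i < n; i++ {
			s, err := mkScatterer()
			if err != nil {
				return nil, err
			}
			out = append(out, s)
		}
		return out, nil
	}

	chunkScatterers, _ := build(numChunkWorkers) // error handling elided in this sketch
	entryScatterers, _ := build(numEntryWorkers)
	fmt.Println(len(chunkScatterers), "chunk scatterers,", len(entryScatterers), "entry scatterers")
}
```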
@@ -124,7 +161,7 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) {
       TaskName: "generativeSplitAndScatter-worker",
       SpanOpt:  stop.ChildSpan,
     }, func(ctx context.Context) {
-      gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.scatterer, gssp.doneScatterCh)
+      gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh)
       cancel()
       close(gssp.doneScatterCh)
       close(workerDone)
@@ -223,15 +260,19 @@ func runGenerativeSplitAndScatter(
   ctx context.Context,
   flowCtx *execinfra.FlowCtx,
   spec *execinfrapb.GenerativeSplitAndScatterSpec,
-  scatterer splitAndScatterer,
+  chunkSplitAndScatterers []splitAndScatterer,
+  chunkEntrySplitAndScatterers []splitAndScatterer,
   doneScatterCh chan<- entryNode,
 ) error {
   log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
     spec.NumEntries, spec.ChunkSize, spec.NumNodes)
   g := ctxgroup.WithContext(ctx)
 
-  splitWorkers := int(spec.NumNodes)
-  restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, splitWorkers*int(spec.ChunkSize))
+  chunkSplitAndScatterWorkers := len(chunkSplitAndScatterers)
+  restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, chunkSplitAndScatterWorkers*int(spec.ChunkSize))
+
+  // This goroutine generates import spans one at a time and sends them to
+  // restoreSpanEntriesCh.
   g.GoCtx(func(ctx context.Context) error {
     defer close(restoreSpanEntriesCh)
@@ -265,7 +306,11 @@ func runGenerativeSplitAndScatter(
     )
   })
 
-  restoreEntryChunksCh := make(chan restoreEntryChunk, splitWorkers)
+  restoreEntryChunksCh := make(chan restoreEntryChunk, chunkSplitAndScatterWorkers)
+
+  // This goroutine takes the import spans off of restoreSpanEntriesCh and
+  // groups them into chunks of spec.ChunkSize. These chunks are then sent to
+  // restoreEntryChunksCh.
   g.GoCtx(func(ctx context.Context) error {
     defer close(restoreEntryChunksCh)
@@ -288,9 +333,15 @@ func runGenerativeSplitAndScatter(
     return nil
   })
 
-  importSpanChunksCh := make(chan scatteredChunk, splitWorkers*2)
+  importSpanChunksCh := make(chan scatteredChunk, chunkSplitAndScatterWorkers*2)
+
+  // This group of goroutines processes the chunks from restoreEntryChunksCh.
+  // For each chunk, a split is created at the start key of the next chunk. The
+  // current chunk is then scattered, and the chunk with its destination is
+  // passed to importSpanChunksCh.
   g2 := ctxgroup.WithContext(ctx)
-  for worker := 0; worker < splitWorkers; worker++ {
+  for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
+    worker := worker
     g2.GoCtx(func(ctx context.Context) error {
       // Chunks' leaseholders should be randomly placed throughout the
       // cluster.
@@ -299,11 +350,11 @@ func runGenerativeSplitAndScatter(
       if !importSpanChunk.splitKey.Equal(roachpb.Key{}) {
         // Split at the start of the next chunk, to partition off a
         // prefix of the space to scatter.
-        if err := scatterer.split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
+        if err := chunkSplitAndScatterers[worker].split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
          return err
         }
       }
-      chunkDestination, err := scatterer.scatter(ctx, flowCtx.Codec(), scatterKey)
+      chunkDestination, err := chunkSplitAndScatterers[worker].scatter(ctx, flowCtx.Codec(), scatterKey)
       if err != nil {
         return err
       }
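The hunks above turn runGenerativeSplitAndScatter into a channel pipeline: a producer goroutine feeds work, a pool of workers fans out over it (each capturing its own `worker` index so it can use its own scatterer), and the downstream channel is closed only once the whole pool is done. A rough sketch of that shape, using golang.org/x/sync/errgroup in place of the ctxgroup wrapper used in the tree, with illustrative channel names:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	in := make(chan int)     // analogous to restoreEntryChunksCh
	out := make(chan string) // analogous to importSpanChunksCh

	var g, pool errgroup.Group

	// Producer: owns `in` and closes it once all work has been generated.
	g.Go(func() error {
		defer close(in)
		for i := 0; i < 10; i++ {
			in <- i
		}
		return nil
	})

	// Worker pool: each goroutine captures its own `worker` index (the
	// `worker := worker` lines in the diff) so it can index into a dedicated,
	// non-shareable resource such as its own scatterer.
	const numWorkers = 3
	for worker := 0; worker < numWorkers; worker++ {
		worker := worker
		pool.Go(func() error {
			for item := range in {
				out <- fmt.Sprintf("worker %d handled item %d", worker, item)
			}
			return nil
		})
	}

	// Close the downstream channel only after every pool worker has returned,
	// mirroring the goroutine that closes importSpanChunksCh after g2.Wait().
	g.Go(func() error {
		defer close(out)
		return pool.Wait()
	})

	// Downstream consumer.
	g.Go(func() error {
		for s := range out {
			fmt.Println(s)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		panic(err)
	}
}
```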
@@ -335,15 +386,20 @@ func runGenerativeSplitAndScatter(
     })
   }
 
+  // This goroutine waits for the chunkSplitAndScatter workers to finish so that
+  // it can close importSpanChunksCh.
   g.GoCtx(func(ctx context.Context) error {
     defer close(importSpanChunksCh)
     return g2.Wait()
   })
 
-  // TODO(pbardea): This tries to cover for a bad scatter by having 2 * the
-  // number of nodes in the cluster. Is it necessary?
-  splitScatterWorkers := 2 * splitWorkers
-  for worker := 0; worker < splitScatterWorkers; worker++ {
+  // This group of goroutines takes chunks that have already been split and
+  // scattered by the previous worker group. These workers create splits at the
+  // start key of the span of every entry of every chunk. After a chunk has been
+  // processed, it is passed to doneScatterCh to signal that the chunk has gone
+  // through the entire split and scatter process.
+  for worker := 0; worker < len(chunkEntrySplitAndScatterers); worker++ {
+    worker := worker
     g.GoCtx(func(ctx context.Context) error {
       for importSpanChunk := range importSpanChunksCh {
         chunkDestination := importSpanChunk.destination
@@ -355,7 +411,7 @@ func runGenerativeSplitAndScatter(
         if nextChunkIdx < len(importSpanChunk.entries) {
           // Split at the next entry.
           splitKey = importSpanChunk.entries[nextChunkIdx].Span.Key
-          if err := scatterer.split(ctx, flowCtx.Codec(), splitKey); err != nil {
+          if err := chunkEntrySplitAndScatterers[worker].split(ctx, flowCtx.Codec(), splitKey); err != nil {
             return err
           }
         }
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 2511d02eaf1e..9188fe697186 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -314,9 +314,9 @@ func restore(
   }
 
   targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV)
-  importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
-  genSpan := func(ctx context.Context) error {
-    defer close(importSpanCh)
+  countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
+  genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error {
+    defer close(spanCh)
     return generateAndSendImportSpans(
       restoreCtx,
       dataToRestore.getSpans(),
@@ -326,7 +326,7 @@ func restore(
       introducedSpanFrontier,
       highWaterMark,
       targetSize,
-      importSpanCh,
+      spanCh,
       simpleImportSpans,
     )
   }
@@ -336,17 +336,20 @@ func restore(
   var countTasks []func(ctx context.Context) error
   log.Infof(restoreCtx, "rh_debug: starting count task")
   spanCountTask := func(ctx context.Context) error {
-    for range importSpanCh {
+    for range countSpansCh {
       numImportSpans++
     }
     return nil
   }
-  countTasks = append(countTasks, genSpan, spanCountTask)
+  countTasks = append(countTasks, spanCountTask)
+  countTasks = append(countTasks, func(ctx context.Context) error {
+    return genSpan(ctx, countSpansCh)
+  })
   if err := ctxgroup.GoAndWait(restoreCtx, countTasks...); err != nil {
     return emptyRowCount, errors.Wrapf(err, "counting number of import spans")
   }
 
-  importSpanCh = make(chan execinfrapb.RestoreSpanEntry, 1000)
+  importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
   requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block
 
   // tasks are the concurrent tasks that are run during the restore.
   var tasks []func(ctx context.Context) error
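In restore_job.go, genSpan now takes its destination channel as a parameter, so the same generator can be run twice: once into countSpansCh purely to count the import spans, and again into a fresh importSpanCh for the actual restore. A sketch of that two-pass pattern, with illustrative names (genSpans, entry) and errgroup standing in for the ctxgroup helpers used in the tree:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type entry struct{ id int }

// genSpans stands in for generateAndSendImportSpans; like genSpan above, it
// owns closing whatever channel it is handed.
func genSpans(ctx context.Context, spanCh chan entry) error {
	defer close(spanCh)
	for i := 0; i < 25; i++ {
		select {
		case spanCh <- entry{id: i}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx := context.Background()

	// Pass 1: run the generator into a throwaway channel just to count entries.
	countCh := make(chan entry, 1000)
	numEntries := 0
	g, gCtx := errgroup.WithContext(ctx)
	g.Go(func() error { return genSpans(gCtx, countCh) })
	g.Go(func() error {
		for range countCh {
			numEntries++
		}
		return nil
	})
	if err := g.Wait(); err != nil {
		panic(err)
	}

	// Pass 2: a fresh channel and a second run of the same generator feed the
	// real consumers, which can now be sized using numEntries.
	spanCh := make(chan entry, 1000)
	g2, g2Ctx := errgroup.WithContext(ctx)
	g2.Go(func() error { return genSpans(g2Ctx, spanCh) })
	g2.Go(func() error {
		for e := range spanCh {
			_ = e // process the entry
		}
		return nil
	})
	if err := g2.Wait(); err != nil {
		panic(err)
	}
	fmt.Println("counted and processed", numEntries, "entries")
}
```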
@@ -427,7 +430,10 @@ func restore(
     }
     return nil
   }
-  tasks = append(tasks, generativeCheckpointLoop, genSpan)
+  tasks = append(tasks, generativeCheckpointLoop)
+  tasks = append(tasks, func(ctx context.Context) error {
+    return genSpan(ctx, importSpanCh)
+  })
 
   runRestore := func(ctx context.Context) error {
     return distRestore(
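The count produced by the first pass is also what sizes requestFinishedCh: with a buffer of numImportSpans, every completion signal can be sent even when no receiver is ready, so workers never block on progress reporting. A small sketch of that buffering idea, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	numImportSpans := 100 // known up front thanks to the counting pass

	// Buffer is large enough that every send below completes without a receiver,
	// matching the "enough buffer to never block" comment above.
	requestFinishedCh := make(chan struct{}, numImportSpans)

	var wg sync.WaitGroup
	for i := 0; i < numImportSpans; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// ... process one import span ...
			requestFinishedCh <- struct{}{} // never blocks thanks to the buffer
		}()
	}
	wg.Wait()
	close(requestFinishedCh)

	// A progress loop can drain the signals at its own pace.
	done := 0
	for range requestFinishedCh {
		done++
	}
	fmt.Printf("%d/%d spans finished\n", done, numImportSpans)
}
```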