backupccl: fix key rewriter race in generative split and scatter processor #96313

Merged 1 commit on Feb 1, 2023
112 changes: 84 additions & 28 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -51,7 +51,17 @@ type generativeSplitAndScatterProcessor struct {
spec execinfrapb.GenerativeSplitAndScatterSpec
output execinfra.RowReceiver

scatterer splitAndScatterer
// chunkSplitAndScatterers contain the splitAndScatterers for the group of
// split and scatter workers that's responsible for splitting and scattering
// the import span chunks. Each worker needs its own scatterer as one cannot
// be used concurrently.
chunkSplitAndScatterers []splitAndScatterer
// chunkEntrySplitAndScatterers contain the splitAndScatterers for the group of
// split workers that's responsible for making splits at each import span
// entry. These scatterers only create splits for the start key of each import
// span and do not perform any scatters.
chunkEntrySplitAndScatterers []splitAndScatterer

// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
cancelScatterAndWaitForWorker func()
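
The per-worker slices above replace the single shared scatterer. Each splitAndScatterer wraps a key rewriter that keeps internal state, so sharing one instance across goroutines caused the race this PR fixes; giving every worker its own instance removes the shared state. Below is a minimal, self-contained sketch of that pattern, where the rewriter type and keys are purely illustrative and not the CockroachDB implementation:

package main

import (
	"fmt"
	"sync"
)

// rewriter stands in for the stateful key rewriter that a splitAndScatterer
// wraps; its scratch buffer makes it unsafe for concurrent use.
type rewriter struct{ scratch []byte }

func (r *rewriter) rewrite(key string) string {
	r.scratch = append(r.scratch[:0], key...) // mutates internal state
	return string(r.scratch)
}

func main() {
	const workers = 4

	// One rewriter per worker, mirroring chunkSplitAndScatterers and
	// chunkEntrySplitAndScatterers holding one scatterer per goroutine.
	rewriters := make([]*rewriter, workers)
	for i := range rewriters {
		rewriters[i] = &rewriter{}
	}

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		w := w
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine touches only its own rewriter, so there is no race.
			fmt.Println(rewriters[w].rewrite(fmt.Sprintf("key-%d", w)))
		}()
	}
	wg.Wait()
}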
@@ -72,24 +82,51 @@ func newGenerativeSplitAndScatterProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {

db := flowCtx.Cfg.DB
kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys,
false /* restoreTenantFromStream */)
if err != nil {
return nil, err
numChunkSplitAndScatterWorkers := int(spec.NumNodes)
// numEntrySplitWorkers is set to be 2 * numChunkSplitAndScatterWorkers in
// order to keep up with the rate at which chunks are split and scattered.
// TODO(rui): This tries to cover for a bad scatter by having 2 * the number
// of nodes in the cluster. Does this knob need to be re-tuned?
numEntrySplitWorkers := 2 * numChunkSplitAndScatterWorkers

mkSplitAndScatterer := func() (splitAndScatterer, error) {
if spec.ValidateOnly {
nodeID, _ := flowCtx.NodeID.OptionalNodeID()
return noopSplitAndScatterer{nodeID}, nil
}
kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys,
false /* restoreTenantFromStream */)
if err != nil {
return nil, err
}
return makeSplitAndScatterer(db.KV(), kr), nil
}

scatterer := makeSplitAndScatterer(db.KV(), kr)
if spec.ValidateOnly {
nodeID, _ := flowCtx.NodeID.OptionalNodeID()
scatterer = noopSplitAndScatterer{nodeID}
var chunkSplitAndScatterers []splitAndScatterer
for i := 0; i < numChunkSplitAndScatterWorkers; i++ {
scatterer, err := mkSplitAndScatterer()
if err != nil {
return nil, err
}
chunkSplitAndScatterers = append(chunkSplitAndScatterers, scatterer)
}

var chunkEntrySplitAndScatterers []splitAndScatterer
for i := 0; i < numEntrySplitWorkers; i++ {
scatterer, err := mkSplitAndScatterer()
if err != nil {
return nil, err
}
chunkEntrySplitAndScatterers = append(chunkEntrySplitAndScatterers, scatterer)
}

ssp := &generativeSplitAndScatterProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
scatterer: scatterer,
flowCtx: flowCtx,
spec: spec,
output: output,
chunkSplitAndScatterers: chunkSplitAndScatterers,
chunkEntrySplitAndScatterers: chunkEntrySplitAndScatterers,
// Large enough so that it never blocks.
doneScatterCh: make(chan entryNode, spec.NumEntries),
routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
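
The constructor now builds the scatterers through the mkSplitAndScatterer closure: one fresh instance per worker, a no-op implementation in validate-only mode, and twice as many entry-split workers as chunk workers. A rough sketch of that factory shape follows, with invented names standing in for the real types:

package main

import "fmt"

// scatterer, realScatterer, and noopScatterer are invented stand-ins for the
// splitAndScatterer interface and its implementations.
type scatterer interface{ name() string }

type realScatterer struct{ id int }

func (r realScatterer) name() string { return fmt.Sprintf("real-%d", r.id) }

type noopScatterer struct{}

func (noopScatterer) name() string { return "noop" }

func main() {
	validateOnly := false
	next := 0

	// The closure captures configuration once; each call returns an
	// independent instance, so workers never share mutable state.
	mk := func() (scatterer, error) {
		if validateOnly {
			return noopScatterer{}, nil
		}
		next++
		return realScatterer{id: next}, nil
	}

	chunkWorkers := 3
	entryWorkers := 2 * chunkWorkers // entry splitters outnumber chunk workers 2:1

	var chunk, entry []scatterer
	for i := 0; i < chunkWorkers+entryWorkers; i++ {
		s, err := mk()
		if err != nil {
			return // the real constructor returns the error to its caller
		}
		if i < chunkWorkers {
			chunk = append(chunk, s)
		} else {
			entry = append(entry, s)
		}
	}
	fmt.Println(len(chunk), "chunk workers,", len(entry), "entry workers")
}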
@@ -124,7 +161,7 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) {
TaskName: "generativeSplitAndScatter-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.scatterer, gssp.doneScatterCh)
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh)
cancel()
close(gssp.doneScatterCh)
close(workerDone)
@@ -223,15 +260,19 @@ func runGenerativeSplitAndScatter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.GenerativeSplitAndScatterSpec,
scatterer splitAndScatterer,
chunkSplitAndScatterers []splitAndScatterer,
chunkEntrySplitAndScatterers []splitAndScatterer,
doneScatterCh chan<- entryNode,
) error {
log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
spec.NumEntries, spec.ChunkSize, spec.NumNodes)
g := ctxgroup.WithContext(ctx)

splitWorkers := int(spec.NumNodes)
restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, splitWorkers*int(spec.ChunkSize))
chunkSplitAndScatterWorkers := len(chunkSplitAndScatterers)
restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, chunkSplitAndScatterWorkers*int(spec.ChunkSize))

// This goroutine generates import spans one at a time and sends them to
// restoreSpanEntriesCh.
g.GoCtx(func(ctx context.Context) error {
defer close(restoreSpanEntriesCh)

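runGenerativeSplitAndScatter is structured as a channel pipeline managed by ctxgroup: a producer goroutine generates import spans, downstream worker pools consume them, and any error cancels the whole group. The sketch below shows the same shape using the standard golang.org/x/sync/errgroup package in place of CockroachDB's ctxgroup; channel and type names are illustrative only:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type entry struct{ id int }

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	entriesCh := make(chan entry, 8)

	// Producer: emits entries one at a time and closes the channel when done,
	// analogous to the goroutine feeding restoreSpanEntriesCh.
	g.Go(func() error {
		defer close(entriesCh)
		for i := 0; i < 20; i++ {
			select {
			case entriesCh <- entry{id: i}:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Consumers: a fixed pool of workers drains the channel; an error from any
	// worker cancels ctx, which unblocks the producer.
	for w := 0; w < 3; w++ {
		g.Go(func() error {
			for e := range entriesCh {
				fmt.Println("processed", e.id)
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("pipeline failed:", err)
	}
}
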
@@ -265,7 +306,11 @@ func runGenerativeSplitAndScatter(
)
})

restoreEntryChunksCh := make(chan restoreEntryChunk, splitWorkers)
restoreEntryChunksCh := make(chan restoreEntryChunk, chunkSplitAndScatterWorkers)

// This goroutine takes the import spans off of restoreSpanEntriesCh and
// groups them into chunks of spec.ChunkSize. These chunks are then sent to
// restoreEntryChunksCh.
g.GoCtx(func(ctx context.Context) error {
defer close(restoreEntryChunksCh)

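The goroutine introduced here groups individual import spans into chunks of spec.ChunkSize before they are split and scattered. A small sketch of that batching step, using plain ints instead of RestoreSpanEntry and hand-picked sizes:

package main

import "fmt"

func main() {
	const chunkSize = 3
	in := make(chan int, 10)
	out := make(chan []int, 4)

	// Upstream producer, standing in for restoreSpanEntriesCh.
	go func() {
		defer close(in)
		for i := 0; i < 10; i++ {
			in <- i
		}
	}()

	// Batcher: accumulate entries until a chunk is full, then forward it,
	// mirroring the goroutine that feeds restoreEntryChunksCh.
	go func() {
		defer close(out)
		var chunk []int
		for e := range in {
			chunk = append(chunk, e)
			if len(chunk) == chunkSize {
				out <- chunk
				chunk = nil
			}
		}
		// Flush the final, possibly short, chunk.
		if len(chunk) > 0 {
			out <- chunk
		}
	}()

	for c := range out {
		fmt.Println(c)
	}
}
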
@@ -288,9 +333,15 @@ func runGenerativeSplitAndScatter(
return nil
})

importSpanChunksCh := make(chan scatteredChunk, splitWorkers*2)
importSpanChunksCh := make(chan scatteredChunk, chunkSplitAndScatterWorkers*2)

// This group of goroutines processes the chunks from restoreEntryChunksCh.
// For each chunk, a split is created at the start key of the next chunk. The
// current chunk is then scattered, and the chunk with its destination is
// passed to importSpanChunksCh.
g2 := ctxgroup.WithContext(ctx)
for worker := 0; worker < splitWorkers; worker++ {
for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
worker := worker
g2.GoCtx(func(ctx context.Context) error {
// Chunks' leaseholders should be randomly placed throughout the
// cluster.
@@ -299,11 +350,11 @@ func runGenerativeSplitAndScatter(
if !importSpanChunk.splitKey.Equal(roachpb.Key{}) {
// Split at the start of the next chunk, to partition off a
// prefix of the space to scatter.
if err := scatterer.split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
if err := chunkSplitAndScatterers[worker].split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
return err
}
}
chunkDestination, err := scatterer.scatter(ctx, flowCtx.Codec(), scatterKey)
chunkDestination, err := chunkSplitAndScatterers[worker].scatter(ctx, flowCtx.Codec(), scatterKey)
if err != nil {
return err
}
@@ -335,15 +386,20 @@ func runGenerativeSplitAndScatter(
})
}

// This goroutine waits for the chunkSplitAndScatter workers to finish so that
// it can close importSpanChunksCh.
g.GoCtx(func(ctx context.Context) error {
defer close(importSpanChunksCh)
return g2.Wait()
})

// TODO(pbardea): This tries to cover for a bad scatter by having 2 * the
// number of nodes in the cluster. Is it necessary?
splitScatterWorkers := 2 * splitWorkers
for worker := 0; worker < splitScatterWorkers; worker++ {
// This group of goroutines takes chunks that have already been split and
// scattered by the previous worker group. These workers create splits at the
// start key of the span of every entry of every chunk. After a chunk has been
// processed, it is passed to doneScatterCh to signal that the chunk has gone
// through the entire split and scatter process.
for worker := 0; worker < len(chunkEntrySplitAndScatterers); worker++ {
worker := worker
g.GoCtx(func(ctx context.Context) error {
for importSpanChunk := range importSpanChunksCh {
chunkDestination := importSpanChunk.destination
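
Two details of the hand-off above are worth calling out: importSpanChunksCh is fed by the inner g2 group of chunk workers and is closed only after g2.Wait() returns, and each entry-split worker indexes its own scatterer (chunkEntrySplitAndScatterers[worker]) rather than sharing one. A simplified sketch of the wait-then-close hand-off, using plain errgroup.Group values and invented channel contents:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var outer, inner errgroup.Group

	chunksCh := make(chan int, 8)

	// Inner group: several producers, mirroring the chunk split-and-scatter
	// workers that feed importSpanChunksCh.
	for w := 0; w < 3; w++ {
		w := w
		inner.Go(func() error {
			for i := 0; i < 4; i++ {
				chunksCh <- w*10 + i
			}
			return nil
		})
	}

	// Outer group member: close the shared channel only after every producer
	// has finished, so downstream range loops terminate cleanly.
	outer.Go(func() error {
		defer close(chunksCh)
		return inner.Wait()
	})

	// Outer group consumers, mirroring the entry-split workers that drain
	// importSpanChunksCh until it is closed.
	for w := 0; w < 2; w++ {
		outer.Go(func() error {
			for c := range chunksCh {
				fmt.Println("consumed", c)
			}
			return nil
		})
	}

	if err := outer.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}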
@@ -355,7 +411,7 @@ func runGenerativeSplitAndScatter(
if nextChunkIdx < len(importSpanChunk.entries) {
// Split at the next entry.
splitKey = importSpanChunk.entries[nextChunkIdx].Span.Key
if err := scatterer.split(ctx, flowCtx.Codec(), splitKey); err != nil {
if err := chunkEntrySplitAndScatterers[worker].split(ctx, flowCtx.Codec(), splitKey); err != nil {
return err
}
}
22 changes: 14 additions & 8 deletions pkg/ccl/backupccl/restore_job.go
@@ -314,9 +314,9 @@ func restore(
}

targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV)
importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
genSpan := func(ctx context.Context) error {
defer close(importSpanCh)
countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error {
defer close(spanCh)
return generateAndSendImportSpans(
restoreCtx,
dataToRestore.getSpans(),
@@ -326,7 +326,7 @@ func restore(
introducedSpanFrontier,
highWaterMark,
targetSize,
importSpanCh,
spanCh,
simpleImportSpans,
)
}
@@ -336,17 +336,20 @@ func restore(
var countTasks []func(ctx context.Context) error
log.Infof(restoreCtx, "rh_debug: starting count task")
spanCountTask := func(ctx context.Context) error {
for range importSpanCh {
for range countSpansCh {
numImportSpans++
}
return nil
}
countTasks = append(countTasks, genSpan, spanCountTask)
countTasks = append(countTasks, spanCountTask)
countTasks = append(countTasks, func(ctx context.Context) error {
return genSpan(ctx, countSpansCh)
})
if err := ctxgroup.GoAndWait(restoreCtx, countTasks...); err != nil {
return emptyRowCount, errors.Wrapf(err, "counting number of import spans")
}

importSpanCh = make(chan execinfrapb.RestoreSpanEntry, 1000)
importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block
// tasks are the concurrent tasks that are run during the restore.
var tasks []func(ctx context.Context) error
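
The restore_job.go change makes the span-generation step reusable: genSpan now takes its output channel as a parameter, so it is run once against countSpansCh to count the import spans and a second time against a fresh importSpanCh to drive the restore itself. A hedged sketch of that two-pass shape, with a toy generator in place of generateAndSendImportSpans:

package main

import (
	"context"
	"fmt"
)

// genSpans stands in for generateAndSendImportSpans: it writes spans to the
// provided channel and closes it when done.
func genSpans(ctx context.Context, out chan<- string) error {
	defer close(out)
	for i := 0; i < 5; i++ {
		select {
		case out <- fmt.Sprintf("span-%d", i):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx := context.Background()

	// First pass: count the spans, mirroring countSpansCh and spanCountTask.
	countCh := make(chan string, 8)
	go func() { _ = genSpans(ctx, countCh) }()
	numSpans := 0
	for range countCh {
		numSpans++
	}
	fmt.Println("counted", numSpans, "spans")

	// Second pass: regenerate the same spans into a fresh channel for the
	// restore work itself.
	importCh := make(chan string, 8)
	go func() { _ = genSpans(ctx, importCh) }()
	for s := range importCh {
		fmt.Println("restoring", s)
	}
}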
@@ -427,7 +430,10 @@ func restore(
}
return nil
}
tasks = append(tasks, generativeCheckpointLoop, genSpan)
tasks = append(tasks, generativeCheckpointLoop)
tasks = append(tasks, func(ctx context.Context) error {
return genSpan(ctx, importSpanCh)
})

runRestore := func(ctx context.Context) error {
return distRestore(
Expand Down