backupccl: add missing context cancel checks to restore
In cockroachdb#95257, we saw a restore grind to a halt 2 hours into a
5-hour roachtest. The stacks indicated that we may have seen
a context cancellation that was not being respected by the
goroutine running `generateAndSendImportSpans`. This resulted
in the `generative_split_and_scatter_processor` getting stuck writing
to a channel nobody was reading from
(https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/backupccl/restore_span_covering.go#L516)
since the other goroutines
in the processor had seen the ctx cancellation and exited.
A side effect of the generative processor not shutting down
was that the downstream restore data processors would also
hang on their call to `input.Next()` as they would not receive
a row or a meta from the generative processor signaling them
to shut down. This fix adds a ctx cancellation check to the
goroutine described above, thereby allowing a
graceful teardown of the flow.
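
For illustration, the shape of the fix is the standard Go pattern of
pairing a channel send with a ctx.Done() check in a select. The sketch
below is a simplified, self-contained version of that pattern; the
generic send helper and the toy main are illustrative, not code from
this commit.

package main

import (
	"context"
	"fmt"
	"time"
)

// send performs a channel send that also respects context cancellation,
// so a producer goroutine unblocks when the flow is torn down instead of
// hanging on a channel nobody is reading from.
func send[T any](ctx context.Context, ch chan<- T, v T) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // flow cancelled; stop producing
	case ch <- v:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int) // unbuffered and, crucially, never read from

	// Simulate the consumers having already exited: cancel shortly after.
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	// Without the ctx check this send would block forever.
	fmt.Println(send(ctx, ch, 42)) // prints "context canceled"
}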

This fix also adds the JobID to the generative processor spec
so that logs on remote nodes are correctly tagged with the JobID,
making for easier debugging.
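
As a hedged sketch of how the JobID can be used on the remote node (the
helper and the sample ID below are assumptions for illustration, not an
excerpt of this commit), a processor can tag its context with the job ID
it received in its spec so subsequent log lines carry it:

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/logtags"
)

// tagWithJob is an assumed helper, not code from this commit: tagging the
// context with the JobID carried in the processor spec means log lines
// written with this ctx are prefixed with the job tag.
func tagWithJob(ctx context.Context, jobID int64) context.Context {
	return logtags.AddTag(ctx, "job", jobID)
}

func main() {
	ctx := tagWithJob(context.Background(), 12345) // 12345 is a made-up ID
	fmt.Println(logtags.FromContext(ctx).String()) // job=12345
}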

Informs: cockroachdb#95257

Release note (bug fix): fixes a bug where a restore flow could
hang indefinitely in the face of a context cancellation, manifesting
as a stuck restore job.
adityamaru authored and Rui Hu committed Feb 15, 2023
1 parent 33c7ba5 commit faab41f
Showing 4 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
@@ -171,7 +171,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
 		_ = rd.phaseGroup.Wait()
 	}
 	rd.phaseGroup = ctxgroup.WithContext(ctx)
-	log.Infof(ctx, "starting restore data")
+	log.Infof(ctx, "starting restore data processor")

 	entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
 	rd.sstCh = make(chan mergedSST, rd.numWorkers)
14 changes: 10 additions & 4 deletions pkg/ccl/backupccl/restore_job.go
@@ -315,7 +315,7 @@ func restore(
 	genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error {
 		defer close(spanCh)
 		return generateAndSendImportSpans(
-			restoreCtx,
+			ctx,
 			dataToRestore.getSpans(),
 			backupManifests,
 			layerToBackupManifestFileIterFactory,
@@ -331,7 +331,6 @@ func restore(
 	// Count number of import spans.
 	var numImportSpans int
 	var countTasks []func(ctx context.Context) error
-	log.Infof(restoreCtx, "rh_debug: starting count task")
 	spanCountTask := func(ctx context.Context) error {
 		for range countSpansCh {
 			numImportSpans++
@@ -394,7 +393,12 @@ func restore(

 			if idx >= mu.ceiling {
 				for i := mu.ceiling; i <= idx; i++ {
-					importSpan := <-importSpanCh
+					importSpan, ok := <-importSpanCh
+					if !ok {
+						// The channel has been closed, there is nothing left to do.
+						log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed")
+						return nil
+					}
 					mu.inFlightImportSpans[i] = importSpan.Span
 				}
 				mu.ceiling = idx + 1
@@ -413,7 +417,6 @@ func restore(
 			for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ {
 				mu.highWaterMark = j
 			}
-
 			for j := prevHighWater; j < mu.highWaterMark; j++ {
 				delete(mu.requestsCompleted, j)
 				delete(mu.inFlightImportSpans, j)
@@ -1640,6 +1643,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
 				return err
 			}
 		}
+		log.Infof(ctx, "finished restoring the pre-data bundle")
 	}

 	if !preValidateData.isEmpty() {
@@ -1660,6 +1664,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
 		}

 		resTotal.Add(res)
+		log.Infof(ctx, "finished restoring the validate data bundle")
 	}
 	{
 		// Restore the main data bundle. We notably only restore the system tables
@@ -1681,6 +1686,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
 		}

 		resTotal.Add(res)
+		log.Infof(ctx, "finished restoring the main data bundle")
 	}

 	if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil {
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -188,6 +188,7 @@ func distRestore(
 			NumEntries: int64(numImportSpans),
 			NumNodes: int64(numNodes),
 			UseSimpleImportSpans: useSimpleImportSpans,
+			JobID: jobID,
 		}

 		proc := physicalplan.Processor{
17 changes: 12 additions & 5 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -496,7 +496,7 @@ func generateAndSendImportSpans(
 	var covFilesByLayer [][]backuppb.BackupManifest_File
 	var firstInSpan bool

-	flush := func() {
+	flush := func(ctx context.Context) error {
 		entry := execinfrapb.RestoreSpanEntry{
 			Span: lastCovSpan,
 		}
@@ -512,8 +512,14 @@
 		}

 		if len(entry.Files) > 0 {
-			spanCh <- entry
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case spanCh <- entry:
+			}
 		}
+
+		return nil
 	}

 	for _, span := range requiredSpans {
@@ -629,7 +635,9 @@ func generateAndSendImportSpans(
 				lastCovSpan.EndKey = coverSpan.EndKey
 				lastCovSpanSize = lastCovSpanSize + newCovFilesSize
 			} else {
-				flush()
+				if err := flush(ctx); err != nil {
+					return err
+				}
 				lastCovSpan = coverSpan
 				covFilesByLayer = filesByLayer
 				lastCovSpanSize = covSize
@@ -645,8 +653,7 @@
 		}
 	}

-	flush()
-	return nil
+	return flush(ctx)
 }

 // fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys
