backupccl: add missing context cancel checks to restore

In cockroachdb#95257 we saw a restore grind to a halt 2 hours into a
5-hour roachtest. The stacks suggested that a context cancellation was
not being respected by the goroutine running
`generateAndSendImportSpans`, which left the
`generative_split_and_scatter_processor` stuck writing to a channel
nobody was reading from
(https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/backupccl/restore_span_covering.go#L516),
since the other goroutines in the processor had seen the context
cancellation and exited. A side effect of the generative processor not
shutting down was that the downstream restore data processors would
also hang on their call to `input.Next()`, as they would never receive
a row or a meta from the generative processor signalling them to shut
down. This fix adds a context cancellation check to the goroutine
described above, allowing a graceful teardown of the flow.
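
The fix applies the standard Go pattern for a cancellable blocking
channel send: wrap the send in a `select` that also watches
`ctx.Done()`. A minimal, self-contained sketch of the pattern (the
names below are illustrative, not the actual restore code):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// sendWithCancel blocks on the channel send but also respects
	// context cancellation, so the producer cannot hang forever once
	// every consumer has exited.
	func sendWithCancel(ctx context.Context, ch chan<- int, v int) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ch <- v:
			return nil
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		ch := make(chan int) // unbuffered: a send blocks until someone receives

		// Simulate the consumer side tearing down: nobody ever reads from ch.
		go func() {
			time.Sleep(100 * time.Millisecond)
			cancel()
		}()

		// Without the ctx.Done() case this send would block forever.
		if err := sendWithCancel(ctx, ch, 42); err != nil {
			fmt.Println("send aborted:", err) // send aborted: context canceled
		}
	}

Without the `ctx.Done()` case, the send blocks indefinitely once every
reader has exited, which is exactly the hang observed in the roachtest.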

Informs: cockroachdb#95257

Release note (bug fix): fixes a bug where a restore flow could
hang indefinitely in the face of a context cancellation, manifesting
as a stuck restore job.
adityamaru committed Jan 31, 2023
1 parent 91bdcdd commit 7da704c
Showing 4 changed files with 65 additions and 44 deletions.
pkg/ccl/backupccl/restore_data_processor.go (2 changes: 1 addition & 1 deletion)

@@ -172,7 +172,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
 		_ = rd.phaseGroup.Wait()
 	}
 	rd.phaseGroup = ctxgroup.WithContext(ctx)
-	log.Infof(ctx, "starting restore data")
+	log.Infof(ctx, "starting restore data processor")
 
 	entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
 	rd.sstCh = make(chan mergedSST, rd.numWorkers)
pkg/ccl/backupccl/restore_job.go (89 changes: 51 additions & 38 deletions)

@@ -318,7 +318,7 @@ func restore(
 	genSpan := func(ctx context.Context) error {
 		defer close(importSpanCh)
 		return generateAndSendImportSpans(
-			restoreCtx,
+			ctx,
 			dataToRestore.getSpans(),
 			backupManifests,
 			layerToBackupManifestFileIterFactory,
@@ -334,7 +334,6 @@
 	// Count number of import spans.
 	var numImportSpans int
 	var countTasks []func(ctx context.Context) error
-	log.Infof(restoreCtx, "rh_debug: starting count task")
 	spanCountTask := func(ctx context.Context) error {
 		for range importSpanCh {
 			numImportSpans++
@@ -382,50 +381,61 @@
 
 	generativeCheckpointLoop := func(ctx context.Context) error {
 		defer close(requestFinishedCh)
-		for progress := range progCh {
-			mu.Lock()
-			var progDetails backuppb.RestoreProgress
-			if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
-				log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
-			}
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case progress, ok := <-progCh:
+				if !ok {
+					return nil
+				}
+				mu.Lock()
+				var progDetails backuppb.RestoreProgress
+				if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
+					log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
+				}
 
-			mu.res.Add(progDetails.Summary)
-			idx := progDetails.ProgressIdx
+				mu.res.Add(progDetails.Summary)
+				idx := progDetails.ProgressIdx
 
-			if idx >= mu.ceiling {
-				for i := mu.ceiling; i <= idx; i++ {
-					importSpan := <-importSpanCh
-					mu.inFlightImportSpans[i] = importSpan.Span
-				}
-				mu.ceiling = idx + 1
-			}
+				if idx >= mu.ceiling {
+					for i := mu.ceiling; i <= idx; i++ {
+						importSpan, ok := <-importSpanCh
+						if !ok {
+							// Our channel was closed, nothing left to do.
+							return nil
+						}
+						mu.inFlightImportSpans[i] = importSpan.Span
+					}
+					mu.ceiling = idx + 1
+				}
 
-			if sp, ok := mu.inFlightImportSpans[idx]; ok {
-				// Assert that we're actually marking the correct span done. See #23977.
-				if !sp.Key.Equal(progDetails.DataSpan.Key) {
-					mu.Unlock()
-					return errors.Newf("request %d for span %v does not match import span for same idx: %v",
-						idx, progDetails.DataSpan, sp,
-					)
-				}
-				mu.requestsCompleted[idx] = true
-				prevHighWater := mu.highWaterMark
-				for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ {
-					mu.highWaterMark = j
-				}
+				if sp, ok := mu.inFlightImportSpans[idx]; ok {
+					// Assert that we're actually marking the correct span done. See #23977.
+					if !sp.Key.Equal(progDetails.DataSpan.Key) {
+						mu.Unlock()
+						return errors.Newf("request %d for span %v does not match import span for same idx: %v",
+							idx, progDetails.DataSpan, sp,
+						)
+					}
+					mu.requestsCompleted[idx] = true
+					prevHighWater := mu.highWaterMark
+					for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ {
+						mu.highWaterMark = j
+					}
 
-				for j := prevHighWater; j < mu.highWaterMark; j++ {
-					delete(mu.requestsCompleted, j)
-					delete(mu.inFlightImportSpans, j)
-				}
-			}
-			mu.Unlock()
+					for j := prevHighWater; j < mu.highWaterMark; j++ {
+						delete(mu.requestsCompleted, j)
+						delete(mu.inFlightImportSpans, j)
+					}
+				}
+				mu.Unlock()
 
-			// Signal that the processor has finished importing a span, to update job
-			// progress.
-			requestFinishedCh <- struct{}{}
-		}
-		return nil
+				// Signal that the processor has finished importing a span, to update job
+				// progress.
+				requestFinishedCh <- struct{}{}
+			}
+		}
 	}
 	tasks = append(tasks, generativeCheckpointLoop, genSpan)
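
The reworked checkpoint loop above is the receive-side variant of the
same pattern: every receive selects against `ctx.Done()`, and a closed
channel (`ok == false`) signals normal completion rather than an error.
A stripped-down sketch with illustrative names, not the actual job
code:

	package main

	import (
		"context"
		"fmt"
	)

	// drain consumes work until either the producer closes the channel
	// (clean completion) or the context is cancelled (teardown).
	func drain(ctx context.Context, work <-chan string) error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case item, ok := <-work:
				if !ok {
					return nil // channel closed: clean shutdown
				}
				fmt.Println("processed", item)
			}
		}
	}

	func main() {
		work := make(chan string, 2)
		work <- "span-1"
		work <- "span-2"
		close(work)

		if err := drain(context.Background(), work); err != nil {
			fmt.Println("drain aborted:", err)
		}
	}

Returning nil on a closed channel lets the surrounding task group wind
down cleanly, while returning `ctx.Err()` propagates the cancellation
to the sibling goroutines in the group.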

Expand Down Expand Up @@ -1708,6 +1718,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return err
}
}
log.Infof(ctx, "finished restoring the pre-data bundle")
}

if !preValidateData.isEmpty() {
@@ -1728,6 +1739,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
 		}
 
 		resTotal.Add(res)
+		log.Infof(ctx, "finished restoring the validate data bundle")
 	}
 	{
 		// Restore the main data bundle. We notably only restore the system tables
@@ -1749,6 +1761,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
 		}
 
 		resTotal.Add(res)
+		log.Infof(ctx, "finished restoring the main data bundle")
 	}
 
 	if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil {
pkg/ccl/backupccl/restore_processor_planning.go (1 change: 1 addition & 0 deletions)

@@ -188,6 +188,7 @@ func distRestore(
 		NumEntries:           int64(numImportSpans),
 		NumNodes:             int64(numNodes),
 		UseSimpleImportSpans: useSimpleImportSpans,
+		JobID:                jobID,
 	}
 
 	proc := physicalplan.Processor{
pkg/ccl/backupccl/restore_span_covering.go (17 changes: 12 additions & 5 deletions)

@@ -497,7 +497,7 @@ func generateAndSendImportSpans(
 	var covFilesByLayer [][]backuppb.BackupManifest_File
 	var firstInSpan bool
 
-	flush := func() {
+	flush := func(ctx context.Context) error {
 		entry := execinfrapb.RestoreSpanEntry{
 			Span: lastCovSpan,
 		}

@@ -513,8 +513,14 @@
 		}
 
 		if len(entry.Files) > 0 {
-			spanCh <- entry
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case spanCh <- entry:
+			}
 		}
+
+		return nil
 	}
 
 	for _, span := range requiredSpans {

@@ -630,7 +636,9 @@
 			lastCovSpan.EndKey = coverSpan.EndKey
 			lastCovSpanSize = lastCovSpanSize + newCovFilesSize
 		} else {
-			flush()
+			if err := flush(ctx); err != nil {
+				return err
+			}
 			lastCovSpan = coverSpan
 			covFilesByLayer = filesByLayer
 			lastCovSpanSize = covSize

@@ -646,8 +654,7 @@
 		}
 	}
 
-	flush()
-	return nil
+	return flush(ctx)
 }
 
 // fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys
