From faab41fdefb8790e349a3106622f1b35f49edb8f Mon Sep 17 00:00:00 2001 From: adityamaru Date: Tue, 31 Jan 2023 15:25:42 -0500 Subject: [PATCH] backupccl: add missing context cancel checks to restore In #95257 we saw a restore grind to a halt 2 hours into a 5-hour roachtest. The stacks indicated that we may have seen a context cancellation that was not being respected by the goroutine running `generateAndSendImportSpans`. This resulted in the `generative_split_and_scatter_processor` getting stuck writing to a channel nobody was reading from (https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/backupccl/restore_span_covering.go#L516) since the other goroutines in the processor had seen the ctx cancellation and exited. A side effect of the generative processor not shutting down was that the downstream restore data processors would also hang on their call to `input.Next()` as they would not receive a row or a meta from the generative processor signaling them to shut down. This fix adds a ctx cancellation check to the goroutine described above, thereby allowing a graceful teardown of the flow. This fix also adds the JobID to the generative processor spec so that logs on remote nodes are correctly tagged with the JobID, making for easier debugging. Informs: #95257 Release note (bug fix): fixes a bug where a restore flow could hang indefinitely in the face of a context cancellation, manifesting as a stuck restore job. 
--- pkg/ccl/backupccl/restore_data_processor.go | 2 +- pkg/ccl/backupccl/restore_job.go | 14 ++++++++++---- pkg/ccl/backupccl/restore_processor_planning.go | 1 + pkg/ccl/backupccl/restore_span_covering.go | 17 ++++++++++++----- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 95d000c37d17..b9e58045c5cd 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -171,7 +171,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { _ = rd.phaseGroup.Wait() } rd.phaseGroup = ctxgroup.WithContext(ctx) - log.Infof(ctx, "starting restore data") + log.Infof(ctx, "starting restore data processor") entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers) rd.sstCh = make(chan mergedSST, rd.numWorkers) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index f7e8b66a74d9..232f0a45f063 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -315,7 +315,7 @@ func restore( genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { defer close(spanCh) return generateAndSendImportSpans( - restoreCtx, + ctx, dataToRestore.getSpans(), backupManifests, layerToBackupManifestFileIterFactory, @@ -331,7 +331,6 @@ func restore( // Count number of import spans. var numImportSpans int var countTasks []func(ctx context.Context) error - log.Infof(restoreCtx, "rh_debug: starting count task") spanCountTask := func(ctx context.Context) error { for range countSpansCh { numImportSpans++ @@ -394,7 +393,12 @@ func restore( if idx >= mu.ceiling { for i := mu.ceiling; i <= idx; i++ { - importSpan := <-importSpanCh + importSpan, ok := <-importSpanCh + if !ok { + // The channel has been closed, there is nothing left to do. 
+ log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") + return nil + } mu.inFlightImportSpans[i] = importSpan.Span } mu.ceiling = idx + 1 @@ -413,7 +417,6 @@ func restore( for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { mu.highWaterMark = j } - for j := prevHighWater; j < mu.highWaterMark; j++ { delete(mu.requestsCompleted, j) delete(mu.inFlightImportSpans, j) @@ -1640,6 +1643,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } } + log.Infof(ctx, "finished restoring the pre-data bundle") } if !preValidateData.isEmpty() { @@ -1660,6 +1664,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } resTotal.Add(res) + log.Infof(ctx, "finished restoring the validate data bundle") } { // Restore the main data bundle. We notably only restore the system tables @@ -1681,6 +1686,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } resTotal.Add(res) + log.Infof(ctx, "finished restoring the main data bundle") } if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 6c21540ec28d..ef0587925e85 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -188,6 +188,7 @@ func distRestore( NumEntries: int64(numImportSpans), NumNodes: int64(numNodes), UseSimpleImportSpans: useSimpleImportSpans, + JobID: jobID, } proc := physicalplan.Processor{ diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 1a3fc2b98a35..cdd4057ba9b5 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -496,7 +496,7 @@ func generateAndSendImportSpans( var covFilesByLayer [][]backuppb.BackupManifest_File var firstInSpan bool - 
flush := func() { + flush := func(ctx context.Context) error { entry := execinfrapb.RestoreSpanEntry{ Span: lastCovSpan, } @@ -512,8 +512,14 @@ func generateAndSendImportSpans( } if len(entry.Files) > 0 { - spanCh <- entry + select { + case <-ctx.Done(): + return ctx.Err() + case spanCh <- entry: + } } + + return nil } for _, span := range requiredSpans { @@ -629,7 +635,9 @@ func generateAndSendImportSpans( lastCovSpan.EndKey = coverSpan.EndKey lastCovSpanSize = lastCovSpanSize + newCovFilesSize } else { - flush() + if err := flush(ctx); err != nil { + return err + } lastCovSpan = coverSpan covFilesByLayer = filesByLayer lastCovSpanSize = covSize @@ -645,8 +653,7 @@ func generateAndSendImportSpans( } } - flush() - return nil + return flush(ctx) } // fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys