From 1d882cd1fff94e8b3a0276169a14801f3ea94adb Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Tue, 31 Jan 2023 15:25:42 -0500
Subject: [PATCH] backupccl: add missing context cancel checks to restore

In #95257 we saw a restore grind to a halt 2 hours into a 5-hour
roachtest. The stacks indicated that we may have seen a context
cancellation that was not being respected by the goroutine running
`generateAndSendImportSpans`. This resulted in the
`generative_split_and_scatter_processor` getting stuck writing to a
channel nobody was reading from
(https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/backupccl/restore_span_covering.go#L516)
since the other goroutines in the processor had seen the ctx
cancellation and exited. A side effect of the generative processor not
shutting down was that the downstream restore data processors would
also hang on their call to `input.Next()`, as they would not receive a
row or a meta from the generative processor signalling them to shut
down.

This fix adds a ctx cancellation check to the goroutine described
above, thereby allowing a graceful teardown of the flow. This fix also
adds the JobID to the generative processor spec so that logs on remote
nodes are correctly tagged with the JobID, making for easier debugging.

Informs: #95257

Release note (bug fix): fixes a bug where a restore flow could hang
indefinitely in the face of a context cancellation, manifesting as a
stuck restore job.
---
 pkg/ccl/backupccl/restore_data_processor.go     |  2 +-
 pkg/ccl/backupccl/restore_job.go                | 14 ++++++++++----
 pkg/ccl/backupccl/restore_processor_planning.go |  1 +
 pkg/ccl/backupccl/restore_span_covering.go      | 17 ++++++++++++-----
 4 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index a099d68c7a07..9cb148ad4dd1 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -172,7 +172,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
 		_ = rd.phaseGroup.Wait()
 	}
 	rd.phaseGroup = ctxgroup.WithContext(ctx)
-	log.Infof(ctx, "starting restore data")
+	log.Infof(ctx, "starting restore data processor")
 
 	entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
 	rd.sstCh = make(chan mergedSST, rd.numWorkers)
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 2511d02eaf1e..f948c48aa886 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -318,7 +318,7 @@ func restore(
 	genSpan := func(ctx context.Context) error {
 		defer close(importSpanCh)
 		return generateAndSendImportSpans(
-			restoreCtx,
+			ctx,
 			dataToRestore.getSpans(),
 			backupManifests,
 			layerToBackupManifestFileIterFactory,
@@ -334,7 +334,6 @@ func restore(
 	// Count number of import spans.
 	var numImportSpans int
 	var countTasks []func(ctx context.Context) error
-	log.Infof(restoreCtx, "rh_debug: starting count task")
 	spanCountTask := func(ctx context.Context) error {
 		for range importSpanCh {
 			numImportSpans++
@@ -394,7 +393,12 @@ func restore(
 
 			if idx >= mu.ceiling {
 				for i := mu.ceiling; i <= idx; i++ {
-					importSpan := <-importSpanCh
+					importSpan, ok := <-importSpanCh
+					if !ok {
+						// The channel has been closed, there is nothing left to do.
+						log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed")
+						return nil
+					}
 					mu.inFlightImportSpans[i] = importSpan.Span
 				}
 				mu.ceiling = idx + 1
@@ -413,7 +417,6 @@ func restore(
 			for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ {
 				mu.highWaterMark = j
 			}
-
 			for j := prevHighWater; j < mu.highWaterMark; j++ {
 				delete(mu.requestsCompleted, j)
 				delete(mu.inFlightImportSpans, j)
@@ -1708,6 +1711,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
 				return err
 			}
 		}
+		log.Infof(ctx, "finished restoring the pre-data bundle")
 	}
 
 	if !preValidateData.isEmpty() {
@@ -1728,6 +1732,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
 		}
 
 		resTotal.Add(res)
+		log.Infof(ctx, "finished restoring the validate data bundle")
 	}
 	{
 		// Restore the main data bundle. We notably only restore the system tables
@@ -1749,6 +1754,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
 		}
 
 		resTotal.Add(res)
+		log.Infof(ctx, "finished restoring the main data bundle")
 	}
 
 	if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil {
diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go
index 08ac4f069f45..fd68d1d5f477 100644
--- a/pkg/ccl/backupccl/restore_processor_planning.go
+++ b/pkg/ccl/backupccl/restore_processor_planning.go
@@ -188,6 +188,7 @@ func distRestore(
 			NumEntries:           int64(numImportSpans),
 			NumNodes:             int64(numNodes),
 			UseSimpleImportSpans: useSimpleImportSpans,
+			JobID:                jobID,
 		}
 
 		proc := physicalplan.Processor{
diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go
index b4abf18fa247..3d451eec28c4 100644
--- a/pkg/ccl/backupccl/restore_span_covering.go
+++ b/pkg/ccl/backupccl/restore_span_covering.go
@@ -497,7 +497,7 @@ func generateAndSendImportSpans(
 	var covFilesByLayer [][]backuppb.BackupManifest_File
 	var firstInSpan bool
 
-	flush := func() {
+	flush := func(ctx context.Context) error {
 		entry := execinfrapb.RestoreSpanEntry{
 			Span: lastCovSpan,
 		}
@@ -513,8 +513,14 @@ func generateAndSendImportSpans(
 		}
 
 		if len(entry.Files) > 0 {
-			spanCh <- entry
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case spanCh <- entry:
+			}
 		}
+
+		return nil
 	}
 
 	for _, span := range requiredSpans {
@@ -630,7 +636,9 @@ func generateAndSendImportSpans(
 				lastCovSpan.EndKey = coverSpan.EndKey
 				lastCovSpanSize = lastCovSpanSize + newCovFilesSize
 			} else {
-				flush()
+				if err := flush(ctx); err != nil {
+					return err
+				}
 				lastCovSpan = coverSpan
 				covFilesByLayer = filesByLayer
 				lastCovSpanSize = covSize
@@ -646,8 +654,7 @@ func generateAndSendImportSpans(
 		}
 	}
 
-	flush()
-	return nil
+	return flush(ctx)
 }
 
 // fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys
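For context on the pattern used in the flush change above, here is a minimal, standalone Go sketch (hypothetical names, not the CockroachDB code) of a producer that wraps its channel send in a select on ctx.Done(). Without that case, a producer whose consumers have already exited on context cancellation blocks forever on the send, which is the hang described in the commit message.

package main

import (
	"context"
	"errors"
	"fmt"
)

// produce sends work items on ch until it runs out of items or ctx is
// canceled. The select mirrors the fix in flush() above: without the
// ctx.Done() case, a send with no remaining readers would block forever.
func produce(ctx context.Context, ch chan<- int) error {
	defer close(ch)
	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			// The flow is tearing down; return instead of blocking on the send.
			return ctx.Err()
		case ch <- i:
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int) // unbuffered, like a processor output channel
	done := make(chan error, 1)

	go func() { done <- produce(ctx, ch) }()

	// Consume a few items, then simulate the rest of the flow shutting down.
	for i := 0; i < 3; i++ {
		fmt.Println("consumed", <-ch)
	}
	cancel()

	// The producer observes the cancellation and exits instead of hanging.
	if err := <-done; errors.Is(err, context.Canceled) {
		fmt.Println("producer exited:", err)
	}
}

The patch applies the same idea in two places: flush() selects on ctx.Done() before sending, and the checkpoint loop additionally uses a comma-ok receive so it stops cleanly once generateAndSendImportSpans has closed the import span channel.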