diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 95d000c37d17..b9e58045c5cd 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -171,7 +171,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { _ = rd.phaseGroup.Wait() } rd.phaseGroup = ctxgroup.WithContext(ctx) - log.Infof(ctx, "starting restore data") + log.Infof(ctx, "starting restore data processor") entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers) rd.sstCh = make(chan mergedSST, rd.numWorkers) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index f7e8b66a74d9..232f0a45f063 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -315,7 +315,7 @@ func restore( genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { defer close(spanCh) return generateAndSendImportSpans( - restoreCtx, + ctx, dataToRestore.getSpans(), backupManifests, layerToBackupManifestFileIterFactory, @@ -331,7 +331,6 @@ func restore( // Count number of import spans. var numImportSpans int var countTasks []func(ctx context.Context) error - log.Infof(restoreCtx, "rh_debug: starting count task") spanCountTask := func(ctx context.Context) error { for range countSpansCh { numImportSpans++ @@ -394,7 +393,12 @@ func restore( if idx >= mu.ceiling { for i := mu.ceiling; i <= idx; i++ { - importSpan := <-importSpanCh + importSpan, ok := <-importSpanCh + if !ok { + // The channel has been closed, there is nothing left to do. + log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") + return nil + } mu.inFlightImportSpans[i] = importSpan.Span } mu.ceiling = idx + 1 @@ -413,7 +417,6 @@ func restore( for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { mu.highWaterMark = j } - for j := prevHighWater; j < mu.highWaterMark; j++ { delete(mu.requestsCompleted, j) delete(mu.inFlightImportSpans, j) @@ -1640,6 +1643,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } } + log.Infof(ctx, "finished restoring the pre-data bundle") } if !preValidateData.isEmpty() { @@ -1660,6 +1664,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } resTotal.Add(res) + log.Infof(ctx, "finished restoring the validate data bundle") } { // Restore the main data bundle. We notably only restore the system tables @@ -1681,6 +1686,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } resTotal.Add(res) + log.Infof(ctx, "finished restoring the main data bundle") } if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 6c21540ec28d..ef0587925e85 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -188,6 +188,7 @@ func distRestore( NumEntries: int64(numImportSpans), NumNodes: int64(numNodes), UseSimpleImportSpans: useSimpleImportSpans, + JobID: jobID, } proc := physicalplan.Processor{ diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 1a3fc2b98a35..cdd4057ba9b5 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -496,7 +496,7 @@ func generateAndSendImportSpans( var covFilesByLayer [][]backuppb.BackupManifest_File var firstInSpan bool - flush := func() { + flush := func(ctx context.Context) error { entry := execinfrapb.RestoreSpanEntry{ Span: lastCovSpan, } @@ -512,8 +512,14 @@ func generateAndSendImportSpans( } if len(entry.Files) > 0 { - spanCh <- entry + select { + case <-ctx.Done(): + return ctx.Err() + case spanCh <- entry: + } } + + return nil } for _, span := range requiredSpans { @@ -629,7 +635,9 @@ func generateAndSendImportSpans( lastCovSpan.EndKey = coverSpan.EndKey lastCovSpanSize = lastCovSpanSize + newCovFilesSize } else { - flush() + if err := flush(ctx); err != nil { + return err + } lastCovSpan = coverSpan covFilesByLayer = filesByLayer lastCovSpanSize = covSize @@ -645,8 +653,7 @@ func generateAndSendImportSpans( } } - flush() - return nil + return flush(ctx) } // fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys