diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index a099d68c7a07..9cb148ad4dd1 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -172,7 +172,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { _ = rd.phaseGroup.Wait() } rd.phaseGroup = ctxgroup.WithContext(ctx) - log.Infof(ctx, "starting restore data") + log.Infof(ctx, "starting restore data processor") entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers) rd.sstCh = make(chan mergedSST, rd.numWorkers) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 2511d02eaf1e..f948c48aa886 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -318,7 +318,7 @@ func restore( genSpan := func(ctx context.Context) error { defer close(importSpanCh) return generateAndSendImportSpans( - restoreCtx, + ctx, dataToRestore.getSpans(), backupManifests, layerToBackupManifestFileIterFactory, @@ -334,7 +334,6 @@ func restore( // Count number of import spans. var numImportSpans int var countTasks []func(ctx context.Context) error - log.Infof(restoreCtx, "rh_debug: starting count task") spanCountTask := func(ctx context.Context) error { for range importSpanCh { numImportSpans++ @@ -394,7 +393,12 @@ func restore( if idx >= mu.ceiling { for i := mu.ceiling; i <= idx; i++ { - importSpan := <-importSpanCh + importSpan, ok := <-importSpanCh + if !ok { + // The channel has been closed, there is nothing left to do. + log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") + return nil + } mu.inFlightImportSpans[i] = importSpan.Span } mu.ceiling = idx + 1 @@ -413,7 +417,6 @@ func restore( for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { mu.highWaterMark = j } - for j := prevHighWater; j < mu.highWaterMark; j++ { delete(mu.requestsCompleted, j) delete(mu.inFlightImportSpans, j) @@ -1708,6 +1711,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } } + log.Infof(ctx, "finished restoring the pre-data bundle") } if !preValidateData.isEmpty() { @@ -1728,6 +1732,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } resTotal.Add(res) + log.Infof(ctx, "finished restoring the validate data bundle") } { // Restore the main data bundle. We notably only restore the system tables @@ -1749,6 +1754,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } resTotal.Add(res) + log.Infof(ctx, "finished restoring the main data bundle") } if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 08ac4f069f45..fd68d1d5f477 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -188,6 +188,7 @@ func distRestore( NumEntries: int64(numImportSpans), NumNodes: int64(numNodes), UseSimpleImportSpans: useSimpleImportSpans, + JobID: jobID, } proc := physicalplan.Processor{ diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index b4abf18fa247..3d451eec28c4 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -497,7 +497,7 @@ func generateAndSendImportSpans( var covFilesByLayer [][]backuppb.BackupManifest_File var firstInSpan bool - flush := func() { + flush := func(ctx context.Context) error { entry := execinfrapb.RestoreSpanEntry{ Span: lastCovSpan, } @@ -513,8 +513,14 @@ func generateAndSendImportSpans( } if len(entry.Files) > 0 { - spanCh <- entry + select { + case <-ctx.Done(): + return ctx.Err() + case spanCh <- entry: + } } + + return nil } for _, span := range requiredSpans { @@ -630,7 +636,9 @@ func generateAndSendImportSpans( lastCovSpan.EndKey = coverSpan.EndKey lastCovSpanSize = lastCovSpanSize + newCovFilesSize } else { - flush() + if err := flush(ctx); err != nil { + return err + } lastCovSpan = coverSpan covFilesByLayer = filesByLayer lastCovSpanSize = covSize @@ -646,8 +654,7 @@ func generateAndSendImportSpans( } } - flush() - return nil + return flush(ctx) } // fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys