diff --git a/pkg/crosscluster/logical/offline_initial_scan_processor.go b/pkg/crosscluster/logical/offline_initial_scan_processor.go
index db5c5587f594..c7489d45d118 100644
--- a/pkg/crosscluster/logical/offline_initial_scan_processor.go
+++ b/pkg/crosscluster/logical/offline_initial_scan_processor.go
@@ -18,7 +18,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -38,15 +37,6 @@ import (
 	"github.com/cockroachdb/logtags"
 )
 
-// TODO(msbutler): we can remove this setting once we eagerly spin down the proc
-// when the initial scan completes.
-var minimumFlushInterval = settings.RegisterDurationSettingWithExplicitUnit(
-	settings.SystemOnly,
-	"logical_replication.offline_scan_flush_interval",
-	"the minimum timestamp between flushes; flushes may still occur if internal buffers fill up",
-	5*time.Second,
-)
-
 // offlineInitialScanProcessor consumes a cross-cluster replication stream
 // by decoding kvs in it to logical changes and applying them by executing DMLs.
 type offlineInitialScanProcessor struct {
@@ -73,12 +63,18 @@ type offlineInitialScanProcessor struct {
 
 	errCh chan error
 
-	checkpointCh chan []jobspb.ResolvedSpan
+	checkpointCh chan offlineCheckpoint
 
 	rekey *backup.KeyRewriter
 
-	batcher      *bulk.SSTBatcher
-	lastKeyAdded roachpb.Key
+	batcher              *bulk.SSTBatcher
+	lastKeyAdded         roachpb.Key
+	initialScanCompleted bool
+}
+
+type offlineCheckpoint struct {
+	resolvedSpans              []jobspb.ResolvedSpan
+	afterInitialScanCompletion bool
 }
 
 var (
@@ -107,7 +103,7 @@ func newNewOfflineInitialScanProcessor(
 		spec:         spec,
 		processorID:  processorID,
 		stopCh:       make(chan struct{}),
-		checkpointCh: make(chan []jobspb.ResolvedSpan),
+		checkpointCh: make(chan offlineCheckpoint),
 		errCh:        make(chan error, 1),
 		rekey:        rekeyer,
 		lastKeyAdded: roachpb.Key{},
@@ -244,19 +240,9 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P
 	}
 
 	select {
-	case resolved, ok := <-o.checkpointCh:
-		if ok {
-			progressUpdate := &jobspb.ResolvedSpans{ResolvedSpans: resolved}
-			progressBytes, err := protoutil.Marshal(progressUpdate)
-			if err != nil {
-				o.MoveToDrainingAndLogError(err)
-				return nil, o.DrainHelper()
-			}
-			row := rowenc.EncDatumRow{
-				rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))),
-			}
-			return row, nil
-		} else {
+	case checkpoint, ok := <-o.checkpointCh:
+		switch {
+		case !ok:
 			select {
 			case err := <-o.errCh:
 				o.MoveToDrainingAndLogError(err)
@@ -267,6 +253,23 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P
 				o.MoveToDrainingAndLogError(nil /* error */)
 				return nil, o.DrainHelper()
 			}
+		case checkpoint.afterInitialScanCompletion:
+			// The previous checkpoint completed the initial scan and was already
+			// ingested by the coordinator, so we can gracefully shut down the
+			// processor.
+			o.MoveToDraining(nil /* err */)
+			return nil, o.DrainHelper()
+		default:
+			progressUpdate := &jobspb.ResolvedSpans{ResolvedSpans: checkpoint.resolvedSpans}
+			progressBytes, err := protoutil.Marshal(progressUpdate)
+			if err != nil {
+				o.MoveToDrainingAndLogError(err)
+				return nil, o.DrainHelper()
+			}
+			row := rowenc.EncDatumRow{
+				rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))),
+			}
+			return row, nil
 		}
 	case err := <-o.errCh:
 		o.MoveToDrainingAndLogError(err)
@@ -360,20 +363,17 @@
 		return errors.New("checkpoint event expected to have resolved spans")
 	}
 
+	completedInitialScan := true
 	for i := range resolvedSpans {
+		if resolvedSpans[i].Timestamp.IsEmpty() {
+			completedInitialScan = false
+		}
 		if o.spec.InitialScanTimestamp.Less(resolvedSpans[i].Timestamp) {
 			// Elide all checkpoints after initial scan timestamp.
 			resolvedSpans[i].Timestamp = o.spec.InitialScanTimestamp
 		}
 	}
 
-	minFlushInterval := minimumFlushInterval.Get(&o.FlowCtx.Cfg.Settings.SV)
-	if timeutil.Since(o.lastFlushTime) < minFlushInterval {
-		// To prevent checkpoint overload after event stream finishes initial scan,
-		// pace these noop checkpoints. This is a temporary measure until the
-		// processor eagerly spins down after initial scan.
-		return nil
-	}
 	log.Infof(ctx, "flushing batcher on checkpoint")
 	if err := o.flushBatch(ctx); err != nil {
 		return errors.Wrap(err, "flushing batcher on checkpoint")
@@ -381,7 +381,10 @@
 	}
 	o.batcher.Reset(ctx)
 	select {
-	case o.checkpointCh <- resolvedSpans:
+	case o.checkpointCh <- offlineCheckpoint{
+		resolvedSpans:              resolvedSpans,
+		afterInitialScanCompletion: o.initialScanCompleted,
+	}:
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-o.stopCh:
@@ -392,6 +395,14 @@
 		return nil
 	}
 	o.lastFlushTime = timeutil.Now()
+	if completedInitialScan {
+		// Modify the processor state _after_ Next() has received the first
+		// checkpoint that completes the initial scan. This ensures Next()'s
+		// caller receives this checkpoint. Then, on the next checkpoint,
+		// `afterInitialScanCompletion` will be set, causing Next() to shut
+		// down the processor gracefully.
+		o.initialScanCompleted = true
+	}
 	return nil
 }
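
For review context, the sequencing here is subtle enough to be worth a standalone illustration. Below is a minimal sketch of the two-checkpoint handshake this change introduces, assuming an unbuffered channel like `checkpointCh`; `producer`, `send`, and `next` are hypothetical stand-ins for `checkpoint()` and `Next()`, and plain strings stand in for `jobspb.ResolvedSpan`. It models the pattern and is not code from this PR.

```go
package main

import "fmt"

// offlineCheckpoint mirrors the struct added in this PR: a payload plus a
// flag that is only set on checkpoints sent *after* the one that completed
// the initial scan.
type offlineCheckpoint struct {
	resolved                   []string
	afterInitialScanCompletion bool
}

// producer stands in for the checkpoint() side of the processor.
type producer struct {
	ch                   chan offlineCheckpoint
	initialScanCompleted bool
}

// send mirrors checkpoint(): the flag carries the state from *before* this
// checkpoint, and the state is updated only after the unbuffered send
// returns, i.e. after the consumer has already received the completing
// checkpoint.
func (p *producer) send(resolved []string, completesScan bool) {
	p.ch <- offlineCheckpoint{
		resolved:                   resolved,
		afterInitialScanCompletion: p.initialScanCompleted,
	}
	if completesScan {
		p.initialScanCompleted = true
	}
}

// next mirrors Next(): ordinary checkpoints are emitted as progress, and the
// first post-completion checkpoint triggers a graceful shutdown instead of
// another progress update.
func next(ch <-chan offlineCheckpoint, done chan<- struct{}) {
	defer close(done)
	for cp := range ch {
		if cp.afterInitialScanCompletion {
			fmt.Println("previous checkpoint finished the initial scan; draining")
			return
		}
		fmt.Println("emit progress for", cp.resolved)
	}
}

func main() {
	p := &producer{ch: make(chan offlineCheckpoint)} // unbuffered, like checkpointCh
	done := make(chan struct{})
	go next(p.ch, done)
	p.send([]string{"span-1"}, false) // mid-scan checkpoint
	p.send([]string{"span-2"}, true)  // this checkpoint completes the scan
	p.send(nil, false)                // next checkpoint triggers shutdown
	<-done
}
```

The unbuffered send is the load-bearing detail: `checkpoint()` flips `initialScanCompleted` only after the send returns, so a checkpoint with `afterInitialScanCompletion` set can exist only once the completing checkpoint has already been handed to `Next()`'s caller, which is exactly the invariant the shutdown comment in `Next()` relies on.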