From 9cc5ced0d1c0cdeb2825345b2ed2ca7e4d5d5ac2 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 28 Jan 2025 18:28:52 +0000 Subject: [PATCH] crosscluster/logical: reduce wasted work after offline scan completion Previously, if an offline scan processor ingested the initial scan, it would keep pulling for more data from the rangefeed until the job coordinator shut it down, once the coordinator persisted the global resolved timestamp. This could lead to signifcant wasted rangefeed and data transfer work, if the offline scan processors didn't all complete their scan at the same time. With this patch, the offline scan processor will shut itself down once Next is called on the first checkpoint _after_ the checkpoint with a resolved timestamp greater than the initial scan. This policy guarantees that the job coordinator will persist the initial scan progress before the processor shuts itself down. Epic: none Release note: none --- .../logical/offline_initial_scan_processor.go | 81 +++++++++++-------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/pkg/crosscluster/logical/offline_initial_scan_processor.go b/pkg/crosscluster/logical/offline_initial_scan_processor.go index db5c5587f594..c7489d45d118 100644 --- a/pkg/crosscluster/logical/offline_initial_scan_processor.go +++ b/pkg/crosscluster/logical/offline_initial_scan_processor.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -38,15 +37,6 @@ import ( "github.com/cockroachdb/logtags" ) -// TODO(msbutler): we can remove this setting once we eagerly spin down the proc -// when the initial scan completes. -var minimumFlushInterval = settings.RegisterDurationSettingWithExplicitUnit( - settings.SystemOnly, - "logical_replication.offline_scan_flush_interval", - "the minimum timestamp between flushes; flushes may still occur if internal buffers fill up", - 5*time.Second, -) - // offlineInitialScanProcessor consumes a cross-cluster replication stream // by decoding kvs in it to logical changes and applying them by executing DMLs. type offlineInitialScanProcessor struct { @@ -73,12 +63,18 @@ type offlineInitialScanProcessor struct { errCh chan error - checkpointCh chan []jobspb.ResolvedSpan + checkpointCh chan offlineCheckpoint rekey *backup.KeyRewriter - batcher *bulk.SSTBatcher - lastKeyAdded roachpb.Key + batcher *bulk.SSTBatcher + lastKeyAdded roachpb.Key + initialScanCompleted bool +} + +type offlineCheckpoint struct { + resolvedSpans []jobspb.ResolvedSpan + afterInitialScanCompletion bool } var ( @@ -107,7 +103,7 @@ func newNewOfflineInitialScanProcessor( spec: spec, processorID: processorID, stopCh: make(chan struct{}), - checkpointCh: make(chan []jobspb.ResolvedSpan), + checkpointCh: make(chan offlineCheckpoint), errCh: make(chan error, 1), rekey: rekeyer, lastKeyAdded: roachpb.Key{}, @@ -244,19 +240,9 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P } select { - case resolved, ok := <-o.checkpointCh: - if ok { - progressUpdate := &jobspb.ResolvedSpans{ResolvedSpans: resolved} - progressBytes, err := protoutil.Marshal(progressUpdate) - if err != nil { - o.MoveToDrainingAndLogError(err) - return nil, o.DrainHelper() - } - row := rowenc.EncDatumRow{ - rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))), - } - return row, nil - } else { + case checkpoint, ok := <-o.checkpointCh: + switch { + case !ok: select { case err := <-o.errCh: o.MoveToDrainingAndLogError(err) @@ -267,6 +253,23 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P o.MoveToDrainingAndLogError(nil /* error */) return nil, o.DrainHelper() } + case checkpoint.afterInitialScanCompletion: + // The previous checkpoint completed the initial scan and was already + // ingested by the coordinator, so we can gracefully shut down the + // processor. + o.MoveToDraining(nil) + return nil, o.DrainHelper() + default: + progressUpdate := &jobspb.ResolvedSpans{ResolvedSpans: checkpoint.resolvedSpans} + progressBytes, err := protoutil.Marshal(progressUpdate) + if err != nil { + o.MoveToDrainingAndLogError(err) + return nil, o.DrainHelper() + } + row := rowenc.EncDatumRow{ + rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))), + } + return row, nil } case err := <-o.errCh: o.MoveToDrainingAndLogError(err) @@ -360,20 +363,17 @@ func (o *offlineInitialScanProcessor) checkpoint( return errors.New("checkpoint event expected to have resolved spans") } + completedInitialScan := true for i := range resolvedSpans { + if resolvedSpans[i].Timestamp.IsEmpty() { + completedInitialScan = false + } if o.spec.InitialScanTimestamp.Less(resolvedSpans[i].Timestamp) { // Elide all checkpoints after initial scan timestamp. resolvedSpans[i].Timestamp = o.spec.InitialScanTimestamp } } - minFlushInterval := minimumFlushInterval.Get(&o.FlowCtx.Cfg.Settings.SV) - if timeutil.Since(o.lastFlushTime) < minFlushInterval { - // To prevent checkpoint overload after event stream finishes initial scan, - // pace these noop checkpoints. This is a temporary measure until the - // processor eagerly spins down after initial scan. - return nil - } log.Infof(ctx, "flushing batcher on checkpoint") if err := o.flushBatch(ctx); err != nil { return errors.Wrap(err, "flushing batcher on checkpoint") @@ -381,7 +381,10 @@ func (o *offlineInitialScanProcessor) checkpoint( o.batcher.Reset(ctx) select { - case o.checkpointCh <- resolvedSpans: + case o.checkpointCh <- offlineCheckpoint{ + resolvedSpans: resolvedSpans, + afterInitialScanCompletion: o.initialScanCompleted, + }: case <-ctx.Done(): return ctx.Err() case <-o.stopCh: @@ -392,6 +395,14 @@ func (o *offlineInitialScanProcessor) checkpoint( return nil } o.lastFlushTime = timeutil.Now() + if completedInitialScan { + // Modify the processor state _after_ Next() has received the first + // checkpoint that completes the initial scan. This ensures Next()'s + // caller receives this checkpoint. Then, on the next checkpoint, + // `afterInitialScanCompletion` will be set, causing Next to gracefully + // shutdown the processor. + o.initialScanCompleted = true + } return nil }