
Commit

Merge pull request #140139 from cockroachdb/blathers/backport-release-25.1-139984

release-25.1: crosscluster/logical: reduce wasted work after offline scan completion
msbutler authored Jan 31, 2025
2 parents 19efdfd + 9cc5ced commit a610428
Showing 1 changed file with 46 additions and 35 deletions.
81 changes: 46 additions & 35 deletions pkg/crosscluster/logical/offline_initial_scan_processor.go
@@ -18,7 +18,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -38,15 +37,6 @@ import (
 	"github.com/cockroachdb/logtags"
 )
 
-// TODO(msbutler): we can remove this setting once we eagerly spin down the proc
-// when the initial scan completes.
-var minimumFlushInterval = settings.RegisterDurationSettingWithExplicitUnit(
-	settings.SystemOnly,
-	"logical_replication.offline_scan_flush_interval",
-	"the minimum timestamp between flushes; flushes may still occur if internal buffers fill up",
-	5*time.Second,
-)
-
 // offlineInitialScanProcessor consumes a cross-cluster replication stream
 // by decoding kvs in it to logical changes and applying them by executing DMLs.
 type offlineInitialScanProcessor struct {
@@ -73,12 +63,18 @@
 
 	errCh chan error
 
-	checkpointCh chan []jobspb.ResolvedSpan
+	checkpointCh chan offlineCheckpoint
 
 	rekey *backup.KeyRewriter
 
-	batcher      *bulk.SSTBatcher
-	lastKeyAdded roachpb.Key
+	batcher              *bulk.SSTBatcher
+	lastKeyAdded         roachpb.Key
+	initialScanCompleted bool
+}
+
+type offlineCheckpoint struct {
+	resolvedSpans              []jobspb.ResolvedSpan
+	afterInitialScanCompletion bool
 }
 
 var (
@@ -107,7 +103,7 @@ func newNewOfflineInitialScanProcessor(
 		spec:         spec,
 		processorID:  processorID,
 		stopCh:       make(chan struct{}),
-		checkpointCh: make(chan []jobspb.ResolvedSpan),
+		checkpointCh: make(chan offlineCheckpoint),
 		errCh:        make(chan error, 1),
 		rekey:        rekeyer,
 		lastKeyAdded: roachpb.Key{},
@@ -244,19 +240,9 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	}
 
 	select {
-	case resolved, ok := <-o.checkpointCh:
-		if ok {
-			progressUpdate := &jobspb.ResolvedSpans{ResolvedSpans: resolved}
-			progressBytes, err := protoutil.Marshal(progressUpdate)
-			if err != nil {
-				o.MoveToDrainingAndLogError(err)
-				return nil, o.DrainHelper()
-			}
-			row := rowenc.EncDatumRow{
-				rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))),
-			}
-			return row, nil
-		} else {
+	case checkpoint, ok := <-o.checkpointCh:
+		switch {
+		case !ok:
 			select {
 			case err := <-o.errCh:
 				o.MoveToDrainingAndLogError(err)
@@ -267,6 +253,23 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 				o.MoveToDrainingAndLogError(nil /* error */)
 				return nil, o.DrainHelper()
 			}
+		case checkpoint.afterInitialScanCompletion:
+			// The previous checkpoint completed the initial scan and was already
+			// ingested by the coordinator, so we can gracefully shut down the
+			// processor.
+			o.MoveToDraining(nil)
+			return nil, o.DrainHelper()
+		default:
+			progressUpdate := &jobspb.ResolvedSpans{ResolvedSpans: checkpoint.resolvedSpans}
+			progressBytes, err := protoutil.Marshal(progressUpdate)
+			if err != nil {
+				o.MoveToDrainingAndLogError(err)
+				return nil, o.DrainHelper()
+			}
+			row := rowenc.EncDatumRow{
+				rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))),
+			}
+			return row, nil
+		}
 	case err := <-o.errCh:
 		o.MoveToDrainingAndLogError(err)
@@ -360,28 +363,28 @@ func (o *offlineInitialScanProcessor) checkpoint(
 		return errors.New("checkpoint event expected to have resolved spans")
 	}
 
+	completedInitialScan := true
 	for i := range resolvedSpans {
+		if resolvedSpans[i].Timestamp.IsEmpty() {
+			completedInitialScan = false
+		}
 		if o.spec.InitialScanTimestamp.Less(resolvedSpans[i].Timestamp) {
 			// Elide all checkpoints after initial scan timestamp.
 			resolvedSpans[i].Timestamp = o.spec.InitialScanTimestamp
 		}
 	}
 
-	minFlushInterval := minimumFlushInterval.Get(&o.FlowCtx.Cfg.Settings.SV)
-	if timeutil.Since(o.lastFlushTime) < minFlushInterval {
-		// To prevent checkpoint overload after event stream finishes initial scan,
-		// pace these noop checkpoints. This is a temporary measure until the
-		// processor eagerly spins down after initial scan.
-		return nil
-	}
 	log.Infof(ctx, "flushing batcher on checkpoint")
 	if err := o.flushBatch(ctx); err != nil {
 		return errors.Wrap(err, "flushing batcher on checkpoint")
 	}
 	o.batcher.Reset(ctx)
 
 	select {
-	case o.checkpointCh <- resolvedSpans:
+	case o.checkpointCh <- offlineCheckpoint{
+		resolvedSpans:              resolvedSpans,
+		afterInitialScanCompletion: o.initialScanCompleted,
+	}:
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-o.stopCh:
@@ -392,6 +395,14 @@
 		return nil
 	}
 	o.lastFlushTime = timeutil.Now()
+	if completedInitialScan {
+		// Modify the processor state _after_ Next() has received the first
+		// checkpoint that completes the initial scan. This ensures Next()'s
+		// caller receives this checkpoint. Then, on the next checkpoint,
+		// `afterInitialScanCompletion` will be set, causing Next to gracefully
+		// shutdown the processor.
+		o.initialScanCompleted = true
+	}
 	return nil
 }
 
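For context, the change replaces time-based pacing of post-scan checkpoints with an explicit shutdown handshake: the checkpoint that completes the initial scan is still forwarded to the coordinator, and only the next checkpoint carries afterInitialScanCompletion, at which point Next() drains the processor. Below is a minimal, self-contained Go sketch of that handshake, not the CockroachDB code; the names checkpoint, producer, and emit are illustrative stand-ins for offlineCheckpoint, the checkpoint() sender, and Next()'s consumer loop.

// handshake sketch (hypothetical names, assumed simplification of the diff above)
package main

import "fmt"

// checkpoint stands in for offlineCheckpoint; spans stands in for resolved spans.
type checkpoint struct {
	spans                      []string
	afterInitialScanCompletion bool
}

// producer stands in for the checkpoint() side of the processor.
type producer struct {
	ch                   chan checkpoint
	initialScanCompleted bool
}

// emit mirrors checkpoint(): send the current checkpoint first, then record
// completion so that the *following* checkpoint is flagged as redundant.
func (p *producer) emit(spans []string, completedInitialScan bool) {
	p.ch <- checkpoint{spans: spans, afterInitialScanCompletion: p.initialScanCompleted}
	if completedInitialScan {
		p.initialScanCompleted = true
	}
}

func main() {
	p := &producer{ch: make(chan checkpoint)}
	go func() {
		defer close(p.ch)
		p.emit([]string{"a-b"}, false) // partial progress
		p.emit([]string{"a-c"}, true)  // this checkpoint completes the initial scan
		p.emit([]string{"a-c"}, true)  // redundant: produced after completion
	}()

	// The consumer mirrors Next(): forward progress until a checkpoint arrives
	// that was produced after the initial scan completed, then drain instead of
	// emitting no-op progress forever.
	for cp := range p.ch {
		if cp.afterInitialScanCompletion {
			fmt.Println("initial scan complete; draining processor")
			return
		}
		fmt.Println("forwarding progress:", cp.spans)
	}
}

Delivering the completing checkpoint before setting the flag is what guarantees the coordinator records the final resolved timestamps before the processor drains, which is the ordering the added comment in checkpoint() spells out.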
