diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 9f1a68641e6d..da78636b9dc4 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -54,6 +54,7 @@ go_library( "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/changefeedccl/schemafeed", + "//pkg/ccl/kvccl/kvfollowerreadsccl", "//pkg/ccl/utilccl", "//pkg/cloud", "//pkg/cloud/externalconn", diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index a407d64f519e..e7f5d34be1a1 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" + "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/kv" @@ -348,6 +349,12 @@ var enableBalancedRangeDistribution = settings.RegisterBoolSetting( "changefeed.balance_range_distribution.enable", false), ).WithPublic() +var useBulkOracle = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.random_replica_selection.enabled", + "randomize the selection of which replica backs up each range", + false) + func makePlan( execCtx sql.JobExecContext, jobID jobspb.JobID, @@ -372,8 +379,13 @@ func makePlan( } } - planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn, - sql.DistributionType(distMode), physicalplan.DefaultReplicaChooser, locFilter) + evalCtx := execCtx.ExtendedEvalContext() + oracle := physicalplan.DefaultReplicaChooser + if useBulkOracle.Get(&evalCtx.Settings.SV) { + oracle = kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), locFilter) + } + planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */ + blankTxn, sql.DistributionType(distMode), oracle, locFilter) spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans) if err != nil { return nil, nil, err