diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 1c5ec6c8f074..df804fee4d88 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -57,6 +57,7 @@ go_library( "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/changefeedccl/schemafeed", + "//pkg/ccl/kvccl/kvfollowerreadsccl", "//pkg/ccl/utilccl", "//pkg/cloud", "//pkg/cloud/externalconn", diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index db8ca1edba7f..a59e3b45921d 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/keys" @@ -358,6 +359,12 @@ var RangeDistributionStrategy = settings.RegisterEnumSetting( }, settings.WithPublic) +var useBulkOracle = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "changefeed.random_replica_selection.enabled", + "randomize the selection of which replica backs up each range", + true) + func makePlan( execCtx sql.JobExecContext, jobID jobspb.JobID, @@ -386,7 +393,11 @@ func makePlan( } rangeDistribution := RangeDistributionStrategy.Get(sv) + evalCtx := execCtx.ExtendedEvalContext() oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter)) + if useBulkOracle.Get(&evalCtx.Settings.SV) { + oracle = kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), locFilter, kvfollowerreadsccl.StreakConfig{}) + } planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */ blankTxn, sql.DistributionType(distMode), oracle, locFilter) spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans) diff --git a/pkg/ccl/changefeedccl/changefeed_dist_test.go b/pkg/ccl/changefeedccl/changefeed_dist_test.go index 35f47c1904f2..69d2ca5abe0c 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist_test.go +++ b/pkg/ccl/changefeedccl/changefeed_dist_test.go @@ -484,6 +484,7 @@ func TestChangefeedWithNoDistributionStrategy(t *testing.T) { defer tester.cleanup() tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'default'") + tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.random_replica_selection.enabled = false") tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'") partitions := tester.getPartitions() counts := tester.countRangesPerNode(partitions) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index c60e11901609..0bc1825b8e60 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6190,7 +6190,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { t.Fatal("did not get signal to proceed") } }, - // Handle tarnsient changefeed error. We expect to see node drain error. + // Handle transient changefeed error. We expect to see node drain error. // When we do, notify drainNotification, and reset node drain channel. HandleDistChangefeedError: func(err error) error { errCh <- err @@ -6227,6 +6227,10 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", 10*time.Millisecond) serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond) serverutils.SetClusterSetting(t, tc, "changefeed.aggregator.heartbeat", 10*time.Millisecond) + // Randomizing replica assignment can cause timeouts or other + // failures due to assumptions in the testing knobs about balanced + // assignments. + serverutils.SetClusterSetting(t, tc, "changefeed.random_replica_selection.enabled", false) sqlutils.CreateTable( t, db, "foo",