Skip to content

Commit

Permalink
changefeedccl: use new bulk oracle for changefeed planning
Browse files Browse the repository at this point in the history
This change lets changefeed planning use the BulkOracle instead of the
bin packing oracle. With the BulkOracle, changefeeds have plans that
randomly assign spans to any replica, including followers if enabled,
following locality filter constraints.

A new cluster setting, `changefeed.random_replica_selection.enabled`,
protects this change. When enabled, changefeeds will use the new
BulkOracle for planning. When disabled (the default), changefeeds will
use the previous bin packing oracle.

Epic: none
Fixes: cockroachdb#119777
Fixes: cockroachdb#114611

Release note (enterprise change): Changefeeds can now use the BulkOracle
for planning, which distributes work evenly across all replicas in the
locality filter, including followers if enabled. This is controlled by
the cluster setting `changefeed.random_replica_selection.enabled`, which
is disabled by default. When it is disabled, changefeed planning uses
its previous bin packing oracle.
  • Loading branch information
rharding6373 committed May 31, 2024
1 parent 2982a0a commit 71c5185
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/kvccl/kvfollowerreadsccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/externalconn",
Expand Down
16 changes: 14 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -329,6 +330,12 @@ var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
settings.WithName("changefeed.balance_range_distribution.enabled"),
settings.WithPublic)

// useBulkOracle is the application-level boolean cluster setting
// "changefeed.random_replica_selection.enabled". makePlan reads it to decide
// which replica oracle to plan with: when true it builds one with
// kvfollowerreadsccl.NewBulkOracle, otherwise it keeps
// physicalplan.DefaultReplicaChooser.
var useBulkOracle = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"changefeed.random_replica_selection.enabled",
	"randomize the selection of which replica backs up each range",
	false, // default: off, so planning keeps DefaultReplicaChooser
)

func makePlan(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
Expand All @@ -354,8 +361,13 @@ func makePlan(
}
}

planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
sql.DistributionType(distMode), physicalplan.DefaultReplicaChooser, locFilter)
evalCtx := execCtx.ExtendedEvalContext()
oracle := physicalplan.DefaultReplicaChooser
if useBulkOracle.Get(&evalCtx.Settings.SV) {
oracle = kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), locFilter)
}
planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */
blankTxn, sql.DistributionType(distMode), oracle, locFilter)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
if err != nil {
return nil, nil, err
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6171,7 +6171,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
t.Fatal("did not get signal to proceed")
}
},
// Handle tarnsient changefeed error. We expect to see node drain error.
// Handle transient changefeed error. We expect to see node drain error.
// When we do, notify drainNotification, and reset node drain channel.
HandleDistChangefeedError: func(err error) error {
errCh <- err
Expand Down Expand Up @@ -6209,6 +6209,10 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", 10*time.Millisecond)
serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond)
serverutils.SetClusterSetting(t, tc, "changefeed.aggregator.heartbeat", 10*time.Millisecond)
// Randomizing replica assignment can cause timeouts or other
// failures due to assumptions in the testing knobs about balanced
// assignments.
serverutils.SetClusterSetting(t, tc, "changefeed.random_replica_selection.enabled", false)

sqlutils.CreateTable(
t, db, "foo",
Expand Down

0 comments on commit 71c5185

Please sign in to comment.