From 51fd26f6610c89f725a60cf5268a9220397e0432 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Wed, 15 Nov 2023 18:51:25 -0500
Subject: [PATCH] sql: prevent gateway from always being picked

Previously, the instance resolver would always assign the partition span to
the gateway if the gateway was in the set of eligible instances and we did
not find an eligible instance with a better locality match. In large
clusters, during backups or changefeeds (CDC) running with execution
locality, this could cause the gateway to get the lion's share of the work,
thereby causing it to OOM or severely throttling performance.

This change makes span partitioning a little more stateful. Concretely, we
now track how many partition spans have been assigned to each node in the
`planCtx` that is used throughout the planning of a single statement. This
distribution is then used to limit the number of partition spans that we
default to the gateway. Currently, by default, we allow the gateway to have:

`2 * average number of partition spans across the other instances`

If the gateway does not satisfy this heuristic, we randomly pick one of the
other eligible instances. Note that if there are no eligible instances other
than the gateway, or the gateway has only received a handful of spans so far
(fewer than a minimum threshold), we will still pick the gateway.

Fixes: #114079

Release note (bug fix): fixes a bug where large jobs running with execution
locality could result in the gateway being assigned most of the work,
causing performance degradation and cluster instability.
---
 pkg/sql/distsql_physical_planner.go      | 132 ++++++--
 pkg/sql/distsql_physical_planner_test.go | 393 ++++++++++++++++++++++-
 pkg/sql/execinfra/server_config.go       |   5 +
 3 files changed, 494 insertions(+), 36 deletions(-)

diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 1bd2e271cc95..7c6f6470faf4 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -801,6 +801,33 @@ const ( NodeDistSQLVersionIncompatible ) +// spanPartitionState captures information about the current state of the +// partitioning that has occurred during the planning process. +type spanPartitionState struct { + // partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of + // times we have picked an instance for that reason. + partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int + + // partitionSpans is a mapping from a SQLInstanceID to the number of + // partition spans that have been assigned to that node. + partitionSpans map[base.SQLInstanceID]int + + // totalPartitionSpans is the total number of partitions that have been processed + // so far. + totalPartitionSpans int + + testingOverrideRandomSelection func() base.SQLInstanceID +} + +// update updates the spanPartitionState with the information about the new span partition. +func (p *spanPartitionState) update( + partitionNode base.SQLInstanceID, partitionReason SpanPartitionReason, +) { + p.totalPartitionSpans++ + p.partitionSpanDecisions[partitionReason]++ + p.partitionSpans[partitionNode]++ +} + // PlanningCtx contains data used and updated throughout the planning process of // a single query. type PlanningCtx struct { @@ -808,6 +835,10 @@ type PlanningCtx struct { localityFilter roachpb.Locality + // spanPartitionState captures information about the current state of the + // partitioning that has occurred during the planning process.
+ spanPartitionState *spanPartitionState + spanIter physicalplan.SpanResolverIterator // nodeStatuses contains info for all SQLInstanceIDs that are referenced by // any PhysicalPlan we generate with this context. @@ -1080,40 +1111,44 @@ type SpanPartitionReason int32 const ( // SpanPartitionReason_UNSPECIFIED is reported when the reason is unspecified. - SpanPartitionReason_UNSPECIFIED SpanPartitionReason = 0 + SpanPartitionReason_UNSPECIFIED SpanPartitionReason = iota // SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY is reported when the target // node is unhealthy and so we default to the gateway node. - SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY SpanPartitionReason = 1 + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY // SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES is reported when there are // no healthy instances and so we default to the gateway node. - SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES SpanPartitionReason = 2 + SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES // SpanPartitionReason_GATEWAY_ON_ERROR is reported when there is an error and // so we default to the gateway node. - SpanPartitionReason_GATEWAY_ON_ERROR SpanPartitionReason = 3 + SpanPartitionReason_GATEWAY_ON_ERROR // SpanPartitionReason_TARGET_HEALTHY is reported when the target node is // healthy. - SpanPartitionReason_TARGET_HEALTHY SpanPartitionReason = 4 + SpanPartitionReason_TARGET_HEALTHY // SpanPartitionReason_CLOSEST_LOCALITY_MATCH is reported when we picked an // instance with the closest match to the provided locality filter. - SpanPartitionReason_CLOSEST_LOCALITY_MATCH SpanPartitionReason = 5 + SpanPartitionReason_CLOSEST_LOCALITY_MATCH // SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH is reported when there is no // match to the provided locality filter and so we default to the gateway. - SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH SpanPartitionReason = 6 - // SpanPartitionReason_LOCALITY_AWARE_RANDOM is reported when there is no + SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH + // SpanPartitionReason_LOCALITY_FILTERED_RANDOM is reported when there is no // match to the provided locality filter and the gateway is not eligible. In // this case we pick a random available instance. - SpanPartitionReason_LOCALITY_AWARE_RANDOM SpanPartitionReason = 7 + SpanPartitionReason_LOCALITY_FILTERED_RANDOM // SpanPartitionReason_ROUND_ROBIN is reported when there is no locality info // on any of the instances and so we default to a naive round-robin strategy. - SpanPartitionReason_ROUND_ROBIN SpanPartitionReason = 8 - + SpanPartitionReason_ROUND_ROBIN // SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY is reported when the // target node retrieved via gossip is deemed unhealthy. In this case we // default to the gateway node. - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY SpanPartitionReason = 9 + SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY // SpanPartitionReason_GOSSIP_TARGET_HEALTHY is reported when the // target node retrieved via gossip is deemed healthy. - SpanPartitionReason_GOSSIP_TARGET_HEALTHY SpanPartitionReason = 10 + SpanPartitionReason_GOSSIP_TARGET_HEALTHY + // SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED is reported + // when there is no match to the provided locality filter and the gateway is + // eligible but overloaded with other partitions. In this case we pick a + // random instance apart from the gateway. 
+ SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED ) func (r SpanPartitionReason) String() string { @@ -1132,14 +1167,16 @@ func (r SpanPartitionReason) String() string { return "closest-locality-match" case SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH: return "gateway-no-locality-match" - case SpanPartitionReason_LOCALITY_AWARE_RANDOM: - return "locality-aware-random" + case SpanPartitionReason_LOCALITY_FILTERED_RANDOM: + return "locality-filtered-random" case SpanPartitionReason_ROUND_ROBIN: return "round-robin" case SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: return "gossip-gateway-target-unhealthy" case SpanPartitionReason_GOSSIP_TARGET_HEALTHY: return "gossip-target-healthy" + case SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: + return "locality-filtered-random-gateway-overloaded" default: return "unknown" } @@ -1350,6 +1387,7 @@ func (dsp *DistSQLPlanner) partitionSpan( } sqlInstanceID, reason := getSQLInstanceIDForKVNodeID(replDesc.NodeID) + planCtx.spanPartitionState.update(sqlInstanceID, reason) partitionIdx, inNodeMap := nodeMap[sqlInstanceID] if !inNodeMap { partitionIdx = len(partitions) @@ -1534,6 +1572,38 @@ var noInstancesMatchingLocalityFilterErr = errors.New( "no healthy sql instances available matching locality requirement", ) +// shouldPickGateway determines whether the gateway node should be picked for a +// particular partition. +func (dsp *DistSQLPlanner) shouldPickGateway( + planCtx *PlanningCtx, instances []sqlinstance.InstanceInfo, +) bool { + numEligibleInstancesExcludingGateway := len(instances) - 1 + if numEligibleInstancesExcludingGateway <= 0 { + return true + } + + partitionsOnGateway := planCtx.spanPartitionState.partitionSpans[dsp.gatewaySQLInstanceID] + averageDistributionOnNonGatewayInstances := + (planCtx.spanPartitionState.totalPartitionSpans - partitionsOnGateway) / numEligibleInstancesExcludingGateway + + // If the gateway does not have very many partitions yet, we should use the + // gateway. This is to avoid the situation where we are partitioning spans to + // remote nodes even when the overall number of partitions is not that high. + minPartitionsOnGateway := 10 + if dsp.distSQLSrv.TestingKnobs.MinimumNumberOfGatewayPartitions != 0 { + minPartitionsOnGateway = dsp.distSQLSrv.TestingKnobs.MinimumNumberOfGatewayPartitions + } + if partitionsOnGateway < minPartitionsOnGateway { + return true + } + + // If the gateway has span partitions >= twice (by default) the average span + // partitions across other nodes we should distribute the partition to another + // node. + bias := int(planCtx.ExtendedEvalCtx.SessionData().DistsqlPlanGatewayBias) + return partitionsOnGateway < bias*averageDistributionOnNonGatewayInstances +} + // makeInstanceResolver returns a function that can choose the SQL instance ID // for a provided KV node ID. func (dsp *DistSQLPlanner) makeInstanceResolver( @@ -1632,13 +1702,27 @@ func (dsp *DistSQLPlanner) makeInstanceResolver( return closest[rng.Intn(len(closest))], SpanPartitionReason_CLOSEST_LOCALITY_MATCH } - // No instances had any locality tiers in common with the node locality so - // just return the gateway if it is eligible. If it isn't, just pick a - // random instance from the eligible instances. - if gatewayIsEligible { + // No instances had any locality tiers in common with the node locality. + // At this point we pick the gateway if it is eligible, otherwise we pick + // a random instance from the eligible instances. 
+ if !gatewayIsEligible { + return instances[rng.Intn(len(instances))].InstanceID, SpanPartitionReason_LOCALITY_FILTERED_RANDOM + } + if dsp.shouldPickGateway(planCtx, instances) { return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH + } else { + // If the gateway has a disproportionate number of partitions pick a + // random instance that is not the gateway. + if planCtx.spanPartitionState.testingOverrideRandomSelection != nil { + return planCtx.spanPartitionState.testingOverrideRandomSelection(), + SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + } + // NB: This random selection may still pick the gateway but that is + // alright as we are more interested in a uniform distribution rather + // than avoiding the gateway. + id := instances[rng.Intn(len(instances))].InstanceID + return id, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED } - return instances[rng.Intn(len(instances))].InstanceID, SpanPartitionReason_LOCALITY_AWARE_RANDOM } return resolver, nil } @@ -1743,11 +1827,12 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( if err != nil { return 0, err } - sqlInstanceID, _ := resolver(replDesc.NodeID) + sqlInstanceID, reason := resolver(replDesc.NodeID) + planCtx.spanPartitionState.update(sqlInstanceID, reason) return sqlInstanceID, nil } -func (dsp *DistSQLPlanner) useGossipPlanning(ctx context.Context, planCtx *PlanningCtx) bool { +func (dsp *DistSQLPlanner) useGossipPlanning(_ context.Context, planCtx *PlanningCtx) bool { // TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1) return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() } @@ -4788,6 +4873,9 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle( planCtx.spanIter = dsp.spanResolver.NewSpanResolverIterator(txn, oracle) planCtx.nodeStatuses = make(map[base.SQLInstanceID]NodeStatus) planCtx.nodeStatuses[dsp.gatewaySQLInstanceID] = NodeOK + planCtx.spanPartitionState = &spanPartitionState{ + partitionSpans: make(map[base.SQLInstanceID]int), + } return planCtx } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 671a519eeb97..07960523dc48 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -673,6 +673,7 @@ func TestPartitionSpans(t *testing.T) { // expected result: a map of node to list of spans. 
partitions map[int][][2]string partitionStates []string + partitionState spanPartitionState }{ { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, @@ -692,6 +693,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 2, + 2: 1, + 3: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4, + }, + totalPartitionSpans: 4, + }, }, { @@ -713,6 +726,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 2, + 2: 1, + 3: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4, + }, + totalPartitionSpans: 4, + }, }, { @@ -733,6 +758,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 3, + 3: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, + SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1, + }, + totalPartitionSpans: 4, + }, }, { @@ -753,6 +790,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", "partition span: {D1-X}, instance ID: 1, reason: gossip-gateway-target-unhealthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 3, + 2: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, + SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1, + }, + totalPartitionSpans: 4, + }, }, { @@ -773,6 +822,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 2, reason: gossip-gateway-target-unhealthy", "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 2: 3, + 3: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, + SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2, + }, + totalPartitionSpans: 4, + }, }, { @@ -793,6 +854,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 3, reason: gossip-gateway-target-unhealthy", "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 2: 1, + 3: 3, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, + SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2, + }, + totalPartitionSpans: 4, + }, }, // Test point lookups in isolation. 
@@ -812,6 +885,17 @@ func TestPartitionSpans(t *testing.T) { "partition span: A1, instance ID: 1, reason: gossip-target-healthy", "partition span: B1, instance ID: 2, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 2, + 2: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, + }, + totalPartitionSpans: 3, + }, }, // Test point lookups intertwined with span scans. @@ -838,6 +922,17 @@ func TestPartitionSpans(t *testing.T) { "partition span: B{-3}, instance ID: 1, reason: gossip-target-healthy", "partition span: B2, instance ID: 1, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 9, + 2: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 10, + }, + totalPartitionSpans: 10, + }, }, // A single span touching multiple ranges but on the same node results @@ -856,6 +951,16 @@ func TestPartitionSpans(t *testing.T) { "partition span: A{-1}, instance ID: 1, reason: gossip-target-healthy", "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 2, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, + }, + totalPartitionSpans: 2, + }, }, // Test some locality-filtered planning too. // @@ -870,15 +975,30 @@ func TestPartitionSpans(t *testing.T) { spans: [][2]string{{"A1", "C1"}, {"D1", "X"}}, locFilter: "x=1", partitions: map[int][][2]string{ - 1: {{"A1", "B"}, {"C", "C1"}, {"D1", "X"}}, - 2: {{"B", "C"}}, + 1: {{"A1", "B"}, {"C", "C1"}}, + 2: {{"B", "C"}, {"D1", "X"}}, }, partitionStates: []string{ "partition span: {A1-B}, instance ID: 1, reason: target-healthy", "partition span: {B-C}, instance ID: 2, reason: target-healthy", "partition span: C{-1}, instance ID: 1, reason: target-healthy", - "partition span: {D1-X}, instance ID: 1, reason: gateway-no-locality-match", + "partition span: {D1-X}, instance ID: 2, reason: locality-filtered-random-gateway-overloaded", + }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 1: 2, + 2: 2, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, + SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 1, + }, + testingOverrideRandomSelection: func() base.SQLInstanceID { + return 2 + }, + totalPartitionSpans: 4, }, }, { @@ -898,6 +1018,18 @@ func TestPartitionSpans(t *testing.T) { "partition span: C{-1}, instance ID: 2, reason: closest-locality-match", "partition span: {D1-X}, instance ID: 4, reason: closest-locality-match", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 2: 3, + 4: 1, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_TARGET_HEALTHY: 1, + SpanPartitionReason_CLOSEST_LOCALITY_MATCH: 3, + }, + totalPartitionSpans: 4, + }, }, { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, @@ -906,15 +1038,31 @@ func TestPartitionSpans(t *testing.T) { spans: [][2]string{{"A1", "C1"}, {"D1", "X"}}, locFilter: "x=3", 
partitions: map[int][][2]string{ - 7: {{"A1", "C1"}, {"D1", "X"}}, + 6: {{"B", "C1"}}, + 7: {{"A1", "B"}, {"D1", "X"}}, }, partitionStates: []string{ "partition span: {A1-B}, instance ID: 7, reason: gateway-no-locality-match", - "partition span: {B-C}, instance ID: 7, reason: gateway-no-locality-match", - "partition span: C{-1}, instance ID: 7, reason: gateway-no-locality-match", + "partition span: {B-C}, instance ID: 6, reason: locality-filtered-random-gateway-overloaded", + "partition span: C{-1}, instance ID: 6, reason: locality-filtered-random-gateway-overloaded", "partition span: {D1-X}, instance ID: 7, reason: gateway-no-locality-match", }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 6: 2, + 7: 2, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH: 2, + SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 2, + }, + totalPartitionSpans: 4, + testingOverrideRandomSelection: func() base.SQLInstanceID { + return 6 + }, + }, }, { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, @@ -927,10 +1075,20 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 7, reason: locality-aware-random", - "partition span: {B-C}, instance ID: 7, reason: locality-aware-random", - "partition span: C{-1}, instance ID: 7, reason: locality-aware-random", - "partition span: {D1-X}, instance ID: 7, reason: locality-aware-random", + "partition span: {A1-B}, instance ID: 7, reason: locality-filtered-random", + "partition span: {B-C}, instance ID: 7, reason: locality-filtered-random", + "partition span: C{-1}, instance ID: 7, reason: locality-filtered-random", + "partition span: {D1-X}, instance ID: 7, reason: locality-filtered-random", + }, + + partitionState: spanPartitionState{ + partitionSpans: map[base.SQLInstanceID]int{ + 7: 4, + }, + partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + SpanPartitionReason_LOCALITY_FILTERED_RANDOM: 4, + }, + totalPartitionSpans: 4, }, }, } @@ -938,7 +1096,13 @@ func TestPartitionSpans(t *testing.T) { // We need a mock Gossip to contain addresses for the nodes. Otherwise the // DistSQLPlanner will not plan flows on them. 
ctx := context.Background() - s := serverutils.StartServerOnly(t, base.TestServerArgs{}) + s := serverutils.StartServerOnly(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + MinimumNumberOfGatewayPartitions: 1, + }, + }, + }) defer s.Stopper().Stop(ctx) mockGossip := gossip.NewTest(roachpb.NodeID(1), s.Stopper(), metric.NewRegistry()) var nodeDescs []*roachpb.NodeDescriptor @@ -1009,9 +1173,14 @@ func TestPartitionSpans(t *testing.T) { }, }, sqlAddressResolver: mockInstances, - distSQLSrv: &distsql.ServerImpl{ServerConfig: execinfra.ServerConfig{NodeID: base.NewSQLIDContainerForNode(nID)}}, - codec: keys.SystemSQLCodec, - nodeDescs: mockGossip, + distSQLSrv: &distsql.ServerImpl{ + ServerConfig: execinfra.ServerConfig{ + NodeID: base.NewSQLIDContainerForNode(nID), + TestingKnobs: execinfra.TestingKnobs{MinimumNumberOfGatewayPartitions: 1}, + }, + }, + codec: keys.SystemSQLCodec, + nodeDescs: mockGossip, } var locFilter roachpb.Locality @@ -1021,6 +1190,7 @@ func TestPartitionSpans(t *testing.T) { planCtx := dsp.NewPlanningCtxWithOracle(ctx, &extendedEvalContext{ Context: eval.Context{Codec: keys.SystemSQLCodec}, }, nil, nil, DistributionTypeSystemTenantOnly, physicalplan.DefaultReplicaChooser, locFilter) + planCtx.spanPartitionState.testingOverrideRandomSelection = tc.partitionState.testingOverrideRandomSelection var spans []roachpb.Span for _, s := range tc.spans { spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])}) @@ -1031,6 +1201,14 @@ func TestPartitionSpans(t *testing.T) { t.Fatal(err) } + // Assert that the PartitionState is what we expect it to be. + tc.partitionState.testingOverrideRandomSelection = nil + planCtx.spanPartitionState.testingOverrideRandomSelection = nil + if !reflect.DeepEqual(*planCtx.spanPartitionState, tc.partitionState) { + t.Errorf("expected partition state:\n %v\ngot:\n %v", + tc.partitionState, *planCtx.spanPartitionState) + } + resMap := make(map[int][][2]string) for _, p := range partitions { if _, ok := resMap[int(p.SQLInstanceID)]; ok { @@ -1056,6 +1234,193 @@ func TestPartitionSpans(t *testing.T) { } } +// TestShouldPickGatewayNode is a unit test of the shouldPickGateway method. 
+func TestShouldPickGatewayNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + name string + gatewayInstance base.SQLInstanceID + instances []sqlinstance.InstanceInfo + partitionState *spanPartitionState + shouldPick bool + }{ + { + name: "no_instances", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{}, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 5, + }}, + shouldPick: true, + }, + { + name: "only_gateway", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 5, + }}, + shouldPick: true, + }, + { + name: "gateway_0", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + { + InstanceID: base.SQLInstanceID(2), + }, + { + InstanceID: base.SQLInstanceID(3), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 0, + 2: 0, + 3: 0, + }}, + shouldPick: true, + }, + { + name: "gateway_0_others_non_zero", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + { + InstanceID: base.SQLInstanceID(2), + }, + { + InstanceID: base.SQLInstanceID(3), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 0, + 2: 1, + 3: 1, + }}, + shouldPick: true, + }, + { + name: "below_threshold_1", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + { + InstanceID: base.SQLInstanceID(2), + }, + { + InstanceID: base.SQLInstanceID(3), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 1, + 2: 1, + 3: 1, + }}, + shouldPick: true, + }, + { + name: "above_threshold_1", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + { + InstanceID: base.SQLInstanceID(2), + }, + { + InstanceID: base.SQLInstanceID(3), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 1, + 2: 1, + 3: 0, + }}, + shouldPick: false, + }, + { + name: "above_threshold_2", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + { + InstanceID: base.SQLInstanceID(2), + }, + { + InstanceID: base.SQLInstanceID(3), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 2, + 2: 1, + 3: 1, + }}, + shouldPick: false, + }, + { + name: "above_threshold_3", + gatewayInstance: 1, + instances: []sqlinstance.InstanceInfo{ + { + InstanceID: base.SQLInstanceID(1), + }, + { + InstanceID: base.SQLInstanceID(2), + }, + { + InstanceID: base.SQLInstanceID(3), + }, + { + InstanceID: base.SQLInstanceID(4), + }, + }, + partitionState: &spanPartitionState{partitionSpans: map[base.SQLInstanceID]int{ + 1: 4, + 2: 1, + 3: 0, + 4: 5, + }}, + shouldPick: false, + }, + } + + for _, tc := range testCases { + mockDsp := &DistSQLPlanner{ + gatewaySQLInstanceID: tc.gatewayInstance, + distSQLSrv: &distsql.ServerImpl{ + ServerConfig: execinfra.ServerConfig{ + TestingKnobs: execinfra.TestingKnobs{MinimumNumberOfGatewayPartitions: 1}, + }, + }, + } + mockPlanCtx := &PlanningCtx{} + t.Run(tc.name, func(t *testing.T) { + mockPlanCtx.spanPartitionState = tc.partitionState + for _, partitionCount := range tc.partitionState.partitionSpans { + 
mockPlanCtx.spanPartitionState.totalPartitionSpans += partitionCount + } + shouldPick := mockDsp.shouldPickGateway(mockPlanCtx, tc.instances) + require.Equal(t, tc.shouldPick, shouldPick) + }) + } +} + // Test that span partitioning takes into account the advertised acceptable // versions of each node. Spans for which the owner node doesn't support our // plan's version will be planned on the gateway. diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 0ebb96e3dc81..0e7e59f37bf0 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -319,6 +319,11 @@ type TestingKnobs struct { // run. The associated transaction ID of the statement performing the cascade // or check query is passed in as an argument. RunBeforeCascadesAndChecks func(txnID uuid.UUID) + + // MinimumNumberOfGatewayPartitions is the minimum number of partitions that + // will be assigned to the gateway before we start assigning partitions to + // other nodes. + MinimumNumberOfGatewayPartitions int } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
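
For readers who want to experiment with the heuristic described in the commit
message without building CockroachDB, here is a small standalone Go sketch. It
is not part of the patch: `instanceID`, the parameter list of this
`shouldPickGateway`, and the round-robin fallback in `main` are hypothetical
simplifications of the planner's state (the real resolver picks a *random*
non-gateway instance; round-robin is used here only to keep the demo
deterministic). The defaults, a bias of 2 and a minimum of 10 partitions on
the gateway, mirror `DistsqlPlanGatewayBias` and `minPartitionsOnGateway` in
the diff above.

package main

import "fmt"

// instanceID is a hypothetical stand-in for base.SQLInstanceID.
type instanceID int

// shouldPickGateway sketches the heuristic added in this patch: keep using
// the gateway until it holds at least minPartitionsOnGateway spans, and after
// that only while it holds fewer than bias times the average number of spans
// assigned to the other eligible instances.
func shouldPickGateway(
	partitionCounts map[instanceID]int, // spans assigned so far, per instance
	totalPartitions int, // total spans assigned so far
	gateway instanceID,
	numEligibleInstances int, // eligible instances, including the gateway
	bias int, // assumed default of 2 (DistsqlPlanGatewayBias)
	minPartitionsOnGateway int, // assumed default of 10
) bool {
	numOthers := numEligibleInstances - 1
	if numOthers <= 0 {
		// The gateway is the only eligible instance.
		return true
	}
	onGateway := partitionCounts[gateway]
	if onGateway < minPartitionsOnGateway {
		// Avoid distributing very small plans to remote nodes.
		return true
	}
	avgOnOthers := (totalPartitions - onGateway) / numOthers
	return onGateway < bias*avgOnOthers
}

func main() {
	counts := map[instanceID]int{}
	total := 0
	gateway := instanceID(1)
	next := instanceID(2) // deterministic round-robin fallback for the demo
	for i := 0; i < 40; i++ {
		target := gateway
		if !shouldPickGateway(counts, total, gateway, 3, 2, 10) {
			target = next
			next++
			if next > 3 {
				next = 2
			}
		}
		counts[target]++
		total++
	}
	// With these defaults the gateway settles at roughly twice the average
	// of the other instances, e.g. map[1:20 2:10 3:10].
	fmt.Println(counts)
}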
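
The other half of the change is the bookkeeping itself: a per-statement tally
of span assignments threaded through the PlanningCtx, which the tests above
assert on. The following sketch, again a hypothetical simplification rather
than the CockroachDB types, shows the shape of that state and how each
resolved span updates it.

package main

import "fmt"

// reason is a hypothetical stand-in for SpanPartitionReason.
type reason int

const (
	targetHealthy reason = iota
	gatewayNoLocalityMatch
	localityFilteredRandomGatewayOverloaded
	numReasons
)

// partitionState mirrors the bookkeeping this patch adds to PlanningCtx:
// per-instance span counts, per-reason decision counts, and a running total.
type partitionState struct {
	partitionSpans         map[int]int
	partitionSpanDecisions [numReasons]int
	totalPartitionSpans    int
}

// update records one span assignment, much like partitionSpan and
// getInstanceIDForScan do in the patch after resolving each span.
func (p *partitionState) update(instance int, r reason) {
	p.totalPartitionSpans++
	p.partitionSpanDecisions[r]++
	p.partitionSpans[instance]++
}

func main() {
	st := &partitionState{partitionSpans: map[int]int{}}
	st.update(1, targetHealthy)
	st.update(2, targetHealthy)
	st.update(1, gatewayNoLocalityMatch)
	st.update(3, localityFilteredRandomGatewayOverloaded)

	fmt.Println(st.partitionSpans)         // map[1:2 2:1 3:1]
	fmt.Println(st.partitionSpanDecisions) // [2 1 1]
	fmt.Println(st.totalPartitionSpans)    // 4
}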