Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allocator: prioritize non-voter promotion to satisfy voter constraints #111609

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/ccl/multiregionccl/testdata/secondary_region
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
skip issue-num=98020
----

new-cluster localities=us-east-1,us-east-1,us-west-1,us-west-1,us-central-1,us-central-1,us-central-1,eu-west-1,eu-west-1,eu-west-1
----

Expand Down
24 changes: 19 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,8 +1697,7 @@ func (a Allocator) RebalanceTarget(
)
var removalConstraintsChecker constraintsCheckFn
var rebalanceConstraintsChecker rebalanceConstraintsCheckFn
var replicaSetToRebalance, replicasWithExcludedStores []roachpb.ReplicaDescriptor
var otherReplicaSet []roachpb.ReplicaDescriptor
var replicaSetToRebalance, otherReplicaSet []roachpb.ReplicaDescriptor

switch t := targetType; t {
case VoterTarget:
Expand All @@ -1720,7 +1719,6 @@ func (a Allocator) RebalanceTarget(
// already have voting replicas as possible candidates. Voting replicas are
// supposed to be rebalanced before non-voting replicas, and they do
// consider the non-voters' stores as possible candidates.
replicasWithExcludedStores = existingVoters
otherReplicaSet = existingVoters
default:
log.KvDistribution.Fatalf(ctx, "unsupported targetReplicaType: %v", t)
Expand All @@ -1732,8 +1730,9 @@ func (a Allocator) RebalanceTarget(
sl,
removalConstraintsChecker,
rebalanceConstraintsChecker,
replicaSetToRebalance,
replicasWithExcludedStores,
existingVoters,
existingNonVoters,
targetType,
storePool.GetLocalitiesByStore(replicaSetForDiversityCalc),
storePool.IsStoreReadyForRoutineReplicaTransfer,
options,
Expand Down Expand Up @@ -1781,6 +1780,21 @@ func (a Allocator) RebalanceTarget(
return zero, zero, "", false
}

// If the target is a necessary non-voter promotion to satisfy some
// constraint, then do not attempt to simulate a remove target. We know
// that the target can be promoted, whilst another store is demoted (or
// removed) in order to satisfy a voter constraint. When every replica is
// necessary to satisfy an all-replica, or voter constraint, the simulated
// remove replica will not always be the existingCandidate depending on
// whether every voter is considered necessary.
if target.voterNecessary {
removeReplica = roachpb.ReplicationTarget{
NodeID: existingCandidate.store.Node.NodeID,
StoreID: existingCandidate.store.StoreID,
}
break
}

var removeDetails string
var err error
removeReplica, removeDetails, err = a.simulateRemoveTarget(
Expand Down
106 changes: 78 additions & 28 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ type candidate struct {
valid bool
fullDisk bool
necessary bool
voterNecessary bool
diversityScore float64
ioOverloaded bool
ioOverloadScore float64
Expand All @@ -704,9 +705,11 @@ type candidate struct {
}

func (c candidate) String() string {
str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, diversity:%.2f, ioOverloaded: %t, ioOverload: %.2f, converges:%d, "+
"balance:%d, hasNonVoter:%t, rangeCount:%d, queriesPerSecond:%.2f",
c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.diversityScore, c.ioOverloaded, c.ioOverloadScore, c.convergesScore,
str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, "+
"voterNecessary:%t, diversity:%.2f, ioOverloaded: %t, ioOverload: %.2f, "+
"converges:%d, balance:%d, hasNonVoter:%t, rangeCount:%d, queriesPerSecond:%.2f",
c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.voterNecessary,
c.diversityScore, c.ioOverloaded, c.ioOverloadScore, c.convergesScore,
c.balanceScore, c.hasNonVoter, c.rangeCount, c.store.Capacity.QueriesPerSecond)
if c.details != "" {
return fmt.Sprintf("%s, details:(%s)", str, c.details)
Expand All @@ -726,6 +729,9 @@ func (c candidate) compactString() string {
if c.necessary {
fmt.Fprintf(&buf, ", necessary:%t", c.necessary)
}
if c.voterNecessary {
fmt.Fprintf(&buf, ", voterNecessary:%t", c.voterNecessary)
}
if c.diversityScore != 0 {
fmt.Fprintf(&buf, ", diversity:%.2f", c.diversityScore)
}
Expand Down Expand Up @@ -772,6 +778,12 @@ func (c candidate) compare(o candidate) float64 {
}
return -400
}
if c.voterNecessary != o.voterNecessary {
if c.voterNecessary {
return 350
}
return -350
}
if !scoresAlmostEqual(c.diversityScore, o.diversityScore) {
if c.diversityScore > o.diversityScore {
return 300
Expand Down Expand Up @@ -860,6 +872,7 @@ func (c byScoreAndID) Less(i, j int) bool {
c[i].hasNonVoter == c[j].hasNonVoter &&
c[i].rangeCount == c[j].rangeCount &&
c[i].necessary == c[j].necessary &&
c[i].voterNecessary == c[j].voterNecessary &&
c[i].fullDisk == c[j].fullDisk &&
c[i].ioOverloaded == c[j].ioOverloaded &&
c[i].valid == c[j].valid {
Expand Down Expand Up @@ -890,6 +903,7 @@ func (cl candidateList) best() candidateList {
}
for i := 1; i < len(cl); i++ {
if cl[i].necessary == cl[0].necessary &&
cl[i].voterNecessary == cl[0].voterNecessary &&
scoresAlmostEqual(cl[i].diversityScore, cl[0].diversityScore) &&
cl[i].convergesScore == cl[0].convergesScore &&
cl[i].balanceScore == cl[0].balanceScore &&
Expand All @@ -910,6 +924,7 @@ func (cl candidateList) good() candidateList {
}
for i := 1; i < len(cl); i++ {
if cl[i].necessary == cl[0].necessary &&
cl[i].voterNecessary == cl[0].voterNecessary &&
scoresAlmostEqual(cl[i].diversityScore, cl[0].diversityScore) {
continue
}
Expand Down Expand Up @@ -955,6 +970,7 @@ func (cl candidateList) worst() candidateList {
// Find the worst constraint/locality/converges/balanceScore values.
for i := len(cl) - 2; i >= 0; i-- {
if cl[i].necessary == cl[len(cl)-1].necessary &&
cl[i].voterNecessary == cl[len(cl)-1].voterNecessary &&
scoresAlmostEqual(cl[i].diversityScore, cl[len(cl)-1].diversityScore) &&
cl[i].convergesScore == cl[len(cl)-1].convergesScore &&
cl[i].balanceScore == cl[len(cl)-1].balanceScore {
Expand Down Expand Up @@ -1476,14 +1492,22 @@ func rankedCandidateListForRebalancing(
allStores storepool.StoreList,
removalConstraintsChecker constraintsCheckFn,
rebalanceConstraintsChecker rebalanceConstraintsCheckFn,
existingReplicasForType, replicasOnExemptedStores []roachpb.ReplicaDescriptor,
existingVotingReplicas, existingNonVotingReplicas []roachpb.ReplicaDescriptor,
targetType TargetReplicaType,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool,
options ScorerOptions,
metrics AllocatorMetrics,
) []rebalanceOptions {
// 1. Determine whether existing replicas are valid and/or necessary.
existingStores := make(map[roachpb.StoreID]candidate)
var existingReplicasForType []roachpb.ReplicaDescriptor
if targetType == VoterTarget {
existingReplicasForType = existingVotingReplicas
} else {
existingReplicasForType = existingNonVotingReplicas
}

var needRebalanceFrom bool
curDiversityScore := RangeDiversityScore(existingStoreLocalities)
for _, store := range allStores.Stores {
Expand Down Expand Up @@ -1563,7 +1587,6 @@ func rankedCandidateListForRebalancing(
if store.StoreID == existing.store.StoreID {
continue
}

// Ignore any stores on dead nodes or, when rebalancing non-voters, stores
// that already hold one of the `existingVotingReplicas`.
if !isStoreValidForRoutineReplicaTransfer(ctx, store.StoreID) {
Expand All @@ -1577,24 +1600,43 @@ func rankedCandidateListForRebalancing(
continue
}

// `replsOnExemptedStores` is used during non-voting replica rebalancing
// to ignore stores that already have a voting replica for the same range.
// When rebalancing a voter, `replsOnExemptedStores` is empty since voters
// are allowed to "displace" non-voting replicas (we correctly turn such
// actions into non-voter promotions, see replicationChangesForRebalance()).
var exempted bool
for _, replOnExemptedStore := range replicasOnExemptedStores {
if store.StoreID == replOnExemptedStore.StoreID {
log.KvDistribution.VEventf(
ctx,
6,
"s%d is not a possible rebalance candidate for non-voters because it already has a voter of the range; ignoring",
store.StoreID,
)
exempted = true
break
var exempted, promotionCandidate bool
if targetType == NonVoterTarget {
// During non-voting replica rebalancing we ignore stores that already
// have a voting replica for the same range. Voters are allowed to
// "displace" non-voting replicas (we correctly turn such actions into
// non-voter promotions, see replicationChangesForRebalance()).
for _, replOnExemptedStore := range existingVotingReplicas {
if store.StoreID == replOnExemptedStore.StoreID {
log.KvDistribution.VEventf(
ctx,
6,
"s%d is not a possible rebalance candidate for non-voters because it already has a voter of the range; ignoring",
store.StoreID,
)
exempted = true
break
}
}
} else if targetType == VoterTarget {
// During voting replica rebalancing, we check whether the candidate
// already has a non-voting replica. If so, it may be necessary to
// promote this candidate to a voter in order to satisfy a voter
// constraint (assuming there are already the correct number of voting
// and non-voting replicas).
for _, repl := range existingNonVotingReplicas {
if store.StoreID == repl.StoreID {
if repl.Type == roachpb.NON_VOTER {
promotionCandidate = true
break
}
}
}
} else {
log.KvDistribution.Fatalf(ctx,
"unsupported targetReplicaType: %v", targetType)
}

if exempted {
continue
}
Expand All @@ -1604,20 +1646,20 @@ func rankedCandidateListForRebalancing(
// this stage, in addition to hard checks and validation.
// TODO(kvoli,ayushshah15): Refactor this to make it harder to
// inadvertently break the invariant above.
constraintsOK, necessary := rebalanceConstraintsChecker(store, existing.store)
constraintsOK, necessary, voterNecessary := rebalanceConstraintsChecker(store, existing.store)
diversityScore := diversityRebalanceFromScore(
store, existing.store.StoreID, existingStoreLocalities)
cand := candidate{
store: store,
valid: constraintsOK,
necessary: necessary,
voterNecessary: promotionCandidate && voterNecessary,
fullDisk: !options.getDiskOptions().maxCapacityCheck(store),
diversityScore: diversityScore,
}
if !cand.less(existing) {
// If `cand` is not worse than `existing`, add it to the list.
comparableCands = append(comparableCands, cand)

if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) {
needRebalanceTo = true
log.KvDistribution.VEventf(ctx, 2,
Expand Down Expand Up @@ -1844,7 +1886,9 @@ type constraintsCheckFn func(roachpb.StoreDescriptor) (valid, necessary bool)
// rebalanceConstraintsCheckFn determines whether `toStore` is a valid and/or
// necessary replacement candidate for `fromStore` (which must contain an
// existing replica).
type rebalanceConstraintsCheckFn func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool)
type rebalanceConstraintsCheckFn func(toStore, fromStore roachpb.StoreDescriptor) (
valid, necessary, voterNecessary bool,
)

// voterConstraintsCheckerForAllocation returns a constraintsCheckFn that
// determines whether a candidate for a new voting replica is valid and/or
Expand Down Expand Up @@ -1919,11 +1963,16 @@ func nonVoterConstraintsCheckerForRemoval(
func voterConstraintsCheckerForRebalance(
overallConstraints, voterConstraints constraint.AnalyzedConstraints,
) rebalanceConstraintsCheckFn {
return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) {
return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary, voterNecessary bool) {
overallConstraintsOK, necessaryOverall := rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints)
voterConstraintsOK, necessaryForVoters := rebalanceFromConstraintsCheck(toStore, fromStore, voterConstraints)

return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters
// If toStore is necessary to satisfy a voter constraint, whilst fromStore
// is not necessary to satisfy a voter constraint, then also include that
// this is a voterNecessary rebalance.
_, removeNecessaryForVoters := removeConstraintsCheck(fromStore, voterConstraints)
return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters,
necessaryForVoters && !removeNecessaryForVoters
}
}

Expand All @@ -1933,8 +1982,9 @@ func voterConstraintsCheckerForRebalance(
func nonVoterConstraintsCheckerForRebalance(
overallConstraints constraint.AnalyzedConstraints,
) rebalanceConstraintsCheckFn {
return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) {
return rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints)
return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary, necessaryPromo bool) {
valid, necessary = rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints)
return valid, necessary, false /* voterNecessary */
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
rebalanceConstraintsChecker,
replicas,
nil,
VoterTarget,
existingStoreLocalities,
func(context.Context, roachpb.StoreID) bool { return true },
options,
Expand Down
Loading