Merge pull request #20934 from a-robinson/cherrypick_20241
cherrypick-1.1: storage: avoid replica thrashing when localities are different sizes
a-robinson authored Dec 21, 2017
2 parents 2244045 + f01eb24 · commit 2cded8b
Showing 6 changed files with 517 additions and 11 deletions.
pkg/storage/allocator.go (126 additions, 6 deletions)
@@ -184,12 +184,14 @@ type RangeInfo struct {
 }
 
 func rangeInfoForRepl(repl *Replica, desc *roachpb.RangeDescriptor) RangeInfo {
-    writesPerSecond, _ := repl.writeStats.avgQPS()
-    return RangeInfo{
-        Desc:            desc,
-        LogicalBytes:    repl.GetMVCCStats().Total(),
-        WritesPerSecond: writesPerSecond,
+    info := RangeInfo{
+        Desc:         desc,
+        LogicalBytes: repl.GetMVCCStats().Total(),
     }
+    if writesPerSecond, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
+        info.WritesPerSecond = writesPerSecond
+    }
+    return info
 }
 
 // Allocator tries to spread replicas as evenly as possible across the stores
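Aside: the rewritten rangeInfoForRepl only reports a write rate once the replica's stats have covered at least MinStatsDuration; a rate extrapolated from a very short window is mostly noise. A minimal, self-contained sketch of that guard — the threshold value and the rate bookkeeping below are illustrative stand-ins, not the real replica-stats API:

package main

import (
    "fmt"
    "time"
)

// minStatsDuration stands in for the cluster's MinStatsDuration: the
// shortest measurement window whose average we are willing to trust.
const minStatsDuration = 5 * time.Second

// avgWritesPerSecond reports a rate only when the window is long enough;
// otherwise it leaves the rate at its zero value, as the patch does for
// RangeInfo.WritesPerSecond.
func avgWritesPerSecond(writes float64, window time.Duration) float64 {
    if window < minStatsDuration {
        return 0 // too little data to extrapolate from
    }
    return writes / window.Seconds()
}

func main() {
    fmt.Println(avgWritesPerSecond(40, 2*time.Second))   // 0 (window too short)
    fmt.Println(avgWritesPerSecond(300, 10*time.Second)) // 30
}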
@@ -374,6 +376,25 @@ func (a *Allocator) AllocateTarget(
     }
 }
 
+func (a Allocator) simulateRemoveTarget(
+    ctx context.Context,
+    targetStore roachpb.StoreID,
+    constraints config.Constraints,
+    candidates []roachpb.ReplicaDescriptor,
+    rangeInfo RangeInfo,
+) (roachpb.ReplicaDescriptor, string, error) {
+    // Update statistics first.
+    // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
+    // but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue
+    // (with the main exceptions being Scatter and the status server's allocator debug endpoint).
+    // Try to make this interfere less with other callers.
+    a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.ADD_REPLICA)
+    defer func() {
+        a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA)
+    }()
+    return a.RemoveTarget(ctx, constraints, candidates, rangeInfo)
+}
+
 // RemoveTarget returns a suitable replica to remove from the provided replica
 // set. It first attempts to randomly select a target from the set of stores
 // that have greater than the average number of replicas. Failing that, it
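Aside: simulateRemoveTarget uses a small but handy pattern: mutate the local store stats as if the ADD had happened, let a deferred call undo it, and run the ordinary RemoveTarget logic in between. A self-contained sketch of that pattern with toy types (not the real StorePool):

package main

import "fmt"

// pool is a toy stand-in for StorePool: storeID -> replica count.
type pool map[int]int

// simulateRemove pretends store `added` already holds the range, asks pick
// to choose a removal victim, then rolls the pretend-add back. The deferred
// decrement runs even if pick panics or an early return is added later.
func simulateRemove(p pool, added int, pick func(pool) int) int {
    p[added]++
    defer func() { p[added]-- }()
    return pick(p)
}

func main() {
    p := pool{1: 10, 2: 12}
    victim := simulateRemove(p, 1, func(p pool) int {
        if p[1] > p[2] { // evaluated while the simulated add is in effect
            return 1
        }
        return 2
    })
    fmt.Println(victim, p) // 2 map[1:10 2:12] — the pool is back to normal
}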
@@ -444,7 +465,11 @@ func (a Allocator) RemoveTarget(
 // rebalance. This helps prevent a stampeding herd targeting an abnormally
 // under-utilized store.
 func (a Allocator) RebalanceTarget(
-    ctx context.Context, constraints config.Constraints, rangeInfo RangeInfo, filter storeFilter,
+    ctx context.Context,
+    constraints config.Constraints,
+    raftStatus *raft.Status,
+    rangeInfo RangeInfo,
+    filter storeFilter,
 ) (*roachpb.StoreDescriptor, string) {
     sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter)
 
@@ -491,6 +516,53 @@ func (a Allocator) RebalanceTarget(
     if target == nil {
         return nil, ""
     }
+
+    // Determine whether we'll just remove the target immediately after adding it.
+    // If we would, we don't want to actually do the rebalance.
+    for len(candidates) > 0 {
+        newReplica := roachpb.ReplicaDescriptor{
+            NodeID:    target.store.Node.NodeID,
+            StoreID:   target.store.StoreID,
+            ReplicaID: rangeInfo.Desc.NextReplicaID,
+        }
+
+        // Deep-copy the Replicas slice since we'll mutate it.
+        desc := *rangeInfo.Desc
+        desc.Replicas = append(desc.Replicas[:len(desc.Replicas):len(desc.Replicas)], newReplica)
+        rangeInfo.Desc = &desc
+
+        // If we can, filter replicas as we would if we were actually removing one.
+        // If we can't (e.g. because we're the leaseholder but not the raft leader),
+        // it's better to simulate the removal with the info that we do have than to
+        // assume that the rebalance is ok (#20241).
+        replicaCandidates := desc.Replicas
+        if raftStatus != nil && raftStatus.Progress != nil {
+            replicaCandidates = simulateFilterUnremovableReplicas(
+                raftStatus, desc.Replicas, newReplica.ReplicaID)
+        }
+
+        removeReplica, details, err := a.simulateRemoveTarget(
+            ctx,
+            target.store.StoreID,
+            constraints,
+            replicaCandidates,
+            rangeInfo)
+        if err != nil {
+            log.Warningf(ctx, "simulating RemoveTarget failed: %s", err)
+            return nil, ""
+        }
+        if shouldRebalanceBetween(ctx, a.storePool.st, *target, removeReplica, existingCandidates, details) {
+            break
+        }
+        // Remove the considered target from our modified RangeDescriptor and from
+        // the candidates list, then try again if there are any other candidates.
+        rangeInfo.Desc.Replicas = rangeInfo.Desc.Replicas[:len(rangeInfo.Desc.Replicas)-1]
+        candidates = candidates.removeCandidate(*target)
+        target = candidates.selectGood(a.randGen)
+        if target == nil {
+            return nil, ""
+        }
+    }
     details, err := json.Marshal(decisionDetails{
         Target:   target.String(),
         Existing: existingCandidates.String(),
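Aside: the desc.Replicas[:len(desc.Replicas):len(desc.Replicas)] expression above is Go's full (three-index) slice form. It caps the slice's capacity at its length, so the subsequent append must allocate a fresh backing array rather than writing into one possibly shared with the original descriptor. A small demonstration:

package main

import "fmt"

func main() {
    // A slice with one slot of spare capacity, standing in for a Replicas
    // slice whose backing array may be shared with other readers.
    shared := make([]int, 3, 4)
    copy(shared, []int{1, 2, 3})

    // Plain append reuses the spare slot, writing into the shared array.
    _ = append(shared, 99)
    fmt.Println(shared[:4]) // [1 2 3 99] — the hidden fourth slot was clobbered

    // The three-index form caps cap at len, so append copies first and the
    // shared array is left alone.
    safe := append(shared[:len(shared):len(shared)], 42)
    fmt.Println(shared[:4], safe) // [1 2 3 99] [1 2 3 42]
}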
@@ -503,6 +575,44 @@ func (a Allocator) RebalanceTarget(
     return &target.store, string(details)
 }
 
+// shouldRebalanceBetween returns whether it's a good idea to rebalance to the
+// given `add` candidate if the replica that will be removed after adding it is
+// `remove`. This is a last failsafe to ensure that we don't take unnecessary
+// rebalance actions that cause thrashing.
+func shouldRebalanceBetween(
+    ctx context.Context,
+    st *cluster.Settings,
+    add candidate,
+    remove roachpb.ReplicaDescriptor,
+    existingCandidates candidateList,
+    removeDetails string,
+) bool {
+    if remove.StoreID == add.store.StoreID {
+        log.VEventf(ctx, 2, "not rebalancing to s%d because we'd immediately remove it: %s",
+            add.store.StoreID, removeDetails)
+        return false
+    }
+
+    // It's possible that we initially decided to rebalance based on comparing
+    // rebalance candidates in one locality to an existing replica in another
+    // locality (e.g. if one locality has many more nodes than another). This can
+    // make for unnecessary rebalances and even thrashing, so do a more direct
+    // comparison here of the replicas we'll actually be adding and removing.
+    for _, removeCandidate := range existingCandidates {
+        if removeCandidate.store.StoreID == remove.StoreID {
+            if removeCandidate.worthRebalancingTo(st, add) {
+                return true
+            }
+            log.VEventf(ctx, 2, "not rebalancing to %s because it isn't an improvement over "+
+                "what we'd remove after adding it: %s", add, removeCandidate)
+            return false
+        }
+    }
+    // If the code reaches this point, remove must be on a non-live store, so
+    // let the rebalance happen.
+    return true
+}
+
 // TransferLeaseTarget returns a suitable replica to transfer the range lease
 // to from the provided list. It excludes the current lease holder replica
 // unless asked to do otherwise by the checkTransferLeaseSource parameter.
@@ -877,6 +987,16 @@ func filterBehindReplicas(
     return candidates
 }
 
+func simulateFilterUnremovableReplicas(
+    raftStatus *raft.Status,
+    replicas []roachpb.ReplicaDescriptor,
+    brandNewReplicaID roachpb.ReplicaID,
+) []roachpb.ReplicaDescriptor {
+    status := *raftStatus
+    status.Progress[uint64(brandNewReplicaID)] = raft.Progress{Match: 0}
+    return filterUnremovableReplicas(&status, replicas, brandNewReplicaID)
+}
+
 // filterUnremovableReplicas removes any unremovable replicas from the supplied
 // slice. An unremovable replica is one that is a necessary part of the
 // quorum that will result from removing 1 replica. We forgive brandNewReplicaID
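Aside: simulateFilterUnremovableReplicas seeds the not-yet-added replica's Progress entry with Match: 0, i.e. it is treated as fully behind on the log — the conservative assumption for a replica that holds no data yet. (Note that the struct copy of raft.Status still shares the underlying Progress map, so this relies on the caller's raft.Status being a private snapshot.) A simplified sketch of the effect, using toy types rather than the real raft package:

package main

import (
    "fmt"
    "sort"
)

// progress is a toy stand-in for raft.Status.Progress: replica ID -> highest
// log index known to be durably replicated (Match).
type progress map[uint64]uint64

// behind lists replicas whose Match trails the commit index. An entry seeded
// with Match == 0 is always reported as behind.
func behind(p progress, commit uint64) []uint64 {
    var ids []uint64
    for id, match := range p {
        if match < commit {
            ids = append(ids, id)
        }
    }
    sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
    return ids
}

func main() {
    p := progress{1: 100, 2: 100, 3: 96}
    p[4] = 0 // simulate the brand-new replica, as the patch does
    fmt.Println(behind(p, 100)) // [3 4]
}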
Expand Down
pkg/storage/allocator_scorer.go (46 additions, 0 deletions)
@@ -134,6 +134,41 @@ func (c candidate) less(o candidate) bool {
     return c.rangeCount > o.rangeCount
 }
 
+// worthRebalancingTo returns true if o is enough of a better fit for some
+// range than c is that it's worth rebalancing from c to o.
+func (c candidate) worthRebalancingTo(st *cluster.Settings, o candidate) bool {
+    if !o.valid {
+        return false
+    }
+    if !c.valid {
+        return true
+    }
+    if c.constraintScore != o.constraintScore {
+        return c.constraintScore < o.constraintScore
+    }
+    if c.convergesScore != o.convergesScore {
+        return c.convergesScore < o.convergesScore
+    }
+    // You might intuitively think that we should require o's balanceScore to
+    // be considerably higher than c's balanceScore, but that will effectively
+    // rule out rebalancing in clusters where one locality is much larger or
+    // smaller than the others, since all the stores in that locality will tend
+    // to have either a maximal or minimal balanceScore.
+    if c.balanceScore.totalScore() != o.balanceScore.totalScore() {
+        return c.balanceScore.totalScore() < o.balanceScore.totalScore()
+    }
+    // Instead, just require a gap between their number of ranges. This isn't
+    // great, particularly for stats-based rebalancing, but it only breaks
+    // balanceScore ties and it's a workable stop-gap on the way to something
+    // like #20751.
+    avgRangeCount := float64(c.rangeCount+o.rangeCount) / 2.0
+    // Use an overfullThreshold that is at least a couple replicas larger than
+    // the average of the two, to ensure that we don't keep rebalancing back
+    // and forth between nodes that only differ by one or two replicas.
+    overfullThreshold := math.Max(overfullRangeThreshold(st, avgRangeCount), avgRangeCount+1.5)
+    return float64(c.rangeCount) > overfullThreshold
+}
+
 type candidateList []candidate
 
 func (cl candidateList) String() string {
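Aside: the final tie-break requires c's range count to clear an overfull threshold at least 1.5 above the mean of the two counts, so two stores differing by only one or two replicas never trade ranges back and forth. Worked arithmetic in a small program — overfullRangeThreshold is approximated here as a flat 5% margin over the mean, whereas the real function reads cluster settings:

package main

import (
    "fmt"
    "math"
)

// overfullRangeThreshold is approximated as a 5% margin over the mean; the
// real implementation derives the margin from cluster settings.
func overfullRangeThreshold(mean float64) float64 {
    return mean * 1.05
}

func worthRebalancing(cRanges, oRanges int) bool {
    avg := float64(cRanges+oRanges) / 2.0
    // At least ~1.5 ranges of headroom over the average, per the patch.
    threshold := math.Max(overfullRangeThreshold(avg), avg+1.5)
    return float64(cRanges) > threshold
}

func main() {
    fmt.Println(worthRebalancing(12, 10)) // false: avg 11, threshold 12.5
    fmt.Println(worthRebalancing(14, 10)) // true:  avg 12, threshold 13.5
}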
@@ -285,6 +320,17 @@ func (cl candidateList) selectBad(randGen allocatorRand) *candidate {
     return worst
 }
 
+// removeCandidate removes the specified candidate from the candidateList.
+func (cl candidateList) removeCandidate(c candidate) candidateList {
+    for i := 0; i < len(cl); i++ {
+        if cl[i].store.StoreID == c.store.StoreID {
+            cl = append(cl[:i], cl[i+1:]...)
+            break
+        }
+    }
+    return cl
+}
+
 // allocateCandidates creates a candidate list of all stores that can be used
 // for allocating a new replica ordered from the best to the worst. Only
 // stores that meet the criteria are included in the list.
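Aside: like the built-in append it is based on, removeCandidate compacts the slice in place and returns the shortened header, so callers must reassign the result — exactly what RebalanceTarget does with candidates = candidates.removeCandidate(*target). A tiny illustration of the idiom on a plain int slice:

package main

import "fmt"

// removeFirst deletes the first occurrence of v, mirroring the
// append(cl[:i], cl[i+1:]...) idiom in removeCandidate.
func removeFirst(s []int, v int) []int {
    for i := range s {
        if s[i] == v {
            return append(s[:i], s[i+1:]...)
        }
    }
    return s
}

func main() {
    s := []int{7, 8, 9}
    s = removeFirst(s, 8) // reassignment is required: the length changed
    fmt.Println(s)        // [7 9]
}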
