storage: Consider which replica will be removed when adding a replica to improve balance
a6802739 authored and a-robinson committed Dec 20, 2017
1 parent 4893268 commit 5dded67
Showing 6 changed files with 247 additions and 43 deletions.
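The core idea of this commit: before committing to a rebalance target, RebalanceTarget now simulates adding the replica and asks the removal logic which replica it would then drop. If the answer is the store that was just picked, that candidate is discarded and the next-best one is tried, so the allocator never adds a replica that would be removed again right away. Below is a minimal, self-contained Go sketch of that check. The types and helpers in it (store, rangeState, pickRemovalTarget, chooseRebalanceTarget) are invented for illustration and are not CockroachDB's API; the real implementation is the RebalanceTarget, simulateRemoveTarget, and simulateFilterUnremovableReplicas code in the diff that follows.

// Standalone sketch (not CockroachDB code): simulate the removal decision
// that would follow an add, and skip candidates that would be removed again.
package main

import "fmt"

// store, rangeState, pickRemovalTarget, and chooseRebalanceTarget are
// hypothetical names invented for this illustration.
type store struct {
	id         int
	locality   string
	rangeCount int
}

type rangeState struct {
	replicas []store
}

// pickRemovalTarget prefers removing a replica whose locality is
// over-represented, breaking ties by range count (a rough stand-in for the
// allocator's diversity- and balance-based removal scoring).
func pickRemovalTarget(rs rangeState) int {
	counts := map[string]int{}
	for _, s := range rs.replicas {
		counts[s.locality]++
	}
	worstIdx, worstScore := -1, -1
	for i, s := range rs.replicas {
		score := counts[s.locality]*1000 + s.rangeCount
		if score > worstScore {
			worstIdx, worstScore = i, score
		}
	}
	return worstIdx
}

// chooseRebalanceTarget mirrors the commit's core idea: after picking a
// candidate store, simulate the range with that replica added and ask which
// replica would then be removed. If the answer is the candidate itself, the
// add would be pointless churn, so move on to the next candidate.
func chooseRebalanceTarget(rs rangeState, candidates []store) *store {
	for _, cand := range candidates {
		simulated := append(append([]store(nil), rs.replicas...), cand)
		removeIdx := pickRemovalTarget(rangeState{replicas: simulated})
		if simulated[removeIdx].id != cand.id {
			c := cand
			return &c // the added replica would survive the follow-up removal
		}
	}
	return nil // no candidate improves the range; don't rebalance
}

func main() {
	// Replicas live in datacenters a, b, and c; the store in c is crowded.
	rs := rangeState{replicas: []store{
		{id: 1, locality: "a", rangeCount: 50},
		{id: 3, locality: "b", rangeCount: 55},
		{id: 4, locality: "c", rangeCount: 150},
	}}
	// Store 2 shares datacenter a with store 1, so adding it would be undone
	// by the next removal; the simulation rejects it and we don't rebalance.
	fmt.Println(chooseRebalanceTarget(rs, []store{{id: 2, locality: "a", rangeCount: 56}}))
}

Run as-is this prints <nil> (no rebalance): the only candidate shares a datacenter with an existing replica, so the simulated removal would undo the addition, which is exactly the situation the new TestAllocatorRebalanceTarget exercises.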
78 changes: 72 additions & 6 deletions pkg/storage/allocator.go
@@ -184,12 +184,14 @@ type RangeInfo struct {
}

func rangeInfoForRepl(repl *Replica, desc *roachpb.RangeDescriptor) RangeInfo {
- writesPerSecond, _ := repl.writeStats.avgQPS()
- return RangeInfo{
- Desc: desc,
- LogicalBytes: repl.GetMVCCStats().Total(),
- WritesPerSecond: writesPerSecond,
+ info := RangeInfo{
+ Desc: desc,
+ LogicalBytes: repl.GetMVCCStats().Total(),
}
+ if writesPerSecond, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
+ info.WritesPerSecond = writesPerSecond
+ }
+ return info
}

// Allocator tries to spread replicas as evenly as possible across the stores
@@ -374,6 +376,25 @@ func (a *Allocator) AllocateTarget(
}
}

func (a Allocator) simulateRemoveTarget(
ctx context.Context,
targetStore roachpb.StoreID,
constraints config.Constraints,
candidates []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
) (roachpb.ReplicaDescriptor, string, error) {
// Update statistics first
// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
// but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue
// (with the main exceptions being Scatter and the status server's allocator debug endpoint).
// Try to make this interfere less with other callers.
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.ADD_REPLICA)
defer func() {
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA)
}()
return a.RemoveTarget(ctx, constraints, candidates, rangeInfo)
}

// RemoveTarget returns a suitable replica to remove from the provided replica
// set. It first attempts to randomly select a target from the set of stores
// that have greater than the average number of replicas. Failing that, it
@@ -444,7 +465,11 @@ func (a Allocator) RemoveTarget(
// rebalance. This helps prevent a stampeding herd targeting an abnormally
// under-utilized store.
func (a Allocator) RebalanceTarget(
- ctx context.Context, constraints config.Constraints, rangeInfo RangeInfo, filter storeFilter,
+ ctx context.Context,
+ constraints config.Constraints,
+ raftStatus *raft.Status,
+ rangeInfo RangeInfo,
+ filter storeFilter,
) (*roachpb.StoreDescriptor, string) {
sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter)

@@ -491,6 +516,37 @@ func (a Allocator) RebalanceTarget(
if target == nil {
return nil, ""
}
// Simulate the removal decision that would follow this addition to make sure we don't
// rebalance to a target that would immediately be removed again.
for len(candidates) > 0 {
if raftStatus == nil || raftStatus.Progress == nil {
break
}
newReplica := roachpb.ReplicaDescriptor{
NodeID: target.store.Node.NodeID,
StoreID: target.store.StoreID,
ReplicaID: rangeInfo.Desc.NextReplicaID,
}
desc := *rangeInfo.Desc
desc.Replicas = append(desc.Replicas, newReplica)
rangeInfo.Desc = &desc

replicaCandidates := simulateFilterUnremovableReplicas(raftStatus, desc.Replicas, newReplica.ReplicaID)

removeReplica, _, err := a.simulateRemoveTarget(ctx, target.store.StoreID, constraints, replicaCandidates, rangeInfo)
if err != nil {
log.Warningf(ctx, "simulating RemoveTarget failed: %s", err)
return nil, ""
}
if removeReplica.StoreID != target.store.StoreID {
break
}
newTargets := candidates.removeCandidate(*target)
newTarget := newTargets.selectGood(a.randGen)
if newTarget == nil {
return nil, ""
}
target = newTarget
}
details, err := json.Marshal(decisionDetails{
Target: target.String(),
Existing: existingCandidates.String(),
@@ -877,6 +933,16 @@ func filterBehindReplicas(
return candidates
}

func simulateFilterUnremovableReplicas(
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
status := *raftStatus
status.Progress[uint64(brandNewReplicaID)] = raft.Progress{Match: 0}
return filterUnremovableReplicas(&status, replicas, brandNewReplicaID)
}

// filterUnremovableReplicas removes any unremovable replicas from the supplied
// slice. An unremovable replica is one that is a necessary part of the
// quorum that will result from removing 1 replica. We forgive brandNewReplicaID
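One more pattern worth calling out from the allocator.go changes above: simulateRemoveTarget temporarily applies the prospective addition to the store pool's local statistics, runs the normal RemoveTarget scoring against that adjusted view, and reverts the adjustment with a defer before returning. A minimal sketch of that apply/decide/revert shape, using invented names (adjustedDecision, statsByStore) rather than the real StorePool API:

package main

import "fmt"

// adjustedDecision is a hypothetical helper (not CockroachDB code) showing the
// apply/decide/revert shape used by simulateRemoveTarget: bump a store's stats,
// make a decision against the adjusted view, then undo the bump via defer so
// other callers see the original numbers.
func adjustedDecision(statsByStore map[int]int, storeID, delta int, decide func(map[int]int) int) int {
	statsByStore[storeID] += delta
	defer func() { statsByStore[storeID] -= delta }()
	return decide(statsByStore)
}

func main() {
	stats := map[int]int{1: 50, 2: 56}
	// Decide which store looks most loaded *as if* store 2 had one more range.
	busiest := adjustedDecision(stats, 2, 1, func(m map[int]int) int {
		best, bestID := -1, 0
		for id, n := range m {
			if n > best {
				best, bestID = n, id
			}
		}
		return bestID
	})
	fmt.Println(busiest, stats[2]) // prints "2 56": the temporary bump was reverted
}

The caveat from the TODO in the diff applies to any version of this pattern: it is only safe while callers that read the shared statistics are serialized, or while a briefly inconsistent view is acceptable.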
11 changes: 11 additions & 0 deletions pkg/storage/allocator_scorer.go
@@ -285,6 +285,17 @@ func (cl candidateList) selectBad(randGen allocatorRand) *candidate {
return worst
}

// removeCandidate removes the specified candidate from the candidateList.
func (cl candidateList) removeCandidate(c candidate) candidateList {
for i := 0; i < len(cl); i++ {
if cl[i].store.StoreID == c.store.StoreID {
cl = append(cl[:i], cl[i+1:]...)
break
}
}
return cl
}

// allocateCandidates creates a candidate list of all stores that can be used
// for allocating a new replica ordered from the best to the worst. Only
// stores that meet the criteria are included in the list.
137 changes: 136 additions & 1 deletion pkg/storage/allocator_test.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -657,6 +658,7 @@ func TestAllocatorRebalance(t *testing.T) {
result, _ := a.RebalanceTarget(
ctx,
config.Constraints{},
nil,
testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: 3}}, firstRange),
storeFilterThrottled,
)
@@ -685,6 +687,136 @@ func TestAllocatorRebalance(t *testing.T) {
}
}

// TestAllocatorRebalanceTarget verifies that the allocator does not rebalance to a target
// that it would immediately remove again.
func TestAllocatorRebalanceTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
defer stopper.Stop(context.Background())
// We make 4 stores in this test: store1 and store2 are in datacenter a, store3 is in
// datacenter b, and store4 is in datacenter c, so the replicas are spread across three
// datacenters. Initially store4 holds many more replicas than the other stores, so
// without the simulation in RebalanceTarget the allocator would pick store2 as the
// rebalance target. The new replica on store2 would then be removed immediately to
// restore locality diversity; the simulation in RebalanceTarget avoids that churn.
stores := []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{
NodeID: 1,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "datacenter", Value: "a"},
},
},
},
Capacity: roachpb.StoreCapacity{
RangeCount: 50,
},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{
NodeID: 2,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "datacenter", Value: "a"},
},
},
},
Capacity: roachpb.StoreCapacity{
RangeCount: 56,
},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{
NodeID: 3,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "datacenter", Value: "b"},
},
},
},
Capacity: roachpb.StoreCapacity{
RangeCount: 55,
},
},
{
StoreID: 4,
Node: roachpb.NodeDescriptor{
NodeID: 4,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "datacenter", Value: "c"},
},
},
},
Capacity: roachpb.StoreCapacity{
RangeCount: 150,
},
},
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

st := a.storePool.st
EnableStatsBasedRebalancing.Override(&st.SV, false)
replicas := []roachpb.ReplicaDescriptor{
{
StoreID: 1,
NodeID: 1,
},
{
StoreID: 3,
NodeID: 3,
},
{
StoreID: 4,
NodeID: 4,
},
}
repl := &Replica{RangeID: firstRange}

repl.mu.Lock()
repl.mu.state.Stats = &enginepb.MVCCStats{}
repl.mu.Unlock()

rs := newReplicaStats(clock, nil)
repl.writeStats = rs

desc := &roachpb.RangeDescriptor{
Replicas: replicas,
RangeID: firstRange,
}

rangeInfo := rangeInfoForRepl(repl, desc)

status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
for _, replica := range replicas {
status.Progress[uint64(replica.NodeID)] = raft.Progress{
Match: 10,
}
}
for i := 0; i < 10; i++ {
result, _ := a.RebalanceTarget(
context.Background(),
config.Constraints{},
status,
rangeInfo,
storeFilterThrottled,
)
if result != nil {
t.Errorf("expected no rebalance, but got %d.", result.StoreID)
}
}
}

func TestAllocatorRebalanceDeadNodes(t *testing.T) {
defer leaktest.AfterTest(t)()

@@ -754,7 +886,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) {
for _, c := range testCases {
t.Run("", func(t *testing.T) {
result, _ := a.RebalanceTarget(
- ctx, config.Constraints{}, testRangeInfo(c.existing, firstRange), storeFilterThrottled)
+ ctx, config.Constraints{}, nil, testRangeInfo(c.existing, firstRange), storeFilterThrottled)
if c.expected > 0 {
if result == nil {
t.Fatalf("expected %d, but found nil", c.expected)
@@ -949,6 +1081,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
result, _ := a.RebalanceTarget(
ctx,
config.Constraints{},
nil,
testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, firstRange),
storeFilterThrottled,
)
@@ -2552,6 +2685,7 @@ func TestAllocatorRebalanceAway(t *testing.T) {
actual, _ := a.RebalanceTarget(
ctx,
constraints,
nil,
testRangeInfo(existingReplicas, firstRange),
storeFilterThrottled,
)
@@ -2672,6 +2806,7 @@ func Example_rebalancing() {
target, _ := alloc.RebalanceTarget(
context.Background(),
config.Constraints{},
nil,
testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange),
storeFilterThrottled,
)
10 changes: 6 additions & 4 deletions pkg/storage/replicate_queue.go
@@ -197,7 +197,7 @@ func (rq *replicateQueue) shouldQueue(
return false, 0
}

- target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, rangeInfo, storeFilterThrottled)
+ target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled)
if target != nil {
log.VEventf(ctx, 2, "rebalance target found, enqueuing")
} else {
@@ -485,7 +485,7 @@ func (rq *replicateQueue) processOneChange(

if !rq.store.TestingKnobs().DisableReplicaRebalancing {
rebalanceStore, details := rq.allocator.RebalanceTarget(
- ctx, zone.Constraints, rangeInfo, storeFilterThrottled)
+ ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled)
if rebalanceStore == nil {
log.VEventf(ctx, 1, "no suitable rebalance target")
} else {
@@ -575,7 +575,8 @@ func (rq *replicateQueue) addReplica(
if err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil {
return err
}
- rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.ADD_REPLICA)
+ rangeInfo := rangeInfoForRepl(repl, desc)
+ rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.ADD_REPLICA)
return nil
}

@@ -594,7 +595,8 @@ func (rq *replicateQueue) removeReplica(
if err := repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details); err != nil {
return err
}
- rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.REMOVE_REPLICA)
+ rangeInfo := rangeInfoForRepl(repl, desc)
+ rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.REMOVE_REPLICA)
return nil
}

23 changes: 9 additions & 14 deletions pkg/storage/store_pool.go
@@ -330,7 +330,7 @@ func (sp *StorePool) deadReplicasGossipUpdate(_ string, content roachpb.Value) {
// updateLocalStoreAfterRebalance is used to update the local copy of the
// target store immediately after a rebalance.
func (sp *StorePool) updateLocalStoreAfterRebalance(
- storeID roachpb.StoreID, repl *Replica, changeType roachpb.ReplicaChangeType,
+ storeID roachpb.StoreID, rangeInfo RangeInfo, changeType roachpb.ReplicaChangeType,
) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
@@ -343,23 +343,18 @@ func (sp *StorePool) updateLocalStoreAfterRebalance(
}
switch changeType {
case roachpb.ADD_REPLICA:
- detail.desc.Capacity.LogicalBytes += repl.GetMVCCStats().Total()
- if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
- detail.desc.Capacity.WritesPerSecond += qps
- }
+ detail.desc.Capacity.LogicalBytes += rangeInfo.LogicalBytes
+ detail.desc.Capacity.WritesPerSecond += rangeInfo.WritesPerSecond
case roachpb.REMOVE_REPLICA:
- total := repl.GetMVCCStats().Total()
- if detail.desc.Capacity.LogicalBytes <= total {
+ if detail.desc.Capacity.LogicalBytes <= rangeInfo.LogicalBytes {
detail.desc.Capacity.LogicalBytes = 0
} else {
- detail.desc.Capacity.LogicalBytes -= total
+ detail.desc.Capacity.LogicalBytes -= rangeInfo.LogicalBytes
}
- if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
- if detail.desc.Capacity.WritesPerSecond <= qps {
- detail.desc.Capacity.WritesPerSecond = 0
- } else {
- detail.desc.Capacity.WritesPerSecond -= qps
- }
+ if detail.desc.Capacity.WritesPerSecond <= rangeInfo.WritesPerSecond {
+ detail.desc.Capacity.WritesPerSecond = 0
+ } else {
+ detail.desc.Capacity.WritesPerSecond -= rangeInfo.WritesPerSecond
}
}
sp.detailsMu.storeDetails[storeID] = &detail