From 6ef56ae8cac5853f956b55ccfbf57ee48c6524f3 Mon Sep 17 00:00:00 2001 From: a6802739 Date: Mon, 11 Sep 2017 16:42:48 +0800 Subject: [PATCH 1/8] storage: immediately update target store write stats after rebalance --- pkg/storage/replicate_queue.go | 12 ++++- pkg/storage/store_pool.go | 32 +++++++++++++ pkg/storage/store_pool_test.go | 87 ++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index be08c229fd0e..f873ce0f3350 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -572,7 +572,11 @@ func (rq *replicateQueue) addReplica( if dryRun { return nil } - return repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details) + if err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil { + return err + } + rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.ADD_REPLICA) + return nil } func (rq *replicateQueue) removeReplica( @@ -587,7 +591,11 @@ func (rq *replicateQueue) removeReplica( if dryRun { return nil } - return repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details) + if err := repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details); err != nil { + return err + } + rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.REMOVE_REPLICA) + return nil } func (rq *replicateQueue) canTransferLease() bool { diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index be72d026848d..3dd010a09544 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -327,6 +327,38 @@ func (sp *StorePool) deadReplicasGossipUpdate(_ string, content roachpb.Value) { detail.deadReplicas = deadReplicas } +// updateLocalStoreAfterRebalance is used to update local copy of target store +// after we make an rebalance immediately. +func (sp *StorePool) updateLocalStoreAfterRebalance( + storeID roachpb.StoreID, repl *Replica, changeType roachpb.ReplicaChangeType, +) { + sp.detailsMu.Lock() + defer sp.detailsMu.Unlock() + detail := *sp.getStoreDetailLocked(storeID) + switch changeType { + case roachpb.ADD_REPLICA: + detail.desc.Capacity.LogicalBytes += repl.GetMVCCStats().Total() + if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration { + detail.desc.Capacity.WritesPerSecond += qps + } + case roachpb.REMOVE_REPLICA: + total := repl.GetMVCCStats().Total() + if detail.desc.Capacity.LogicalBytes <= total { + detail.desc.Capacity.LogicalBytes = 0 + } else { + detail.desc.Capacity.LogicalBytes -= total + } + if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration { + if detail.desc.Capacity.WritesPerSecond <= qps { + detail.desc.Capacity.WritesPerSecond = 0 + } else { + detail.desc.Capacity.WritesPerSecond -= qps + } + } + } + sp.detailsMu.storeDetails[storeID] = &detail +} + // newStoreDetail makes a new storeDetail struct. It sets index to be -1 to // ensure that it will be processed by a queue immediately. 
func newStoreDetail() *storeDetail { diff --git a/pkg/storage/store_pool_test.go b/pkg/storage/store_pool_test.go index 33f1f8cd303f..60559137bf16 100644 --- a/pkg/storage/store_pool_test.go +++ b/pkg/storage/store_pool_test.go @@ -21,6 +21,9 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/pkg/errors" "golang.org/x/net/context" @@ -318,6 +321,90 @@ func TestStorePoolGetStoreList(t *testing.T) { } } +func TestStorePoolUpdateLocalStore(t *testing.T) { + defer leaktest.AfterTest(t)() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + // We're going to manually mark stores dead in this test. + stopper, g, _, sp, _ := createTestStorePool( + TestTimeUntilStoreDead, false /* deterministic */, nodeStatusDead) + defer stopper.Stop(context.TODO()) + sg := gossiputil.NewStoreGossiper(g) + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + Capacity: 100, + Available: 50, + RangeCount: 5, + LogicalBytes: 30, + WritesPerSecond: 30, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + Capacity: 100, + Available: 55, + RangeCount: 4, + LogicalBytes: 25, + WritesPerSecond: 25, + }, + }, + } + sg.GossipStores(stores, t) + node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)} + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(eng) + cfg := TestStoreConfig(clock) + cfg.Transport = NewDummyRaftTransport(cfg.Settings) + store := NewStore(cfg, eng, &node) + rg := roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKey([]byte("a")), + EndKey: roachpb.RKey([]byte("b")), + } + replica, err := NewReplica(&rg, store, roachpb.ReplicaID(0)) + if err != nil { + t.Fatalf("make replica error : %s", err) + } + replica.mu.Lock() + replica.mu.state.Stats = enginepb.MVCCStats{ + KeyBytes: 2, + ValBytes: 4, + } + replica.mu.Unlock() + rs := newReplicaStats(clock, nil) + for _, store := range stores { + rs.record(store.Node.NodeID) + } + manual.Increment(int64(MinStatsDuration + time.Second)) + replica.writeStats = rs + + sp.updateLocalStoreAfterRebalance(roachpb.StoreID(1), replica, roachpb.ADD_REPLICA) + desc, ok := sp.getStoreDescriptor(roachpb.StoreID(1)) + if !ok { + t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 1) + } + QPS, _ := replica.writeStats.avgQPS() + if expectedBytes, expectedQPS := int64(36), 30+QPS; desc.Capacity.LogicalBytes != expectedBytes || desc.Capacity.WritesPerSecond != expectedQPS { + t.Fatalf("expected Logical bytes %d, but got %d, expected WritesPerSecond %f, but got %f", + expectedBytes, desc.Capacity.LogicalBytes, expectedQPS, desc.Capacity.WritesPerSecond) + } + + sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), replica, roachpb.REMOVE_REPLICA) + desc, ok = sp.getStoreDescriptor(roachpb.StoreID(2)) + if !ok { + t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 2) + } + if expectedBytes, expectedQPS := int64(19), 25-QPS; desc.Capacity.LogicalBytes != expectedBytes || desc.Capacity.WritesPerSecond != expectedQPS { + t.Fatalf("expected Logical bytes %d, but got %d, expected WritesPerSecond %f, but got %f", + expectedBytes, desc.Capacity.LogicalBytes, expectedQPS, desc.Capacity.WritesPerSecond) + } +} + func TestStorePoolGetStoreDetails(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, sp, _ := createTestStorePool( From 
489326825997fd76903202f4708cc2095ae0200c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sun, 8 Oct 2017 16:52:22 -0400 Subject: [PATCH 2/8] storage: updating local Store before gossip should not crash Updating a target store's write stats immediately after rebalancing was recently addressed in #18425. With that change, if `updateLocalStoreAfterRebalance` is called before the `StorePool` has seen the `StoreDescriptor` in gossip, it will trigger an NPE. This change fixes that by making the update a no-op if the descriptor has yet to be seen in gossip. --- pkg/storage/store_pool.go | 10 ++++++-- pkg/storage/store_pool_test.go | 45 +++++++++++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index 3dd010a09544..001ce5ac9c06 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -327,14 +327,20 @@ func (sp *StorePool) deadReplicasGossipUpdate(_ string, content roachpb.Value) { detail.deadReplicas = deadReplicas } -// updateLocalStoreAfterRebalance is used to update local copy of target store -// after we make an rebalance immediately. +// updateLocalStoreAfterRebalance is used to update the local copy of the +// target store immediately after a rebalance. func (sp *StorePool) updateLocalStoreAfterRebalance( storeID roachpb.StoreID, repl *Replica, changeType roachpb.ReplicaChangeType, ) { sp.detailsMu.Lock() defer sp.detailsMu.Unlock() detail := *sp.getStoreDetailLocked(storeID) + if detail.desc == nil { + // We don't have this store yet (this is normal when we're + // starting up and don't have full information from the gossip + // network). We can't update the local store at this time. + return + } switch changeType { case roachpb.ADD_REPLICA: detail.desc.Capacity.LogicalBytes += repl.GetMVCCStats().Total() diff --git a/pkg/storage/store_pool_test.go b/pkg/storage/store_pool_test.go index 60559137bf16..4829caa22bce 100644 --- a/pkg/storage/store_pool_test.go +++ b/pkg/storage/store_pool_test.go @@ -21,9 +21,6 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/pkg/errors" "golang.org/x/net/context" @@ -33,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -405,6 +404,46 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { } } +// TestStorePoolUpdateLocalStoreBeforeGossip verifies that an attempt to update +// the local copy of a store before that store has been gossiped will be a no-op. +func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { + defer leaktest.AfterTest(t)() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + stopper, _, _, sp, _ := createTestStorePool( + TestTimeUntilStoreDead, false /* deterministic */, nodeStatusDead) + defer stopper.Stop(context.TODO()) + + // Create store.
+ node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)} + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(eng) + cfg := TestStoreConfig(clock) + cfg.Transport = NewDummyRaftTransport(cfg.Settings) + store := NewStore(cfg, eng, &node) + + // Create replica. + rg := roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKey([]byte("a")), + EndKey: roachpb.RKey([]byte("b")), + } + replica, err := NewReplica(&rg, store, roachpb.ReplicaID(0)) + if err != nil { + t.Fatalf("make replica error : %s", err) + } + + // Update StorePool, which should be a no-op. + storeID := roachpb.StoreID(1) + if _, ok := sp.getStoreDescriptor(storeID); ok { + t.Fatalf("StoreDescriptor not gossiped, should not be found") + } + sp.updateLocalStoreAfterRebalance(storeID, replica, roachpb.ADD_REPLICA) + if _, ok := sp.getStoreDescriptor(storeID); ok { + t.Fatalf("StoreDescriptor still not gossiped, should not be found") + } +} + func TestStorePoolGetStoreDetails(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, sp, _ := createTestStorePool( From 5dded67dd86919be3555e8a7f938509df4ccaa50 Mon Sep 17 00:00:00 2001 From: a6802739 Date: Fri, 8 Sep 2017 16:33:50 +0800 Subject: [PATCH 3/8] storage: Consider which replica will be removed when adding a replica to improve balance --- pkg/storage/allocator.go | 78 ++++++++++++++++-- pkg/storage/allocator_scorer.go | 11 +++ pkg/storage/allocator_test.go | 137 +++++++++++++++++++++++++++++++- pkg/storage/replicate_queue.go | 10 ++- pkg/storage/store_pool.go | 23 +++--- pkg/storage/store_pool_test.go | 31 +++----- 6 files changed, 247 insertions(+), 43 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index ba9fda281ba9..39c06edbb04f 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -184,12 +184,14 @@ type RangeInfo struct { } func rangeInfoForRepl(repl *Replica, desc *roachpb.RangeDescriptor) RangeInfo { - writesPerSecond, _ := repl.writeStats.avgQPS() - return RangeInfo{ - Desc: desc, - LogicalBytes: repl.GetMVCCStats().Total(), - WritesPerSecond: writesPerSecond, + info := RangeInfo{ + Desc: desc, + LogicalBytes: repl.GetMVCCStats().Total(), } + if writesPerSecond, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration { + info.WritesPerSecond = writesPerSecond + } + return info } // Allocator tries to spread replicas as evenly as possible across the stores @@ -374,6 +376,25 @@ func (a *Allocator) AllocateTarget( } } +func (a Allocator) simulateRemoveTarget( + ctx context.Context, + targetStore roachpb.StoreID, + constraints config.Constraints, + candidates []roachpb.ReplicaDescriptor, + rangeInfo RangeInfo, +) (roachpb.ReplicaDescriptor, string, error) { + // Update statistics first + // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, + // but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue + // (with the main exceptions being Scatter and the status server's allocator debug endpoint). + // Try to make this interfere less with other callers. + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.ADD_REPLICA) + defer func() { + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA) + }() + return a.RemoveTarget(ctx, constraints, candidates, rangeInfo) +} + // RemoveTarget returns a suitable replica to remove from the provided replica // set. 
It first attempts to randomly select a target from the set of stores // that have greater than the average number of replicas. Failing that, it @@ -444,7 +465,11 @@ func (a Allocator) RemoveTarget( // rebalance. This helps prevent a stampeding herd targeting an abnormally // under-utilized store. func (a Allocator) RebalanceTarget( - ctx context.Context, constraints config.Constraints, rangeInfo RangeInfo, filter storeFilter, + ctx context.Context, + constraints config.Constraints, + raftStatus *raft.Status, + rangeInfo RangeInfo, + filter storeFilter, ) (*roachpb.StoreDescriptor, string) { sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter) @@ -491,6 +516,37 @@ func (a Allocator) RebalanceTarget( if target == nil { return nil, "" } + // We could make a simulation here to verify whether we'll remove the target we'll rebalance to. + for len(candidates) > 0 { + if raftStatus == nil || raftStatus.Progress == nil { + break + } + newReplica := roachpb.ReplicaDescriptor{ + NodeID: target.store.Node.NodeID, + StoreID: target.store.StoreID, + ReplicaID: rangeInfo.Desc.NextReplicaID, + } + desc := *rangeInfo.Desc + desc.Replicas = append(desc.Replicas, newReplica) + rangeInfo.Desc = &desc + + replicaCandidates := simulateFilterUnremovableReplicas(raftStatus, desc.Replicas, newReplica.ReplicaID) + + removeReplica, _, err := a.simulateRemoveTarget(ctx, target.store.StoreID, constraints, replicaCandidates, rangeInfo) + if err != nil { + log.Warningf(ctx, "simulating RemoveTarget failed: %s", err) + return nil, "" + } + if removeReplica.StoreID != target.store.StoreID { + break + } + newTargets := candidates.removeCandidate(*target) + newTarget := newTargets.selectGood(a.randGen) + if newTarget == nil { + return nil, "" + } + target = newTarget + } details, err := json.Marshal(decisionDetails{ Target: target.String(), Existing: existingCandidates.String(), @@ -877,6 +933,16 @@ func filterBehindReplicas( return candidates } +func simulateFilterUnremovableReplicas( + raftStatus *raft.Status, + replicas []roachpb.ReplicaDescriptor, + brandNewReplicaID roachpb.ReplicaID, +) []roachpb.ReplicaDescriptor { + status := *raftStatus + status.Progress[uint64(brandNewReplicaID)] = raft.Progress{Match: 0} + return filterUnremovableReplicas(&status, replicas, brandNewReplicaID) +} + // filterUnremovableReplicas removes any unremovable replicas from the supplied // slice. An unremovable replicas is one which is a necessary part of the // quorum that will result from removing 1 replica. We forgive brandNewReplicaID diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 8d50b92f7f8d..2776cc3dcca1 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -285,6 +285,17 @@ func (cl candidateList) selectBad(randGen allocatorRand) *candidate { return worst } +// removeCandidate removes the specified candidate from candidateList. +func (cl candidateList) removeCandidate(c candidate) candidateList { + for i := 0; i < len(cl); i++ { + if cl[i].store.StoreID == c.store.StoreID { + cl = append(cl[:i], cl[i+1:]...) + break + } + } + return cl +} + // allocateCandidates creates a candidate list of all stores that can be used // for allocating a new replica ordered from the best to the worst. Only // stores that meet the criteria are included in the list.
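The core of this patch is the simulate-then-undo trick: `simulateRemoveTarget` temporarily applies the ADD_REPLICA stats update from patch 1 to the target store, asks `RemoveTarget` which replica it would evict, then reverts the update in a `defer`. A minimal, self-contained Go sketch of that shape — the `capacity` type and helper names here are hypothetical stand-ins, not the real `StorePool`/`StoreCapacity` API:

```go
package main

import "fmt"

// capacity is a hypothetical stand-in for the store stats that
// updateLocalStoreAfterRebalance adjusts under the StorePool's mutex.
type capacity struct {
	logicalBytes    int64
	writesPerSecond float64
}

// applyAdd mirrors the ADD_REPLICA case.
func applyAdd(c *capacity, bytes int64, qps float64) {
	c.logicalBytes += bytes
	c.writesPerSecond += qps
}

// applyRemove mirrors the REMOVE_REPLICA case, clamping at zero so a
// removal can never drive the cached stats negative.
func applyRemove(c *capacity, bytes int64, qps float64) {
	if c.logicalBytes <= bytes {
		c.logicalBytes = 0
	} else {
		c.logicalBytes -= bytes
	}
	if c.writesPerSecond <= qps {
		c.writesPerSecond = 0
	} else {
		c.writesPerSecond -= qps
	}
}

// withSimulatedAdd runs eval as if the replica had already been added to
// the target store, then undoes the addition -- the same add/defer-undo
// pair simulateRemoveTarget wraps around RemoveTarget.
func withSimulatedAdd(c *capacity, bytes int64, qps float64, eval func()) {
	applyAdd(c, bytes, qps)
	defer applyRemove(c, bytes, qps)
	eval()
}

func main() {
	c := &capacity{logicalBytes: 30, writesPerSecond: 30}
	withSimulatedAdd(c, 6, 5, func() {
		fmt.Printf("during simulation: %+v\n", *c) // {logicalBytes:36 writesPerSecond:35}
	})
	fmt.Printf("after simulation: %+v\n", *c) // back to {logicalBytes:30 writesPerSecond:30}
}
```

As the TODO in the patch notes, the real update briefly mutates state shared with other Allocator callers, which is tolerable only because the ReplicateQueue mostly serializes those calls.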
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index 3c91b3ba0fa5..2a9ef75e2254 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util" @@ -657,6 +658,7 @@ func TestAllocatorRebalance(t *testing.T) { result, _ := a.RebalanceTarget( ctx, config.Constraints{}, + nil, testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: 3}}, firstRange), storeFilterThrottled, ) @@ -685,6 +687,136 @@ func TestAllocatorRebalance(t *testing.T) { } } +// TestAllocatorRebalanceTarget verifies that we won't rebalance to a target that +// we'll immediately remove. +func TestAllocatorRebalanceTarget(t *testing.T) { + defer leaktest.AfterTest(t)() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + defer stopper.Stop(context.Background()) + // We make 4 stores in this test, store1 and store2 are in the same datacenter a, + // store3 in the datacenter b, store4 in the datacenter c. all of our replicas are + // distributed within these three datacenters. Originally, store4 has much more replicas + // than other stores. So if we didn't make the simulation in RebalanceTarget, it will + // try to choose store2 as the target store to make an rebalance. + // However, we'll immediately remove the replica on the store2 to guarantee the locality diversity. + // But after we make the simulation in RebalanceTarget, we could avoid that happen.
+ stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "a"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 50, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "a"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 56, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "b"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 55, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{ + NodeID: 4, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "c"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 150, + }, + }, + } + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + + st := a.storePool.st + EnableStatsBasedRebalancing.Override(&st.SV, false) + replicas := []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + }, + { + StoreID: 3, + NodeID: 3, + }, + { + StoreID: 4, + NodeID: 4, + }, + } + repl := &Replica{RangeID: firstRange} + + repl.mu.Lock() + repl.mu.state.Stats = &enginepb.MVCCStats{} + repl.mu.Unlock() + + rs := newReplicaStats(clock, nil) + repl.writeStats = rs + + desc := &roachpb.RangeDescriptor{ + Replicas: replicas, + RangeID: firstRange, + } + + rangeInfo := rangeInfoForRepl(repl, desc) + + status := &raft.Status{ + Progress: make(map[uint64]raft.Progress), + } + for _, replica := range replicas { + status.Progress[uint64(replica.NodeID)] = raft.Progress{ + Match: 10, + } + } + for i := 0; i < 10; i++ { + result, _ := a.RebalanceTarget( + context.Background(), + config.Constraints{}, + status, + rangeInfo, + storeFilterThrottled, + ) + if result != nil { + t.Errorf("expected no rebalance, but got %d.", result.StoreID) + } + } +} + func TestAllocatorRebalanceDeadNodes(t *testing.T) { defer leaktest.AfterTest(t)() @@ -754,7 +886,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { result, _ := a.RebalanceTarget( - ctx, config.Constraints{}, testRangeInfo(c.existing, firstRange), storeFilterThrottled) + ctx, config.Constraints{}, nil, testRangeInfo(c.existing, firstRange), storeFilterThrottled) if c.expected > 0 { if result == nil { t.Fatalf("expected %d, but found nil", c.expected) @@ -949,6 +1081,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { result, _ := a.RebalanceTarget( ctx, config.Constraints{}, + nil, testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, firstRange), storeFilterThrottled, ) @@ -2552,6 +2685,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { actual, _ := a.RebalanceTarget( ctx, constraints, + nil, testRangeInfo(existingReplicas, firstRange), storeFilterThrottled, ) @@ -2672,6 +2806,7 @@ func Example_rebalancing() { target, _ := alloc.RebalanceTarget( context.Background(), config.Constraints{}, + nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange), storeFilterThrottled, ) diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index f873ce0f3350..1c39ae3d2a28 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -197,7 +197,7 @@ func (rq *replicateQueue) shouldQueue( return false, 0 } - target, _ 
:= rq.allocator.RebalanceTarget(ctx, zone.Constraints, rangeInfo, storeFilterThrottled) + target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if target != nil { log.VEventf(ctx, 2, "rebalance target found, enqueuing") } else { @@ -485,7 +485,7 @@ func (rq *replicateQueue) processOneChange( if !rq.store.TestingKnobs().DisableReplicaRebalancing { rebalanceStore, details := rq.allocator.RebalanceTarget( - ctx, zone.Constraints, rangeInfo, storeFilterThrottled) + ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if rebalanceStore == nil { log.VEventf(ctx, 1, "no suitable rebalance target") } else { @@ -575,7 +575,8 @@ func (rq *replicateQueue) addReplica( if err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil { return err } - rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.ADD_REPLICA) + rangeInfo := rangeInfoForRepl(repl, desc) + rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.ADD_REPLICA) return nil } @@ -594,7 +595,8 @@ func (rq *replicateQueue) removeReplica( if err := repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details); err != nil { return err } - rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.REMOVE_REPLICA) + rangeInfo := rangeInfoForRepl(repl, desc) + rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.REMOVE_REPLICA) return nil } diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index 001ce5ac9c06..07fa574f9dfc 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -330,7 +330,7 @@ func (sp *StorePool) deadReplicasGossipUpdate(_ string, content roachpb.Value) { // updateLocalStoreAfterRebalance is used to update the local copy of the // target store immediately after a rebalance. 
func (sp *StorePool) updateLocalStoreAfterRebalance( - storeID roachpb.StoreID, repl *Replica, changeType roachpb.ReplicaChangeType, + storeID roachpb.StoreID, rangeInfo RangeInfo, changeType roachpb.ReplicaChangeType, ) { sp.detailsMu.Lock() defer sp.detailsMu.Unlock() @@ -343,23 +343,18 @@ func (sp *StorePool) updateLocalStoreAfterRebalance( } switch changeType { case roachpb.ADD_REPLICA: - detail.desc.Capacity.LogicalBytes += repl.GetMVCCStats().Total() - if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration { - detail.desc.Capacity.WritesPerSecond += qps - } + detail.desc.Capacity.LogicalBytes += rangeInfo.LogicalBytes + detail.desc.Capacity.WritesPerSecond += rangeInfo.WritesPerSecond case roachpb.REMOVE_REPLICA: - total := repl.GetMVCCStats().Total() - if detail.desc.Capacity.LogicalBytes <= total { + if detail.desc.Capacity.LogicalBytes <= rangeInfo.LogicalBytes { detail.desc.Capacity.LogicalBytes = 0 } else { - detail.desc.Capacity.LogicalBytes -= total + detail.desc.Capacity.LogicalBytes -= rangeInfo.LogicalBytes } - if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration { - if detail.desc.Capacity.WritesPerSecond <= qps { - detail.desc.Capacity.WritesPerSecond = 0 - } else { - detail.desc.Capacity.WritesPerSecond -= qps - } + if detail.desc.Capacity.WritesPerSecond <= rangeInfo.WritesPerSecond { + detail.desc.Capacity.WritesPerSecond = 0 + } else { + detail.desc.Capacity.WritesPerSecond -= rangeInfo.WritesPerSecond } } sp.detailsMu.storeDetails[storeID] = &detail diff --git a/pkg/storage/store_pool_test.go b/pkg/storage/store_pool_test.go index 4829caa22bce..3422f2050c96 100644 --- a/pkg/storage/store_pool_test.go +++ b/pkg/storage/store_pool_test.go @@ -354,21 +354,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { }, } sg.GossipStores(stores, t) - node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)} - eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) - stopper.AddCloser(eng) - cfg := TestStoreConfig(clock) - cfg.Transport = NewDummyRaftTransport(cfg.Settings) - store := NewStore(cfg, eng, &node) - rg := roachpb.RangeDescriptor{ - RangeID: 1, - StartKey: roachpb.RKey([]byte("a")), - EndKey: roachpb.RKey([]byte("b")), - } - replica, err := NewReplica(&rg, store, roachpb.ReplicaID(0)) - if err != nil { - t.Fatalf("make replica error : %s", err) - } + + replica := &Replica{RangeID: 1} replica.mu.Lock() replica.mu.state.Stats = enginepb.MVCCStats{ KeyBytes: 2, @@ -382,7 +369,13 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { manual.Increment(int64(MinStatsDuration + time.Second)) replica.writeStats = rs - sp.updateLocalStoreAfterRebalance(roachpb.StoreID(1), replica, roachpb.ADD_REPLICA) + rangeDesc := &roachpb.RangeDescriptor{ + RangeID: replica.RangeID, + } + + rangeInfo := rangeInfoForRepl(replica, rangeDesc) + + sp.updateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeInfo, roachpb.ADD_REPLICA) desc, ok := sp.getStoreDescriptor(roachpb.StoreID(1)) if !ok { t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 1) @@ -393,7 +386,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { expectedBytes, desc.Capacity.LogicalBytes, expectedQPS, desc.Capacity.WritesPerSecond) } - sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), replica, roachpb.REMOVE_REPLICA) + sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeInfo, roachpb.REMOVE_REPLICA) desc, ok = sp.getStoreDescriptor(roachpb.StoreID(2)) if !ok { t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 2) @@ -433,12 +426,14 @@ func 
TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { t.Fatalf("make replica error : %s", err) } + rangeInfo := rangeInfoForRepl(replica, &rg) + // Update StorePool, which should be a no-op. storeID := roachpb.StoreID(1) if _, ok := sp.getStoreDescriptor(storeID); ok { t.Fatalf("StoreDescriptor not gossiped, should not be found") } - sp.updateLocalStoreAfterRebalance(storeID, replica, roachpb.ADD_REPLICA) + sp.updateLocalStoreAfterRebalance(storeID, rangeInfo, roachpb.ADD_REPLICA) if _, ok := sp.getStoreDescriptor(storeID); ok { t.Fatalf("StoreDescriptor still not gossiped, should not be found") } From 21d4cd9a90f6a35820315396941b058e169e7903 Mon Sep 17 00:00:00 2001 From: a6802739 Date: Thu, 12 Oct 2017 16:33:24 +0800 Subject: [PATCH 4/8] storage: fix bug exposed by the testrace of TestSystemZoneConfigs Fixes #19180. Fixes #19207. --- pkg/storage/allocator.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 39c06edbb04f..cbefd5c153ff 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -526,7 +526,11 @@ func (a Allocator) RebalanceTarget( StoreID: target.store.StoreID, ReplicaID: rangeInfo.Desc.NextReplicaID, } + + // Deep-copy the Replicas slice since we'll mutate it. desc := *rangeInfo.Desc + desc.Replicas = append([]roachpb.ReplicaDescriptor(nil), desc.Replicas...) + desc.Replicas = append(desc.Replicas, newReplica) rangeInfo.Desc = &desc From 5dbe05eab5a45d0d969c0bad3472ea5f877d7c2f Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Wed, 25 Oct 2017 20:47:26 -0400 Subject: [PATCH 5/8] storage: allocate the slice just once --- pkg/storage/allocator.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index cbefd5c153ff..12b950444a89 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -529,9 +529,7 @@ func (a Allocator) RebalanceTarget( // Deep-copy the Replicas slice since we'll mutate it. desc := *rangeInfo.Desc - desc.Replicas = append([]roachpb.ReplicaDescriptor(nil), desc.Replicas...) - - desc.Replicas = append(desc.Replicas, newReplica) + desc.Replicas = append(desc.Replicas[:len(desc.Replicas):len(desc.Replicas)], newReplica) rangeInfo.Desc = &desc replicaCandidates := simulateFilterUnremovableReplicas(raftStatus, desc.Replicas, newReplica.ReplicaID) From fd46e3f3949efcf0cafe215fa1ef51a4f8bbb6eb Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 13 Dec 2017 16:50:51 -0500 Subject: [PATCH 6/8] storage: Fix simulation of rebalance removals to actually remove targets If the first target attempted was rejected due to the simulation claiming that it would be immediately removed, we would reuse the modified `rangeInfo.Desc.Replicas` that had the target added to it, messing with future iterations of the loop. Also, we weren't properly modifying the `candidates` slice, meaning that we could end up trying the same replica multiple times. Release note (bug fix): Improve data rebalancing to make thrashing back and forth between nodes much less likely.
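Patches 4 through 6 all orbit one Go pitfall: `append` on a slice whose capacity exceeds its length writes into the shared backing array. Extending `rangeInfo.Desc.Replicas` in place could therefore scribble on a descriptor other code reads (the race caught by testrace in patch 4), and reusing the extended slice across loop iterations corrupted later attempts (patch 6). Patch 5's three-index ("full") slice expression pins capacity to length, forcing `append` to allocate a fresh array in a single step. A small generic illustration of the aliasing and the fix (ordinary Go, not CockroachDB code):

```go
package main

import "fmt"

func main() {
	base := []int{1, 2, 3}
	shared := base[:2] // len 2, cap 3: an append can write into base's array

	// Plain append reuses the backing array and clobbers base[2].
	a := append(shared, 99)
	fmt.Println(base, a) // [1 2 99] [1 2 99]

	base = []int{1, 2, 3}
	shared = base[:2]

	// A three-index slice expression sets cap == len, so this append must
	// allocate a fresh array; base is left untouched.
	b := append(shared[:len(shared):len(shared)], 99)
	fmt.Println(base, b) // [1 2 3] [1 2 99]
}
```

Patch 6's loop fix below is the same discipline applied to the retry path: trim the speculative replica back off the descriptor and shrink `candidates` before trying the next target.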
--- pkg/storage/allocator.go | 10 ++++--- pkg/storage/allocator_test.go | 53 +++++++++++++++++++---------------- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 12b950444a89..964a74e13d59 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -542,12 +542,14 @@ func (a Allocator) RebalanceTarget( if removeReplica.StoreID != target.store.StoreID { break } - newTargets := candidates.removeCandidate(*target) - newTarget := newTargets.selectGood(a.randGen) - if newTarget == nil { + // Remove the considered target from our modified RangeDescriptor and from + // the candidates list, then try again if there are any other candidates. + rangeInfo.Desc.Replicas = rangeInfo.Desc.Replicas[:len(rangeInfo.Desc.Replicas)-1] + candidates = candidates.removeCandidate(*target) + target = candidates.selectGood(a.randGen) + if target == nil { return nil, "" } - target = newTarget } details, err := json.Marshal(decisionDetails{ Target: target.String(), diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index 2a9ef75e2254..90599852180c 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -695,13 +695,13 @@ func TestAllocatorRebalanceTarget(t *testing.T) { clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) defer stopper.Stop(context.Background()) - // We make 4 stores in this test, store1 and store2 are in the same datacenter a, - // store3 in the datacenter b, store4 in the datacenter c. all of our replicas are - // distributed within these three datacenters. Originally, store4 has much more replicas - // than other stores. So if we didn't make the simulation in RebalanceTarget, it will - // try to choose store2 as the target store to make an rebalance. - // However, we'll immediately remove the replica on the store2 to guarantee the locality diversity. - // But after we make the simulation in RebalanceTarget, we could avoid that happen. + // We make 5 stores in this test -- 3 in the same datacenter, and 1 each in + // 2 other datacenters. All of our replicas are distributed within these 3 + // datacenters. Originally, the stores that are all alone in their datacenter + // are fuller than the other stores. If we didn't simulate RemoveTarget in + // RebalanceTarget, we would try to choose store 2 or 3 as the target store + // to make a rebalance. However, we would immediately remove the replica on + // store 1 or 2 to retain the locality diversity. 
stores := []*roachpb.StoreDescriptor{ { StoreID: 1, @@ -728,7 +728,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { }, }, Capacity: roachpb.StoreCapacity{ - RangeCount: 56, + RangeCount: 55, }, }, { @@ -737,7 +737,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { NodeID: 3, Locality: roachpb.Locality{ Tiers: []roachpb.Tier{ - {Key: "datacenter", Value: "b"}, + {Key: "datacenter", Value: "a"}, }, }, }, @@ -749,6 +749,20 @@ func TestAllocatorRebalanceTarget(t *testing.T) { StoreID: 4, Node: roachpb.NodeDescriptor{ NodeID: 4, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "b"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 100, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{ + NodeID: 5, Locality: roachpb.Locality{ Tiers: []roachpb.Tier{ {Key: "datacenter", Value: "c"}, @@ -756,7 +770,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { }, }, Capacity: roachpb.StoreCapacity{ - RangeCount: 150, + RangeCount: 100, }, }, } @@ -766,18 +780,9 @@ func TestAllocatorRebalanceTarget(t *testing.T) { st := a.storePool.st EnableStatsBasedRebalancing.Override(&st.SV, false) replicas := []roachpb.ReplicaDescriptor{ - { - StoreID: 1, - NodeID: 1, - }, - { - StoreID: 3, - NodeID: 3, - }, - { - StoreID: 4, - NodeID: 4, - }, + {NodeID: 1, StoreID: 1}, + {NodeID: 4, StoreID: 4}, + {NodeID: 5, StoreID: 5}, } repl := &Replica{RangeID: firstRange} @@ -804,7 +809,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { } } for i := 0; i < 10; i++ { - result, _ := a.RebalanceTarget( + result, details := a.RebalanceTarget( context.Background(), config.Constraints{}, status, @@ -812,7 +817,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { storeFilterThrottled, ) if result != nil { - t.Errorf("expected no rebalance, but got %d.", result.StoreID) + t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) } } } From 06f7d7664796558f4e8df1faab63e0be5f8a5fde Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 15 Dec 2017 12:51:45 -0500 Subject: [PATCH 7/8] storage: Always simulate RemoveTarget when rebalancing Skipping the simulation when raftStatus.Progress is nil can make for undesirable thrashing of replicas, as seen when testing #20241. It's better to run the simulation without properly filtering replicas than to not run it at all. Release note: None --- pkg/storage/allocator.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 964a74e13d59..99e9c0245e36 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -518,9 +518,6 @@ func (a Allocator) RebalanceTarget( } // We could make a simulation here to verify whether we'll remove the target we'll rebalance to. for len(candidates) > 0 { - if raftStatus == nil || raftStatus.Progress == nil { - break - } newReplica := roachpb.ReplicaDescriptor{ NodeID: target.store.Node.NodeID, StoreID: target.store.StoreID, @@ -532,7 +529,15 @@ func (a Allocator) RebalanceTarget( desc.Replicas = append(desc.Replicas[:len(desc.Replicas):len(desc.Replicas)], newReplica) rangeInfo.Desc = &desc - replicaCandidates := simulateFilterUnremovableReplicas(raftStatus, desc.Replicas, newReplica.ReplicaID) + // If we can, filter replicas as we would if we were actually removing one. + // If we can't (e.g. 
because we're the leaseholder but not the raft leader), + // it's better to simulate the removal with the info that we do have than to + // assume that the rebalance is ok (#20241). + replicaCandidates := desc.Replicas + if raftStatus != nil && raftStatus.Progress != nil { + replicaCandidates = simulateFilterUnremovableReplicas( + raftStatus, desc.Replicas, newReplica.ReplicaID) + } removeReplica, _, err := a.simulateRemoveTarget(ctx, target.store.StoreID, constraints, replicaCandidates, rangeInfo) if err != nil { From f01eb24f5a4862d74b7d335230ce1c3b2b25d7d9 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 15 Dec 2017 13:18:57 -0500 Subject: [PATCH 8/8] storage: Avoid replica thrashing when localities are different sizes Fixes #20241 Release note (bug fix): avoid rebalance thrashing when localities have very different numbers of nodes --- pkg/storage/allocator.go | 51 +++++++++++++++++++++++++++++++-- pkg/storage/allocator_scorer.go | 35 ++++++++++++++++++++++ pkg/storage/allocator_test.go | 38 +++++++++++++++++++++++- 3 files changed, 120 insertions(+), 4 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 99e9c0245e36..5c33f67861a5 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -516,7 +516,9 @@ func (a Allocator) RebalanceTarget( if target == nil { return nil, "" } - // We could make a simulation here to verify whether we'll remove the target we'll rebalance to. + + // Determine whether we'll just remove the target immediately after adding it. + // If we would, we don't want to actually do the rebalance. for len(candidates) > 0 { newReplica := roachpb.ReplicaDescriptor{ NodeID: target.store.Node.NodeID, @@ -539,12 +541,17 @@ func (a Allocator) RebalanceTarget( raftStatus, desc.Replicas, newReplica.ReplicaID) } - removeReplica, _, err := a.simulateRemoveTarget(ctx, target.store.StoreID, constraints, replicaCandidates, rangeInfo) + removeReplica, details, err := a.simulateRemoveTarget( + ctx, + target.store.StoreID, + constraints, + replicaCandidates, + rangeInfo) if err != nil { log.Warningf(ctx, "simulating RemoveTarget failed: %s", err) return nil, "" } - if removeReplica.StoreID != target.store.StoreID { + if shouldRebalanceBetween(ctx, a.storePool.st, *target, removeReplica, existingCandidates, details) { break } // Remove the considered target from our modified RangeDescriptor and from @@ -568,6 +575,44 @@ func (a Allocator) RebalanceTarget( return &target.store, string(details) } +// shouldRebalanceBetween returns whether it's a good idea to rebalance to the +// given `add` candidate if the replica that will be removed after adding it is +// `remove`. This is a last failsafe to ensure that we don't take unnecessary +// rebalance actions that cause thrashing. +func shouldRebalanceBetween( + ctx context.Context, + st *cluster.Settings, + add candidate, + remove roachpb.ReplicaDescriptor, + existingCandidates candidateList, + removeDetails string, +) bool { + if remove.StoreID == add.store.StoreID { + log.VEventf(ctx, 2, "not rebalancing to s%d because we'd immediately remove it: %s", + add.store.StoreID, removeDetails) + return false + } + + // It's possible that we initially decided to rebalance based on comparing + // rebalance candidates in one locality to an existing replica in another + // locality (e.g. if one locality has many more nodes than another). This can + // make for unnecessary rebalances and even thrashing, so do a more direct + // comparison here of the replicas we'll actually be adding and removing. 
+ for _, removeCandidate := range existingCandidates { + if removeCandidate.store.StoreID == remove.StoreID { + if removeCandidate.worthRebalancingTo(st, add) { + return true + } + log.VEventf(ctx, 2, "not rebalancing to %s because it isn't an improvement over "+ + "what we'd remove after adding it: %s", add, removeCandidate) + return false + } + } + // If the code reaches this point, remove must be a non-live store, so let the + // rebalance happen. + return true +} + // TransferLeaseTarget returns a suitable replica to transfer the range lease // to from the provided list. It excludes the current lease holder replica // unless asked to do otherwise by the checkTransferLeaseSource parameter. diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 2776cc3dcca1..48ef71786c10 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -134,6 +134,41 @@ func (c candidate) less(o candidate) bool { return c.rangeCount > o.rangeCount } +// worthRebalancingTo returns true if o is enough of a better fit for some +// range than c is that it's worth rebalancing from c to o. +func (c candidate) worthRebalancingTo(st *cluster.Settings, o candidate) bool { + if !o.valid { + return false + } + if !c.valid { + return true + } + if c.constraintScore != o.constraintScore { + return c.constraintScore < o.constraintScore + } + if c.convergesScore != o.convergesScore { + return c.convergesScore < o.convergesScore + } + // You might intuitively think that we should require o's balanceScore to + // be considerably higher than c's balanceScore, but that will effectively + // rule out rebalancing in clusters where one locality is much larger or + // smaller than the others, since all the stores in that locality will tend + // to have either a maximal or minimal balanceScore. + if c.balanceScore.totalScore() != o.balanceScore.totalScore() { + return c.balanceScore.totalScore() < o.balanceScore.totalScore() + } + // Instead, just require a gap between their number of ranges. This isn't + // great, particularly for stats-based rebalancing, but it only breaks + // balanceScore ties and it's a workable stop-gap on the way to something + // like #20751. + avgRangeCount := float64(c.rangeCount+o.rangeCount) / 2.0 + // Use an overfullThreshold that is at least a couple replicas larger than + // the average of the two, to ensure that we don't keep rebalancing back + // and forth between nodes that only differ by one or two replicas. + overfullThreshold := math.Max(overfullRangeThreshold(st, avgRangeCount), avgRangeCount+1.5) + return float64(c.rangeCount) > overfullThreshold +} + type candidateList []candidate func (cl candidateList) String() string { diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index 90599852180c..ee57b09dc70f 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -787,7 +787,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { repl := &Replica{RangeID: firstRange} repl.mu.Lock() - repl.mu.state.Stats = &enginepb.MVCCStats{} + repl.mu.state.Stats = enginepb.MVCCStats{} repl.mu.Unlock() rs := newReplicaStats(clock, nil) @@ -820,6 +820,42 @@ func TestAllocatorRebalanceTarget(t *testing.T) { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) } } + + // Set up a second round of testing where the other two stores in the big + // locality actually have fewer replicas, but enough that it still isn't + // worth rebalancing to them. 
+ stores[1].Capacity.RangeCount = 46 + stores[2].Capacity.RangeCount = 46 + sg.GossipStores(stores, t) + for i := 0; i < 10; i++ { + result, details := a.RebalanceTarget( + context.Background(), + config.Constraints{}, + status, + rangeInfo, + storeFilterThrottled, + ) + if result != nil { + t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) + } + } + + // Make sure rebalancing does happen if we drop just a little further down. + stores[1].Capacity.RangeCount = 45 + sg.GossipStores(stores, t) + for i := 0; i < 10; i++ { + result, details := a.RebalanceTarget( + context.Background(), + config.Constraints{}, + status, + rangeInfo, + storeFilterThrottled, + ) + if result == nil || result.StoreID != stores[1].StoreID { + t.Fatalf("expected rebalance to s%d, but got %v; details: %s", + stores[1].StoreID, result, details) + } + } } func TestAllocatorRebalanceDeadNodes(t *testing.T) {
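The thresholds this test exercises fall directly out of the range-count tie-breaker at the end of `worthRebalancingTo`. A standalone sketch of just that check, assuming a 5% overfull fraction as the default that `overfullRangeThreshold` derives from a cluster setting (the helper name here is hypothetical):

```go
package main

import (
	"fmt"
	"math"
)

// worthGap mirrors the final tie-breaker in worthRebalancingTo: once the
// constraint, convergence, and balance scores are tied, only rebalance when
// the source store's range count clears an overfull threshold at least
// ~1.5 ranges above the average of the two stores. The 5% fraction below is
// an assumption standing in for overfullRangeThreshold's cluster setting.
func worthGap(fromRanges, toRanges int) bool {
	avg := float64(fromRanges+toRanges) / 2.0
	overfullThreshold := math.Max(avg*1.05, avg+1.5)
	return float64(fromRanges) > overfullThreshold
}

func main() {
	// Matches the test above: 50 vs 46 is not enough of a gap to act on,
	// while 50 vs 45 clears the threshold and the rebalance proceeds.
	fmt.Println(worthGap(50, 46)) // false: avg 48.0, threshold 50.4
	fmt.Println(worthGap(50, 45)) // true: avg 47.5, threshold 49.875
}
```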