diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index ba9fda281ba9..5c33f67861a5 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -184,12 +184,14 @@ type RangeInfo struct { } func rangeInfoForRepl(repl *Replica, desc *roachpb.RangeDescriptor) RangeInfo { - writesPerSecond, _ := repl.writeStats.avgQPS() - return RangeInfo{ - Desc: desc, - LogicalBytes: repl.GetMVCCStats().Total(), - WritesPerSecond: writesPerSecond, + info := RangeInfo{ + Desc: desc, + LogicalBytes: repl.GetMVCCStats().Total(), } + if writesPerSecond, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration { + info.WritesPerSecond = writesPerSecond + } + return info } // Allocator tries to spread replicas as evenly as possible across the stores @@ -374,6 +376,25 @@ func (a *Allocator) AllocateTarget( } } +func (a Allocator) simulateRemoveTarget( + ctx context.Context, + targetStore roachpb.StoreID, + constraints config.Constraints, + candidates []roachpb.ReplicaDescriptor, + rangeInfo RangeInfo, +) (roachpb.ReplicaDescriptor, string, error) { + // Update statistics first + // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, + // but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue + // (with the main exceptions being Scatter and the status server's allocator debug endpoint). + // Try to make this interfere less with other callers. + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.ADD_REPLICA) + defer func() { + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA) + }() + return a.RemoveTarget(ctx, constraints, candidates, rangeInfo) +} + // RemoveTarget returns a suitable replica to remove from the provided replica // set. It first attempts to randomly select a target from the set of stores // that have greater than the average number of replicas. Failing that, it @@ -444,7 +465,11 @@ func (a Allocator) RemoveTarget( // rebalance. This helps prevent a stampeding herd targeting an abnormally // under-utilized store. func (a Allocator) RebalanceTarget( - ctx context.Context, constraints config.Constraints, rangeInfo RangeInfo, filter storeFilter, + ctx context.Context, + constraints config.Constraints, + raftStatus *raft.Status, + rangeInfo RangeInfo, + filter storeFilter, ) (*roachpb.StoreDescriptor, string) { sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter) @@ -491,6 +516,53 @@ func (a Allocator) RebalanceTarget( if target == nil { return nil, "" } + + // Determine whether we'll just remove the target immediately after adding it. + // If we would, we don't want to actually do the rebalance. + for len(candidates) > 0 { + newReplica := roachpb.ReplicaDescriptor{ + NodeID: target.store.Node.NodeID, + StoreID: target.store.StoreID, + ReplicaID: rangeInfo.Desc.NextReplicaID, + } + + // Deep-copy the Replicas slice since we'll mutate it. + desc := *rangeInfo.Desc + desc.Replicas = append(desc.Replicas[:len(desc.Replicas):len(desc.Replicas)], newReplica) + rangeInfo.Desc = &desc + + // If we can, filter replicas as we would if we were actually removing one. + // If we can't (e.g. because we're the leaseholder but not the raft leader), + // it's better to simulate the removal with the info that we do have than to + // assume that the rebalance is ok (#20241). 
+ replicaCandidates := desc.Replicas + if raftStatus != nil && raftStatus.Progress != nil { + replicaCandidates = simulateFilterUnremovableReplicas( + raftStatus, desc.Replicas, newReplica.ReplicaID) + } + + removeReplica, details, err := a.simulateRemoveTarget( + ctx, + target.store.StoreID, + constraints, + replicaCandidates, + rangeInfo) + if err != nil { + log.Warningf(ctx, "simulating RemoveTarget failed: %s", err) + return nil, "" + } + if shouldRebalanceBetween(ctx, a.storePool.st, *target, removeReplica, existingCandidates, details) { + break + } + // Remove the considered target from our modified RangeDescriptor and from + // the candidates list, then try again if there are any other candidates. + rangeInfo.Desc.Replicas = rangeInfo.Desc.Replicas[:len(rangeInfo.Desc.Replicas)-1] + candidates = candidates.removeCandidate(*target) + target = candidates.selectGood(a.randGen) + if target == nil { + return nil, "" + } + } details, err := json.Marshal(decisionDetails{ Target: target.String(), Existing: existingCandidates.String(), @@ -503,6 +575,44 @@ func (a Allocator) RebalanceTarget( return &target.store, string(details) } +// shouldRebalanceBetween returns whether it's a good idea to rebalance to the +// given `add` candidate if the replica that will be removed after adding it is +// `remove`. This is a last failsafe to ensure that we don't take unnecessary +// rebalance actions that cause thrashing. +func shouldRebalanceBetween( + ctx context.Context, + st *cluster.Settings, + add candidate, + remove roachpb.ReplicaDescriptor, + existingCandidates candidateList, + removeDetails string, +) bool { + if remove.StoreID == add.store.StoreID { + log.VEventf(ctx, 2, "not rebalancing to s%d because we'd immediately remove it: %s", + add.store.StoreID, removeDetails) + return false + } + + // It's possible that we initially decided to rebalance based on comparing + // rebalance candidates in one locality to an existing replica in another + // locality (e.g. if one locality has many more nodes than another). This can + // make for unnecessary rebalances and even thrashing, so do a more direct + // comparison here of the replicas we'll actually be adding and removing. + for _, removeCandidate := range existingCandidates { + if removeCandidate.store.StoreID == remove.StoreID { + if removeCandidate.worthRebalancingTo(st, add) { + return true + } + log.VEventf(ctx, 2, "not rebalancing to %s because it isn't an improvement over "+ + "what we'd remove after adding it: %s", add, removeCandidate) + return false + } + } + // If the code reaches this point, remove must be a non-live store, so let the + // rebalance happen. + return true +} + // TransferLeaseTarget returns a suitable replica to transfer the range lease // to from the provided list. It excludes the current lease holder replica // unless asked to do otherwise by the checkTransferLeaseSource parameter. @@ -877,6 +987,16 @@ func filterBehindReplicas( return candidates } +func simulateFilterUnremovableReplicas( + raftStatus *raft.Status, + replicas []roachpb.ReplicaDescriptor, + brandNewReplicaID roachpb.ReplicaID, +) []roachpb.ReplicaDescriptor { + status := *raftStatus + status.Progress[uint64(brandNewReplicaID)] = raft.Progress{Match: 0} + return filterUnremovableReplicas(&status, replicas, brandNewReplicaID) +} + // filterUnremovableReplicas removes any unremovable replicas from the supplied // slice. An unremovable replicas is one which is a necessary part of the // quorum that will result from removing 1 replica. 
We forgive brandNewReplicaID diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 529604f61b90..0e61cffcc279 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -134,6 +134,41 @@ func (c candidate) less(o candidate) bool { return c.rangeCount > o.rangeCount } +// worthRebalancingTo returns true if o is a sufficiently better fit for some +// range than c that it's worth rebalancing from c to o. +func (c candidate) worthRebalancingTo(st *cluster.Settings, o candidate) bool { + if !o.valid { + return false + } + if !c.valid { + return true + } + if c.constraintScore != o.constraintScore { + return c.constraintScore < o.constraintScore + } + if c.convergesScore != o.convergesScore { + return c.convergesScore < o.convergesScore + } + // You might intuitively think that we should require o's balanceScore to + // be considerably higher than c's balanceScore, but that will effectively + // rule out rebalancing in clusters where one locality is much larger or + // smaller than the others, since all the stores in that locality will tend + // to have either a maximal or minimal balanceScore. + if c.balanceScore.totalScore() != o.balanceScore.totalScore() { + return c.balanceScore.totalScore() < o.balanceScore.totalScore() + } + // Instead, just require a gap between their number of ranges. This isn't + // great, particularly for stats-based rebalancing, but it only breaks + // balanceScore ties and it's a workable stop-gap on the way to something + // like #20751. + avgRangeCount := float64(c.rangeCount+o.rangeCount) / 2.0 + // Use an overfullThreshold that is at least a couple replicas larger than + // the average of the two, to ensure that we don't keep rebalancing back + // and forth between nodes that only differ by one or two replicas. + overfullThreshold := math.Max(overfullRangeThreshold(st, avgRangeCount), avgRangeCount+1.5) + return float64(c.rangeCount) > overfullThreshold +} + type candidateList []candidate func (cl candidateList) String() string { @@ -285,6 +320,17 @@ func (cl candidateList) selectBad(randGen allocatorRand) *candidate { return worst } +// removeCandidate removes the specified candidate from candidateList. +func (cl candidateList) removeCandidate(c candidate) candidateList { + for i := 0; i < len(cl); i++ { + if cl[i].store.StoreID == c.store.StoreID { + cl = append(cl[:i], cl[i+1:]...) + break + } + } + return cl +} + // allocateCandidates creates a candidate list of all stores that can be used // for allocating a new replica ordered from the best to the worst. Only + // stores that meet the criteria are included in the list.
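Reviewer note on the new worthRebalancingTo tie-breaker above: the range-count check only fires once the store we would remove from sits a couple of replicas past the midpoint of the two counts, which is exactly what the second and third scenarios of TestAllocatorRebalanceTarget below (46 vs. 45 candidate ranges against an existing store with 50) exercise. A minimal standalone sketch of that arithmetic follows; the 5% overfull factor and the helper name are assumptions made for illustration and stand in for overfullRangeThreshold and the cluster settings, which are not part of this diff.

package main

import (
	"fmt"
	"math"
)

// worthRebalancingByRangeCount mirrors only the final range-count tie-breaker
// in candidate.worthRebalancingTo. The 5% overfull factor is an assumption
// standing in for overfullRangeThreshold(st, avg).
func worthRebalancingByRangeCount(existingRanges, candidateRanges int) bool {
	avg := float64(existingRanges+candidateRanges) / 2.0
	// Require the existing store to be at least a couple of replicas above the
	// average so that stores differing by only one or two ranges don't thrash.
	threshold := math.Max(avg*1.05, avg+1.5)
	return float64(existingRanges) > threshold
}

func main() {
	// With the existing store at 50 ranges, a 46-range candidate is not worth
	// rebalancing to (threshold 50.4), but a 45-range one is (threshold 49.875).
	fmt.Println(worthRebalancingByRangeCount(50, 46)) // false
	fmt.Println(worthRebalancingByRangeCount(50, 45)) // true
}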
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index 3c91b3ba0fa5..ee57b09dc70f 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util" @@ -657,6 +658,7 @@ func TestAllocatorRebalance(t *testing.T) { result, _ := a.RebalanceTarget( ctx, config.Constraints{}, + nil, testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: 3}}, firstRange), storeFilterThrottled, ) @@ -685,6 +687,177 @@ func TestAllocatorRebalance(t *testing.T) { } } +// TestAllocatorRebalanceTarget verifies that we don't rebalance to a target that +// we would immediately remove afterwards. +func TestAllocatorRebalanceTarget(t *testing.T) { + defer leaktest.AfterTest(t)() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + defer stopper.Stop(context.Background()) + // We make 5 stores in this test -- 3 in the same datacenter, and 1 each in + // 2 other datacenters. All of our replicas are distributed within these 3 + // datacenters. Originally, the stores that are all alone in their datacenter + // are fuller than the other stores. If we didn't simulate RemoveTarget in + // RebalanceTarget, we would try to choose store 2 or 3 as the rebalance + // target. However, we would then immediately remove either that new replica + // or the replica on store 1 to retain locality diversity, so no rebalance + // should happen.
+ stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "a"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 50, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "a"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 55, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "a"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 55, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{ + NodeID: 4, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "b"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 100, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{ + NodeID: 5, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "datacenter", Value: "c"}, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + RangeCount: 100, + }, + }, + } + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + + st := a.storePool.st + EnableStatsBasedRebalancing.Override(&st.SV, false) + replicas := []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 4, StoreID: 4}, + {NodeID: 5, StoreID: 5}, + } + repl := &Replica{RangeID: firstRange} + + repl.mu.Lock() + repl.mu.state.Stats = enginepb.MVCCStats{} + repl.mu.Unlock() + + rs := newReplicaStats(clock, nil) + repl.writeStats = rs + + desc := &roachpb.RangeDescriptor{ + Replicas: replicas, + RangeID: firstRange, + } + + rangeInfo := rangeInfoForRepl(repl, desc) + + status := &raft.Status{ + Progress: make(map[uint64]raft.Progress), + } + for _, replica := range replicas { + status.Progress[uint64(replica.NodeID)] = raft.Progress{ + Match: 10, + } + } + for i := 0; i < 10; i++ { + result, details := a.RebalanceTarget( + context.Background(), + config.Constraints{}, + status, + rangeInfo, + storeFilterThrottled, + ) + if result != nil { + t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) + } + } + + // Set up a second round of testing where the other two stores in the big + // locality actually have fewer replicas, but enough that it still isn't + // worth rebalancing to them. + stores[1].Capacity.RangeCount = 46 + stores[2].Capacity.RangeCount = 46 + sg.GossipStores(stores, t) + for i := 0; i < 10; i++ { + result, details := a.RebalanceTarget( + context.Background(), + config.Constraints{}, + status, + rangeInfo, + storeFilterThrottled, + ) + if result != nil { + t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) + } + } + + // Make sure rebalancing does happen if we drop just a little further down. 
+ stores[1].Capacity.RangeCount = 45 + sg.GossipStores(stores, t) + for i := 0; i < 10; i++ { + result, details := a.RebalanceTarget( + context.Background(), + config.Constraints{}, + status, + rangeInfo, + storeFilterThrottled, + ) + if result == nil || result.StoreID != stores[1].StoreID { + t.Fatalf("expected rebalance to s%d, but got %v; details: %s", + stores[1].StoreID, result, details) + } + } +} + func TestAllocatorRebalanceDeadNodes(t *testing.T) { defer leaktest.AfterTest(t)() @@ -754,7 +927,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { result, _ := a.RebalanceTarget( - ctx, config.Constraints{}, testRangeInfo(c.existing, firstRange), storeFilterThrottled) + ctx, config.Constraints{}, nil, testRangeInfo(c.existing, firstRange), storeFilterThrottled) if c.expected > 0 { if result == nil { t.Fatalf("expected %d, but found nil", c.expected) @@ -949,6 +1122,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { result, _ := a.RebalanceTarget( ctx, config.Constraints{}, + nil, testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, firstRange), storeFilterThrottled, ) @@ -2552,6 +2726,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { actual, _ := a.RebalanceTarget( ctx, constraints, + nil, testRangeInfo(existingReplicas, firstRange), storeFilterThrottled, ) @@ -2672,6 +2847,7 @@ func Example_rebalancing() { target, _ := alloc.RebalanceTarget( context.Background(), config.Constraints{}, + nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange), storeFilterThrottled, ) diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index be08c229fd0e..1c39ae3d2a28 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -197,7 +197,7 @@ func (rq *replicateQueue) shouldQueue( return false, 0 } - target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, rangeInfo, storeFilterThrottled) + target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if target != nil { log.VEventf(ctx, 2, "rebalance target found, enqueuing") } else { @@ -485,7 +485,7 @@ func (rq *replicateQueue) processOneChange( if !rq.store.TestingKnobs().DisableReplicaRebalancing { rebalanceStore, details := rq.allocator.RebalanceTarget( - ctx, zone.Constraints, rangeInfo, storeFilterThrottled) + ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if rebalanceStore == nil { log.VEventf(ctx, 1, "no suitable rebalance target") } else { @@ -572,7 +572,12 @@ func (rq *replicateQueue) addReplica( if dryRun { return nil } - return repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details) + if err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil { + return err + } + rangeInfo := rangeInfoForRepl(repl, desc) + rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.ADD_REPLICA) + return nil } func (rq *replicateQueue) removeReplica( @@ -587,7 +592,12 @@ func (rq *replicateQueue) removeReplica( if dryRun { return nil } - return repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details) + if err := repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details); err != nil { + return err + } + rangeInfo := rangeInfoForRepl(repl, desc) + rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, 
roachpb.REMOVE_REPLICA) + return nil } func (rq *replicateQueue) canTransferLease() bool { diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index be72d026848d..07fa574f9dfc 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -327,6 +327,39 @@ func (sp *StorePool) deadReplicasGossipUpdate(_ string, content roachpb.Value) { detail.deadReplicas = deadReplicas } +// updateLocalStoreAfterRebalance is used to update the local copy of the +// target store immediately after a rebalance. +func (sp *StorePool) updateLocalStoreAfterRebalance( + storeID roachpb.StoreID, rangeInfo RangeInfo, changeType roachpb.ReplicaChangeType, +) { + sp.detailsMu.Lock() + defer sp.detailsMu.Unlock() + detail := *sp.getStoreDetailLocked(storeID) + if detail.desc == nil { + // We don't have this store yet (this is normal when we're + // starting up and don't have full information from the gossip + // network). We can't update the local store at this time. + return + } + switch changeType { + case roachpb.ADD_REPLICA: + detail.desc.Capacity.LogicalBytes += rangeInfo.LogicalBytes + detail.desc.Capacity.WritesPerSecond += rangeInfo.WritesPerSecond + case roachpb.REMOVE_REPLICA: + if detail.desc.Capacity.LogicalBytes <= rangeInfo.LogicalBytes { + detail.desc.Capacity.LogicalBytes = 0 + } else { + detail.desc.Capacity.LogicalBytes -= rangeInfo.LogicalBytes + } + if detail.desc.Capacity.WritesPerSecond <= rangeInfo.WritesPerSecond { + detail.desc.Capacity.WritesPerSecond = 0 + } else { + detail.desc.Capacity.WritesPerSecond -= rangeInfo.WritesPerSecond + } + } + sp.detailsMu.storeDetails[storeID] = &detail +} + // newStoreDetail makes a new storeDetail struct. It sets index to be -1 to // ensure that it will be processed by a queue immediately. func newStoreDetail() *storeDetail { diff --git a/pkg/storage/store_pool_test.go b/pkg/storage/store_pool_test.go index 33f1f8cd303f..3422f2050c96 100644 --- a/pkg/storage/store_pool_test.go +++ b/pkg/storage/store_pool_test.go @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -318,6 +320,125 @@ func TestStorePoolGetStoreList(t *testing.T) { } } +func TestStorePoolUpdateLocalStore(t *testing.T) { + defer leaktest.AfterTest(t)() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + // We're going to manually mark stores dead in this test. 
+ stopper, g, _, sp, _ := createTestStorePool( + TestTimeUntilStoreDead, false /* deterministic */, nodeStatusDead) + defer stopper.Stop(context.TODO()) + sg := gossiputil.NewStoreGossiper(g) + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + Capacity: 100, + Available: 50, + RangeCount: 5, + LogicalBytes: 30, + WritesPerSecond: 30, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + Capacity: 100, + Available: 55, + RangeCount: 4, + LogicalBytes: 25, + WritesPerSecond: 25, + }, + }, + } + sg.GossipStores(stores, t) + + replica := &Replica{RangeID: 1} + replica.mu.Lock() + replica.mu.state.Stats = enginepb.MVCCStats{ + KeyBytes: 2, + ValBytes: 4, + } + replica.mu.Unlock() + rs := newReplicaStats(clock, nil) + for _, store := range stores { + rs.record(store.Node.NodeID) + } + manual.Increment(int64(MinStatsDuration + time.Second)) + replica.writeStats = rs + + rangeDesc := &roachpb.RangeDescriptor{ + RangeID: replica.RangeID, + } + + rangeInfo := rangeInfoForRepl(replica, rangeDesc) + + sp.updateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeInfo, roachpb.ADD_REPLICA) + desc, ok := sp.getStoreDescriptor(roachpb.StoreID(1)) + if !ok { + t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 1) + } + QPS, _ := replica.writeStats.avgQPS() + if expectedBytes, expectedQPS := int64(36), 30+QPS; desc.Capacity.LogicalBytes != expectedBytes || desc.Capacity.WritesPerSecond != expectedQPS { + t.Fatalf("expected logical bytes %d, but got %d; expected WritesPerSecond %f, but got %f", + expectedBytes, desc.Capacity.LogicalBytes, expectedQPS, desc.Capacity.WritesPerSecond) + } + + sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeInfo, roachpb.REMOVE_REPLICA) + desc, ok = sp.getStoreDescriptor(roachpb.StoreID(2)) + if !ok { + t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 2) + } + if expectedBytes, expectedQPS := int64(19), 25-QPS; desc.Capacity.LogicalBytes != expectedBytes || desc.Capacity.WritesPerSecond != expectedQPS { + t.Fatalf("expected logical bytes %d, but got %d; expected WritesPerSecond %f, but got %f", + expectedBytes, desc.Capacity.LogicalBytes, expectedQPS, desc.Capacity.WritesPerSecond) + } +} + +// TestStorePoolUpdateLocalStoreBeforeGossip verifies that an attempt to update +// the local copy of a store before that store has been gossiped is a no-op. +func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { + defer leaktest.AfterTest(t)() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + stopper, _, _, sp, _ := createTestStorePool( + TestTimeUntilStoreDead, false /* deterministic */, nodeStatusDead) + defer stopper.Stop(context.TODO()) + + // Create store. + node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)} + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(eng) + cfg := TestStoreConfig(clock) + cfg.Transport = NewDummyRaftTransport(cfg.Settings) + store := NewStore(cfg, eng, &node) + + // Create replica. + rg := roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKey([]byte("a")), + EndKey: roachpb.RKey([]byte("b")), + } + replica, err := NewReplica(&rg, store, roachpb.ReplicaID(0)) + if err != nil { + t.Fatalf("error creating replica: %s", err) + } + + rangeInfo := rangeInfoForRepl(replica, &rg) + + // Update StorePool, which should be a no-op.
+ storeID := roachpb.StoreID(1) + if _, ok := sp.getStoreDescriptor(storeID); ok { + t.Fatalf("StoreDescriptor should not be found before the store is gossiped") + } + sp.updateLocalStoreAfterRebalance(storeID, rangeInfo, roachpb.ADD_REPLICA) + if _, ok := sp.getStoreDescriptor(storeID); ok { + t.Fatalf("StoreDescriptor should still not be found after the no-op update") + } +} + func TestStorePoolGetStoreDetails(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, sp, _ := createTestStorePool(