From 56334949b01ef8666259cf204e2f64ed95dacf2e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 21 Oct 2022 13:26:46 +0800 Subject: [PATCH] reduce get overlap operations Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 10 +++++----- server/core/basic_cluster.go | 22 ++++++++++---------- server/core/rangetree/range_tree.go | 20 ++++++++++++++++--- server/core/region.go | 13 +++++++++--- server/core/region_test.go | 16 +++++++-------- server/core/region_tree.go | 30 ++++++++++++++++++++++++++++ server/region_syncer/client.go | 2 +- server/schedule/range_cluster.go | 2 +- server/schedulers/balance_test.go | 6 +++--- server/schedulers/scheduler_test.go | 2 +- server/storage/storage_test.go | 26 ++++++++++++------------ tools/pd-simulator/simulator/raft.go | 2 +- 13 files changed, 102 insertions(+), 51 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 32c8eefc2849..2e59a03e82c8 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -829,7 +829,7 @@ var regionGuide = core.GenerateRegionGuideFunc(true) // processRegionHeartbeat updates the region information. func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { - origin, err := c.core.PreCheckPutRegion(region) + origin, _, err := c.core.PreCheckPutRegion(region) if err != nil { return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c83c4e4de212..083eee5d0081 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1678,7 +1678,7 @@ func Test(t *testing.T) { re.Nil(cache.GetRegionByKey(regionKey)) checkRegions(re, cache, regions[0:i]) - cache.SetRegion(region) + cache.SetRegion(region, false) checkRegion(re, cache.GetRegion(i), region) checkRegion(re, cache.GetRegionByKey(regionKey), region) checkRegions(re, cache, regions[0:(i+1)]) @@ -1691,7 +1691,7 @@ func Test(t *testing.T) { // Update leader to peer np-1. newRegion := region.Clone(core.WithLeader(region.GetPeers()[np-1])) regions[i] = newRegion - cache.SetRegion(newRegion) + cache.SetRegion(newRegion, false) checkRegion(re, cache.GetRegion(i), newRegion) checkRegion(re, cache.GetRegionByKey(regionKey), newRegion) checkRegions(re, cache, regions[0:(i+1)]) @@ -1704,7 +1704,7 @@ func Test(t *testing.T) { // Reset leader to peer 0. newRegion = region.Clone(core.WithLeader(region.GetPeers()[0])) regions[i] = newRegion - cache.SetRegion(newRegion) + cache.SetRegion(newRegion, false) checkRegion(re, cache.GetRegion(i), newRegion) checkRegions(re, cache, regions[0:(i+1)]) checkRegion(re, cache.GetRegionByKey(regionKey), newRegion) @@ -1725,7 +1725,7 @@ func Test(t *testing.T) { // check overlaps // clone it otherwise there are two items with the same key in the tree overlapRegion := regions[n-1].Clone(core.WithStartKey(regions[n-2].GetStartKey())) - cache.SetRegion(overlapRegion) + cache.SetRegion(overlapRegion, false) re.Nil(cache.GetRegion(n - 2)) re.NotNil(cache.GetRegion(n - 1)) @@ -1734,7 +1734,7 @@ func Test(t *testing.T) { for j := 0; j < cache.GetStoreLeaderCount(i); j++ { region := filter.SelectOneRegion(tc.RandLeaderRegions(i, []core.KeyRange{core.NewKeyRange("", "")}), nil, pendingFilter, downFilter) newRegion := region.Clone(core.WithPendingPeers(region.GetPeers())) - cache.SetRegion(newRegion) + cache.SetRegion(newRegion, false) } re.Nil(filter.SelectOneRegion(tc.RandLeaderRegions(i, []core.KeyRange{core.NewKeyRange("", "")}), nil, pendingFilter, downFilter)) } diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index e6bc2bb8d756..512c7bb4cf0a 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -430,22 +430,22 @@ func (bc *BasicCluster) getRelevantRegionsLocked(region *RegionInfo) (origin *Re /* Regions write operations */ // PreCheckPutRegion checks if the region is valid to put. -func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { +func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) { bc.Regions.mu.RLock() origin, overlaps := bc.getRelevantRegionsLocked(region) bc.Regions.mu.RUnlock() return bc.check(region, origin, overlaps) } -func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo) (*RegionInfo, error) { +func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo) (*RegionInfo, []*RegionInfo, error) { for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { - return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) + return nil, nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) } } if origin == nil { - return nil, nil + return nil, nil, nil } r := region.GetRegionEpoch() @@ -454,21 +454,21 @@ func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() // Region meta is stale, return an error. if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !region.isRegionRecreated() { - return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta()) + return origin, nil, errRegionIsStale(region.GetMeta(), origin.GetMeta()) } - return origin, nil + return origin, overlaps, nil } // CheckAndPutRegion checks if the region is valid to put, if valid then put. func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { - origin, err := bc.PreCheckPutRegion(region) + origin, overlaps, err := bc.PreCheckPutRegion(region) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) // return the state region to delete. return []*RegionInfo{region} } - return bc.PutRegion(region) + return bc.Regions.SetRegion(region, true, overlaps...) } // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. @@ -476,18 +476,18 @@ func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionIn bc.Regions.mu.Lock() defer bc.Regions.mu.Unlock() origin, overlaps := bc.getRelevantRegionsLocked(region) - _, err := bc.check(region, origin, overlaps) + _, _, err := bc.check(region, origin, overlaps) if err != nil { return nil, err } - return bc.Regions.SetRegion(region), nil + return bc.Regions.SetRegion(region, false), nil } // PutRegion put a region. func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { bc.Regions.mu.Lock() defer bc.Regions.mu.Unlock() - return bc.Regions.SetRegion(region) + return bc.Regions.SetRegion(region, false) } // RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists. diff --git a/server/core/rangetree/range_tree.go b/server/core/rangetree/range_tree.go index a7e63d61787d..216a5cdbe77c 100644 --- a/server/core/rangetree/range_tree.go +++ b/server/core/rangetree/range_tree.go @@ -44,6 +44,11 @@ func NewRangeTree(degree int, factory DebrisFactory) *RangeTree { } } +// Factory is the factory that generates some debris when updating items. +func (r *RangeTree) Factory(startKey, endKey []byte, old RangeItem) []RangeItem { + return r.factory(startKey, endKey, old) +} + // Update insert the item and delete overlaps. func (r *RangeTree) Update(item RangeItem) []RangeItem { overlaps := r.GetOverlaps(item) @@ -52,16 +57,25 @@ func (r *RangeTree) Update(item RangeItem) []RangeItem { children := r.factory(item.GetStartKey(), item.GetEndKey(), old) for _, child := range children { if c := bytes.Compare(child.GetStartKey(), child.GetEndKey()); c < 0 { - r.tree.ReplaceOrInsert(child) + r.ReplaceOrInsert(child) } else if c > 0 && len(child.GetEndKey()) == 0 { - r.tree.ReplaceOrInsert(child) + r.ReplaceOrInsert(child) } } } - r.tree.ReplaceOrInsert(item) + r.ReplaceOrInsert(item) return overlaps } +// ReplaceOrInsert adds the given item to the tree. If an item in the tree +// already equals the given one, it is removed from the tree and returned. +// Otherwise, nil is returned. +// +// nil cannot be added to the tree (will panic). +func (r *RangeTree) ReplaceOrInsert(item RangeItem) { + r.tree.ReplaceOrInsert(item) +} + // GetOverlaps returns the range items that has some intersections with the given items. func (r *RangeTree) GetOverlaps(item RangeItem) []RangeItem { // note that Find() gets the last item that is less or equal than the item. diff --git a/server/core/region.go b/server/core/region.go index ae7063c799aa..f0e0f8b3ab7e 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -729,7 +729,7 @@ func (r *RegionsInfo) GetRegion(regionID uint64) *RegionInfo { // SetRegion sets the RegionInfo to regionTree and regionMap, also update leaders and followers by region peers // overlaps: Other regions that overlap with the specified region, excluding itself. -func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) { +func (r *RegionsInfo) SetRegion(region *RegionInfo, passCheck bool, ol ...*RegionInfo) (overlaps []*RegionInfo) { var item *regionItem // Pointer to the *RegionInfo of this ID. rangeChanged := true // This Region is new, or its range has changed. if item = r.regions[region.GetID()]; item != nil { @@ -764,8 +764,15 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) { } if rangeChanged { - // It has been removed and all information needs to be updated again. - overlaps = r.tree.update(item) + if passCheck { + if len(ol) != 0 { + overlaps = r.tree.updateWithOverlaps(item, ol) + } else { + r.tree.tree.ReplaceOrInsert(item) + } + } else { + overlaps = r.tree.update(item) + } for _, old := range overlaps { r.RemoveRegion(r.GetRegion(old.GetID())) } diff --git a/server/core/region_test.go b/server/core/region_test.go index 23acca7705c3..71e1cc6ade89 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -466,7 +466,7 @@ func TestSetRegion(t *testing.T) { StartKey: []byte(fmt.Sprintf("%20d", i*10)), EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)), }, peer1) - regions.SetRegion(region) + regions.SetRegion(region, false) } peer1 := &metapb.Peer{StoreId: uint64(4), Id: uint64(101)} @@ -479,12 +479,12 @@ func TestSetRegion(t *testing.T) { EndKey: []byte(fmt.Sprintf("%20d", 211)), }, peer1) region.pendingPeers = append(region.pendingPeers, peer3) - regions.SetRegion(region) + regions.SetRegion(region, false) checkRegions(re, regions) re.Equal(97, regions.tree.length()) re.Len(regions.GetRegions(), 97) - regions.SetRegion(region) + regions.SetRegion(region, false) peer1 = &metapb.Peer{StoreId: uint64(2), Id: uint64(101)} peer2 = &metapb.Peer{StoreId: uint64(3), Id: uint64(102), Role: metapb.PeerRole_Learner} peer3 = &metapb.Peer{StoreId: uint64(1), Id: uint64(103)} @@ -495,7 +495,7 @@ func TestSetRegion(t *testing.T) { EndKey: []byte(fmt.Sprintf("%20d", 212)), }, peer1) region.pendingPeers = append(region.pendingPeers, peer3) - regions.SetRegion(region) + regions.SetRegion(region, false) checkRegions(re, regions) re.Equal(97, regions.tree.length()) re.Len(regions.GetRegions(), 97) @@ -504,7 +504,7 @@ func TestSetRegion(t *testing.T) { region = region.Clone(WithStartKey([]byte(fmt.Sprintf("%20d", 175))), WithNewRegionID(201)) re.NotNil(regions.GetRegion(21)) re.NotNil(regions.GetRegion(18)) - regions.SetRegion(region) + regions.SetRegion(region, false) checkRegions(re, regions) re.Equal(96, regions.tree.length()) re.Len(regions.GetRegions(), 96) @@ -519,7 +519,7 @@ func TestSetRegion(t *testing.T) { SetWrittenBytes(40), SetWrittenKeys(10), SetReportInterval(0, 5)) - regions.SetRegion(region) + regions.SetRegion(region, false) checkRegions(re, regions) re.Equal(96, regions.tree.length()) re.Len(regions.GetRegions(), 96) @@ -630,7 +630,7 @@ func BenchmarkRandomRegion(b *testing.B) { StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1)), }, peer) - regions.SetRegion(region) + regions.SetRegion(region, false) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -683,6 +683,6 @@ func BenchmarkAddRegion(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - regions.SetRegion(items[i]) + regions.SetRegion(items[i], false) } } diff --git a/server/core/region_tree.go b/server/core/region_tree.go index 4029f8140e76..f6d4dc3c5eaa 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -147,6 +147,36 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo { return result } +// updateWithOverlaps updates the tree with the region with pre obtained overlaps. +func (t *regionTree) updateWithOverlaps(item *regionItem, overlaps []*RegionInfo) []*RegionInfo { + region := item.RegionInfo + t.totalSize += region.approximateSize + regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate() + t.totalWriteBytesRate += regionWriteBytesRate + t.totalWriteKeysRate += regionWriteKeysRate + + for _, old := range overlaps { + o := ®ionItem{old} + t.tree.Delete(o) + } + t.tree.ReplaceOrInsert(item) + result := make([]*RegionInfo, len(overlaps)) + for i, overlap := range overlaps { + old := overlap + result[i] = old + log.Debug("overlapping region", + zap.Uint64("region-id", old.GetID()), + logutil.ZapRedactStringer("delete-region", RegionToHexMeta(old.GetMeta())), + logutil.ZapRedactStringer("update-region", RegionToHexMeta(region.GetMeta()))) + t.totalSize -= old.approximateSize + regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate() + t.totalWriteBytesRate -= regionWriteBytesRate + t.totalWriteKeysRate -= regionWriteKeysRate + } + + return result +} + // updateStat is used to update statistics when regionItem.RegionInfo is directly replaced. func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) { t.totalSize += region.approximateSize diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index debf39f556fa..20c0b9cb3027 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -211,7 +211,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false)) } - origin, err := bc.PreCheckPutRegion(region) + origin, _, err := bc.PreCheckPutRegion(region) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index b3d1cb2e1ee8..f78f0eaf9212 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -31,7 +31,7 @@ type RangeCluster struct { func GenRangeCluster(cluster Cluster, startKey, endKey []byte) *RangeCluster { subCluster := core.NewBasicCluster() for _, r := range cluster.ScanRegions(startKey, endKey, -1) { - subCluster.Regions.SetRegion(r) + subCluster.Regions.SetRegion(r, false) } return &RangeCluster{ Cluster: cluster, diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index b1addea160e8..8b0983b2f615 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -1303,7 +1303,7 @@ func TestScatterRangeBalance(t *testing.T) { core.SetApproximateKeys(1), core.SetApproximateSize(1), ) - tc.Regions.SetRegion(regionInfo) + tc.Regions.SetRegion(regionInfo, false) } for i := 0; i < 100; i++ { _, err := tc.AllocPeer(1) @@ -1371,7 +1371,7 @@ func TestBalanceLeaderLimit(t *testing.T) { core.SetApproximateSize(96), ) - tc.Regions.SetRegion(regionInfo) + tc.Regions.SetRegion(regionInfo, false) } for i := 0; i < 100; i++ { @@ -1481,7 +1481,7 @@ func TestBalanceWhenRegionNotHeartbeat(t *testing.T) { core.SetApproximateSize(96), ) - tc.Regions.SetRegion(regionInfo) + tc.Regions.SetRegion(regionInfo, false) } for i := 1; i <= 3; i++ { diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index f4fd018441d0..0d680e67fd8e 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -118,7 +118,7 @@ func TestRejectLeader(t *testing.T) { break } } - tc.Regions.SetRegion(region) + tc.Regions.SetRegion(region, false) ops, _ = sl.Schedule(tc, false) testutil.CheckTransferLeader(re, ops[0], operator.OpLeader, 1, 2) } diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index 8793fbfc8c6a..342ad302d9c7 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -212,11 +212,11 @@ func TestLoadMinServiceGCSafePoint(t *testing.T) { func TestLoadRegions(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - cache := core.NewRegionsInfo() + cache := core.NewBasicCluster() n := 10 regions := mustSaveRegions(re, storage, n) - re.NoError(storage.LoadRegions(context.Background(), cache.SetRegion)) + re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)) re.Equal(n, cache.GetRegionCount()) for _, region := range cache.GetMetaRegions() { @@ -249,11 +249,11 @@ func newTestRegionMeta(regionID uint64) *metapb.Region { func TestLoadRegionsToCache(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - cache := core.NewRegionsInfo() + cache := core.NewBasicCluster() n := 10 regions := mustSaveRegions(re, storage, n) - re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.SetRegion)) + re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion)) re.Equal(n, cache.GetRegionCount()) for _, region := range cache.GetMetaRegions() { @@ -262,7 +262,7 @@ func TestLoadRegionsToCache(t *testing.T) { n = 20 mustSaveRegions(re, storage, n) - re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.SetRegion)) + re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion)) re.Equal(n, cache.GetRegionCount()) } @@ -270,11 +270,11 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/server/storage/kv/withRangeLimit", "return(500)")) storage := NewStorageWithMemoryBackend() - cache := core.NewRegionsInfo() + cache := core.NewBasicCluster() n := 1000 regions := mustSaveRegions(re, storage, n) - re.NoError(storage.LoadRegions(context.Background(), cache.SetRegion)) + re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)) re.Equal(n, cache.GetRegionCount()) for _, region := range cache.GetMetaRegions() { re.Equal(regions[region.GetId()], region) @@ -287,13 +287,13 @@ func TestTrySwitchRegionStorage(t *testing.T) { defaultStorage := NewStorageWithMemoryBackend() localStorage := NewStorageWithMemoryBackend() storage := NewCoreStorage(defaultStorage, localStorage) - defaultCache := core.NewRegionsInfo() - localCache := core.NewRegionsInfo() + defaultCache := core.NewBasicCluster() + localCache := core.NewBasicCluster() TrySwitchRegionStorage(storage, false) regions10 := mustSaveRegions(re, storage, 10) - re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.SetRegion)) - re.NoError(localStorage.LoadRegions(context.Background(), localCache.SetRegion)) + re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)) + re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)) re.Empty(localCache.GetMetaRegions()) re.Len(defaultCache.GetMetaRegions(), 10) for _, region := range defaultCache.GetMetaRegions() { @@ -302,8 +302,8 @@ func TestTrySwitchRegionStorage(t *testing.T) { TrySwitchRegionStorage(storage, true) regions20 := mustSaveRegions(re, storage, 20) - re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.SetRegion)) - re.NoError(localStorage.LoadRegions(context.Background(), localCache.SetRegion)) + re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)) + re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)) re.Len(defaultCache.GetMetaRegions(), 10) re.Len(localCache.GetMetaRegions(), 20) for _, region := range defaultCache.GetMetaRegions() { diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go index 45964eeecb2a..db8d23e2643f 100644 --- a/tools/pd-simulator/simulator/raft.go +++ b/tools/pd-simulator/simulator/raft.go @@ -275,7 +275,7 @@ func (r *RaftEngine) GetRegions() []*core.RegionInfo { func (r *RaftEngine) SetRegion(region *core.RegionInfo) []*core.RegionInfo { r.Lock() defer r.Unlock() - return r.regionsInfo.SetRegion(region) + return r.regionsInfo.SetRegion(region, false) } // GetRegionByKey searches the RegionInfo from regionTree