Skip to content

Commit

Permalink
support range in region tree
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 30, 2019
1 parent 93d49b7 commit 51c1215
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 50 deletions.
4 changes: 2 additions & 2 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,10 @@ func checkRegions(c *C, cache *core.RegionsInfo, regions []*core.RegionInfo) {
regionCount[peer.StoreId]++
if peer.Id == region.GetLeader().Id {
leaderCount[peer.StoreId]++
checkRegion(c, cache.GetLeader(peer.StoreId, region.GetID()), region)
checkRegion(c, cache.GetLeader(peer.StoreId, region), region)
} else {
followerCount[peer.StoreId]++
checkRegion(c, cache.GetFollower(peer.StoreId, region.GetID()), region)
checkRegion(c, cache.GetFollower(peer.StoreId, region), region)
}
}
}
Expand Down
184 changes: 141 additions & 43 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,6 @@ func (rm *regionMap) Put(region *RegionInfo) {
rm.totalKeys += region.approximateKeys
}

func (rm *regionMap) RandomRegion() *RegionInfo {
if rm.Len() == 0 {
return nil
}
return rm.Get(rm.ids[rand.Intn(rm.Len())])
}

func (rm *regionMap) Delete(id uint64) {
if rm == nil {
return
Expand All @@ -460,25 +453,125 @@ func (rm *regionMap) TotalSize() int64 {
return rm.totalSize
}

// regionSubTree is used to manager different types of regions.
type regionSubTree struct {
*regionTree
totalSize int64
totalKeys int64
}

func newRegionSubTree() *regionSubTree {
return &regionSubTree{
regionTree: newRegionTree(),
totalSize: 0,
}
}

func (rst *regionSubTree) TotalSize() int64 {
if rst.length() == 0 {
return 0
}
return rst.totalSize
}

func (rst *regionSubTree) scanRanges() []*RegionInfo {
var res []*RegionInfo
rst.scanRange([]byte(""), func(region *RegionInfo) bool {
res = append(res, region)
return true
})
return res
}

func (rst *regionSubTree) update(region *RegionInfo) {
if r := rst.find(region); r != nil {
rst.totalSize += region.approximateSize - r.region.approximateSize
rst.totalKeys += region.approximateKeys - r.region.approximateKeys
r.region = region
return
}
rst.totalSize += region.approximateSize
rst.totalKeys += region.approximateKeys
rst.regionTree.update(region)
}

func (rst *regionSubTree) remove(region *RegionInfo) {
if rst == nil {
return
}
rst.regionTree.remove(region)
}

func (rst *regionSubTree) length() int {
if rst == nil {
return 0
}
return rst.regionTree.length()
}

func (rst *regionSubTree) RandomRegion(startKey, endKey []byte) *RegionInfo {
if rst == nil || rst.regionTree.length() == 0 {
return nil
}

// The tree only contains one region.
if rst.regionTree.length() == 1 {
return rst.tree.Min().(*regionItem).region
}

t := rst.regionTree
var startIndex, endIndex, index int
var startRegion, endRegion *RegionInfo

if len(startKey) != 0 {
startRegion, startIndex = t.getWithIndex(&regionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: startKey}}})
} else {
startRegion, startIndex = t.getWithIndex(t.tree.Min())
}

if len(endKey) != 0 {
endRegion, endIndex = t.getWithIndex(&regionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: endKey}}})
} else {
_, endIndex = t.getWithIndex(t.tree.Max())
endRegion = nil
endIndex++
}

if endIndex == startIndex {
if endRegion == nil {
return t.tree.GetAt(startIndex - 1).(*regionItem).region
}
return t.tree.GetAt(startIndex).(*regionItem).region
}

if endRegion == nil && startRegion == nil {
index = rand.Intn(endIndex-startIndex+1) + startIndex
return t.tree.GetAt(index - 1).(*regionItem).region
}

index = rand.Intn(endIndex-startIndex) + startIndex
return t.tree.GetAt(index).(*regionItem).region
}

// RegionsInfo for export
type RegionsInfo struct {
tree *regionTree
regions *regionMap // regionID -> regionInfo
leaders map[uint64]*regionMap // storeID -> regionID -> regionInfo
followers map[uint64]*regionMap // storeID -> regionID -> regionInfo
learners map[uint64]*regionMap // storeID -> regionID -> regionInfo
pendingPeers map[uint64]*regionMap // storeID -> regionID -> regionInfo
regions *regionMap // regionID -> regionInfo
leaders map[uint64]*regionSubTree // storeID -> regionID -> regionInfo
followers map[uint64]*regionSubTree // storeID -> regionID -> regionInfo
learners map[uint64]*regionSubTree // storeID -> regionID -> regionInfo
pendingPeers map[uint64]*regionSubTree // storeID -> regionID -> regionInfo
}

// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
func NewRegionsInfo() *RegionsInfo {
return &RegionsInfo{
tree: newRegionTree(),
regions: newRegionMap(),
leaders: make(map[uint64]*regionMap),
followers: make(map[uint64]*regionMap),
learners: make(map[uint64]*regionMap),
pendingPeers: make(map[uint64]*regionMap),
leaders: make(map[uint64]*regionSubTree),
followers: make(map[uint64]*regionSubTree),
learners: make(map[uint64]*regionSubTree),
pendingPeers: make(map[uint64]*regionSubTree),
}
}

Expand Down Expand Up @@ -531,18 +624,18 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo {
// Add leader peer to leaders.
store, ok := r.leaders[storeID]
if !ok {
store = newRegionMap()
store = newRegionSubTree()
r.leaders[storeID] = store
}
store.Put(region)
store.update(region)
} else {
// Add follower peer to followers.
store, ok := r.followers[storeID]
if !ok {
store = newRegionMap()
store = newRegionSubTree()
r.followers[storeID] = store
}
store.Put(region)
store.update(region)
}
}

Expand All @@ -551,20 +644,20 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo {
storeID := peer.GetStoreId()
store, ok := r.learners[storeID]
if !ok {
store = newRegionMap()
store = newRegionSubTree()
r.learners[storeID] = store
}
store.Put(region)
store.update(region)
}

for _, peer := range region.pendingPeers {
storeID := peer.GetStoreId()
store, ok := r.pendingPeers[storeID]
if !ok {
store = newRegionMap()
store = newRegionSubTree()
r.pendingPeers[storeID] = store
}
store.Put(region)
store.update(region)
}

return overlaps
Expand All @@ -578,10 +671,10 @@ func (r *RegionsInfo) RemoveRegion(region *RegionInfo) {
// Remove from leaders and followers.
for _, peer := range region.meta.GetPeers() {
storeID := peer.GetStoreId()
r.leaders[storeID].Delete(region.GetID())
r.followers[storeID].Delete(region.GetID())
r.learners[storeID].Delete(region.GetID())
r.pendingPeers[storeID].Delete(region.GetID())
r.leaders[storeID].remove(region)
r.followers[storeID].remove(region)
r.learners[storeID].remove(region)
r.pendingPeers[storeID].remove(region)
}
}

Expand Down Expand Up @@ -616,13 +709,13 @@ func (r *RegionsInfo) GetRegions() []*RegionInfo {
func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo {
regions := make([]*RegionInfo, 0, r.GetStoreLeaderCount(storeID)+r.GetStoreFollowerCount(storeID))
if leaders, ok := r.leaders[storeID]; ok {
for _, region := range leaders.m {
regions = append(regions, region.RegionInfo)
for _, region := range leaders.scanRanges() {
regions = append(regions, region)
}
}
if followers, ok := r.followers[storeID]; ok {
for _, region := range followers.m {
regions = append(regions, region.RegionInfo)
for _, region := range followers.scanRanges() {
regions = append(regions, region)
}
}
return regions
Expand Down Expand Up @@ -669,27 +762,27 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {

// GetStorePendingPeerCount gets the total count of a store's region that includes pending peer
func (r *RegionsInfo) GetStorePendingPeerCount(storeID uint64) int {
return r.pendingPeers[storeID].Len()
return r.pendingPeers[storeID].length()
}

// GetStoreLeaderCount get the total count of a store's leader RegionInfo
func (r *RegionsInfo) GetStoreLeaderCount(storeID uint64) int {
return r.leaders[storeID].Len()
return r.leaders[storeID].length()
}

// GetStoreFollowerCount get the total count of a store's follower RegionInfo
func (r *RegionsInfo) GetStoreFollowerCount(storeID uint64) int {
return r.followers[storeID].Len()
return r.followers[storeID].length()
}

// GetStoreLearnerCount get the total count of a store's learner RegionInfo
func (r *RegionsInfo) GetStoreLearnerCount(storeID uint64) int {
return r.learners[storeID].Len()
return r.learners[storeID].length()
}

// RandRegion get a region by random
func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.regions, opts...)
return randRegion(r.tree, opts...)
}

// RandPendingRegion randomly gets a store's region with a pending peer.
Expand All @@ -708,13 +801,13 @@ func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *
}

// GetLeader return leader RegionInfo by storeID and regionID(now only used in test)
func (r *RegionsInfo) GetLeader(storeID uint64, regionID uint64) *RegionInfo {
return r.leaders[storeID].Get(regionID)
func (r *RegionsInfo) GetLeader(storeID uint64, region *RegionInfo) *RegionInfo {
return r.leaders[storeID].find(region).region
}

// GetFollower return follower RegionInfo by storeID and regionID(now only used in test)
func (r *RegionsInfo) GetFollower(storeID uint64, regionID uint64) *RegionInfo {
return r.followers[storeID].Get(regionID)
func (r *RegionsInfo) GetFollower(storeID uint64, region *RegionInfo) *RegionInfo {
return r.followers[storeID].find(region).region
}

// ScanRange scans regions intersecting [start key, end key), returns at most
Expand Down Expand Up @@ -764,9 +857,14 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 {

const randomRegionMaxRetry = 10

func randRegion(regions *regionMap, opts ...RegionOption) *RegionInfo {
// RegionsContainer is a container to store regions.
type RegionsContainer interface {
RandomRegion(startKey, endKey []byte) *RegionInfo
}

func randRegion(regions RegionsContainer, opts ...RegionOption) *RegionInfo {
for i := 0; i < randomRegionMaxRetry; i++ {
region := regions.RandomRegion()
region := regions.RandomRegion(nil, nil)
if region == nil {
return nil
}
Expand Down
54 changes: 53 additions & 1 deletion server/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ package core

import (
"bytes"
"math/rand"

"github.com/google/btree"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/btree"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -192,3 +193,54 @@ func (t *regionTree) getAdjacentRegions(region *RegionInfo) (*regionItem, *regio
})
return prev, next
}

func (t *regionTree) RandomRegion(startKey, endKey []byte) *RegionInfo {
if t.length() == 0 {
return nil
}

// The tree only contains one region.
if t.length() == 1 {
return t.tree.Min().(*regionItem).region
}

var startIndex, endIndex, index int
var startRegion, endRegion *RegionInfo

if len(startKey) != 0 {
startRegion, startIndex = t.getWithIndex(&regionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: startKey}}})
} else {
startRegion, startIndex = t.getWithIndex(t.tree.Min())
}

if len(endKey) != 0 {
endRegion, endIndex = t.getWithIndex(&regionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: endKey}}})
} else {
_, endIndex = t.getWithIndex(t.tree.Max())
endRegion = nil
endIndex++
}

if endIndex == startIndex {
if endRegion == nil {
return t.tree.GetAt(startIndex - 1).(*regionItem).region
}
return t.tree.GetAt(startIndex).(*regionItem).region
}

if endRegion == nil && startRegion == nil {
index = rand.Intn(endIndex-startIndex+1) + startIndex
return t.tree.GetAt(index - 1).(*regionItem).region
}

index = rand.Intn(endIndex-startIndex) + startIndex
return t.tree.GetAt(index).(*regionItem).region
}

func (t *regionTree) getWithIndex(item btree.Item) (*RegionInfo, int) {
item, index := t.tree.GetWithIndex(item)
if item != nil {
return item.(*regionItem).region, index
}
return nil, index
}
Loading

0 comments on commit 51c1215

Please sign in to comment.