Skip to content

Commit

Permalink
reduce get overlap operations
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 30, 2022
1 parent 5d34111 commit 5633494
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 51 deletions.
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
origin, err := c.core.PreCheckPutRegion(region)
origin, _, err := c.core.PreCheckPutRegion(region)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1678,7 +1678,7 @@ func Test(t *testing.T) {
re.Nil(cache.GetRegionByKey(regionKey))
checkRegions(re, cache, regions[0:i])

cache.SetRegion(region)
cache.SetRegion(region, false)
checkRegion(re, cache.GetRegion(i), region)
checkRegion(re, cache.GetRegionByKey(regionKey), region)
checkRegions(re, cache, regions[0:(i+1)])
Expand All @@ -1691,7 +1691,7 @@ func Test(t *testing.T) {
// Update leader to peer np-1.
newRegion := region.Clone(core.WithLeader(region.GetPeers()[np-1]))
regions[i] = newRegion
cache.SetRegion(newRegion)
cache.SetRegion(newRegion, false)
checkRegion(re, cache.GetRegion(i), newRegion)
checkRegion(re, cache.GetRegionByKey(regionKey), newRegion)
checkRegions(re, cache, regions[0:(i+1)])
Expand All @@ -1704,7 +1704,7 @@ func Test(t *testing.T) {
// Reset leader to peer 0.
newRegion = region.Clone(core.WithLeader(region.GetPeers()[0]))
regions[i] = newRegion
cache.SetRegion(newRegion)
cache.SetRegion(newRegion, false)
checkRegion(re, cache.GetRegion(i), newRegion)
checkRegions(re, cache, regions[0:(i+1)])
checkRegion(re, cache.GetRegionByKey(regionKey), newRegion)
Expand All @@ -1725,7 +1725,7 @@ func Test(t *testing.T) {
// check overlaps
// clone it otherwise there are two items with the same key in the tree
overlapRegion := regions[n-1].Clone(core.WithStartKey(regions[n-2].GetStartKey()))
cache.SetRegion(overlapRegion)
cache.SetRegion(overlapRegion, false)
re.Nil(cache.GetRegion(n - 2))
re.NotNil(cache.GetRegion(n - 1))

Expand All @@ -1734,7 +1734,7 @@ func Test(t *testing.T) {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := filter.SelectOneRegion(tc.RandLeaderRegions(i, []core.KeyRange{core.NewKeyRange("", "")}), nil, pendingFilter, downFilter)
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
cache.SetRegion(newRegion, false)
}
re.Nil(filter.SelectOneRegion(tc.RandLeaderRegions(i, []core.KeyRange{core.NewKeyRange("", "")}), nil, pendingFilter, downFilter))
}
Expand Down
22 changes: 11 additions & 11 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,22 +430,22 @@ func (bc *BasicCluster) getRelevantRegionsLocked(region *RegionInfo) (origin *Re
/* Regions write operations */

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) {
bc.Regions.mu.RLock()
origin, overlaps := bc.getRelevantRegionsLocked(region)
bc.Regions.mu.RUnlock()
return bc.check(region, origin, overlaps)
}

func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo) (*RegionInfo, error) {
func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo) (*RegionInfo, []*RegionInfo, error) {
for _, item := range overlaps {
// PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation.
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() {
return nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
return nil, nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
if origin == nil {
return nil, nil
return nil, nil, nil
}

r := region.GetRegionEpoch()
Expand All @@ -454,40 +454,40 @@ func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
// Region meta is stale, return an error.
if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !region.isRegionRecreated() {
return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta())
return origin, nil, errRegionIsStale(region.GetMeta(), origin.GetMeta())
}

return origin, nil
return origin, overlaps, nil
}

// CheckAndPutRegion checks if the region is valid to put, if valid then put.
func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
origin, err := bc.PreCheckPutRegion(region)
origin, overlaps, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
return []*RegionInfo{region}
}
return bc.PutRegion(region)
return bc.Regions.SetRegion(region, true, overlaps...)
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
bc.Regions.mu.Lock()
defer bc.Regions.mu.Unlock()
origin, overlaps := bc.getRelevantRegionsLocked(region)
_, err := bc.check(region, origin, overlaps)
_, _, err := bc.check(region, origin, overlaps)
if err != nil {
return nil, err
}
return bc.Regions.SetRegion(region), nil
return bc.Regions.SetRegion(region, false), nil
}

// PutRegion put a region.
func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo {
bc.Regions.mu.Lock()
defer bc.Regions.mu.Unlock()
return bc.Regions.SetRegion(region)
return bc.Regions.SetRegion(region, false)
}

// RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists.
Expand Down
20 changes: 17 additions & 3 deletions server/core/rangetree/range_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func NewRangeTree(degree int, factory DebrisFactory) *RangeTree {
}
}

// Factory is the factory that generates some debris when updating items.
func (r *RangeTree) Factory(startKey, endKey []byte, old RangeItem) []RangeItem {
return r.factory(startKey, endKey, old)
}

// Update insert the item and delete overlaps.
func (r *RangeTree) Update(item RangeItem) []RangeItem {
overlaps := r.GetOverlaps(item)
Expand All @@ -52,16 +57,25 @@ func (r *RangeTree) Update(item RangeItem) []RangeItem {
children := r.factory(item.GetStartKey(), item.GetEndKey(), old)
for _, child := range children {
if c := bytes.Compare(child.GetStartKey(), child.GetEndKey()); c < 0 {
r.tree.ReplaceOrInsert(child)
r.ReplaceOrInsert(child)
} else if c > 0 && len(child.GetEndKey()) == 0 {
r.tree.ReplaceOrInsert(child)
r.ReplaceOrInsert(child)
}
}
}
r.tree.ReplaceOrInsert(item)
r.ReplaceOrInsert(item)
return overlaps
}

// ReplaceOrInsert adds the given item to the tree. If an item in the tree
// already equals the given one, it is removed from the tree and returned.
// Otherwise, nil is returned.
//
// nil cannot be added to the tree (will panic).
func (r *RangeTree) ReplaceOrInsert(item RangeItem) {
r.tree.ReplaceOrInsert(item)
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (r *RangeTree) GetOverlaps(item RangeItem) []RangeItem {
// note that Find() gets the last item that is less or equal than the item.
Expand Down
13 changes: 10 additions & 3 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (r *RegionsInfo) GetRegion(regionID uint64) *RegionInfo {

// SetRegion sets the RegionInfo to regionTree and regionMap, also update leaders and followers by region peers
// overlaps: Other regions that overlap with the specified region, excluding itself.
func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) {
func (r *RegionsInfo) SetRegion(region *RegionInfo, passCheck bool, ol ...*RegionInfo) (overlaps []*RegionInfo) {
var item *regionItem // Pointer to the *RegionInfo of this ID.
rangeChanged := true // This Region is new, or its range has changed.
if item = r.regions[region.GetID()]; item != nil {
Expand Down Expand Up @@ -764,8 +764,15 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) {
}

if rangeChanged {
// It has been removed and all information needs to be updated again.
overlaps = r.tree.update(item)
if passCheck {
if len(ol) != 0 {
overlaps = r.tree.updateWithOverlaps(item, ol)
} else {
r.tree.tree.ReplaceOrInsert(item)
}
} else {
overlaps = r.tree.update(item)
}
for _, old := range overlaps {
r.RemoveRegion(r.GetRegion(old.GetID()))
}
Expand Down
16 changes: 8 additions & 8 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func TestSetRegion(t *testing.T) {
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
}, peer1)
regions.SetRegion(region)
regions.SetRegion(region, false)
}

peer1 := &metapb.Peer{StoreId: uint64(4), Id: uint64(101)}
Expand All @@ -479,12 +479,12 @@ func TestSetRegion(t *testing.T) {
EndKey: []byte(fmt.Sprintf("%20d", 211)),
}, peer1)
region.pendingPeers = append(region.pendingPeers, peer3)
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(97, regions.tree.length())
re.Len(regions.GetRegions(), 97)

regions.SetRegion(region)
regions.SetRegion(region, false)
peer1 = &metapb.Peer{StoreId: uint64(2), Id: uint64(101)}
peer2 = &metapb.Peer{StoreId: uint64(3), Id: uint64(102), Role: metapb.PeerRole_Learner}
peer3 = &metapb.Peer{StoreId: uint64(1), Id: uint64(103)}
Expand All @@ -495,7 +495,7 @@ func TestSetRegion(t *testing.T) {
EndKey: []byte(fmt.Sprintf("%20d", 212)),
}, peer1)
region.pendingPeers = append(region.pendingPeers, peer3)
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(97, regions.tree.length())
re.Len(regions.GetRegions(), 97)
Expand All @@ -504,7 +504,7 @@ func TestSetRegion(t *testing.T) {
region = region.Clone(WithStartKey([]byte(fmt.Sprintf("%20d", 175))), WithNewRegionID(201))
re.NotNil(regions.GetRegion(21))
re.NotNil(regions.GetRegion(18))
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(96, regions.tree.length())
re.Len(regions.GetRegions(), 96)
Expand All @@ -519,7 +519,7 @@ func TestSetRegion(t *testing.T) {
SetWrittenBytes(40),
SetWrittenKeys(10),
SetReportInterval(0, 5))
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(96, regions.tree.length())
re.Len(regions.GetRegions(), 96)
Expand Down Expand Up @@ -630,7 +630,7 @@ func BenchmarkRandomRegion(b *testing.B) {
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
regions.SetRegion(region)
regions.SetRegion(region, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -683,6 +683,6 @@ func BenchmarkAddRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
regions.SetRegion(items[i])
regions.SetRegion(items[i], false)
}
}
30 changes: 30 additions & 0 deletions server/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,36 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo {
return result
}

// updateWithOverlaps updates the tree with the region with pre obtained overlaps.
func (t *regionTree) updateWithOverlaps(item *regionItem, overlaps []*RegionInfo) []*RegionInfo {
region := item.RegionInfo
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate

for _, old := range overlaps {
o := &regionItem{old}
t.tree.Delete(o)
}
t.tree.ReplaceOrInsert(item)
result := make([]*RegionInfo, len(overlaps))
for i, overlap := range overlaps {
old := overlap
result[i] = old
log.Debug("overlapping region",
zap.Uint64("region-id", old.GetID()),
logutil.ZapRedactStringer("delete-region", RegionToHexMeta(old.GetMeta())),
logutil.ZapRedactStringer("update-region", RegionToHexMeta(region.GetMeta())))
t.totalSize -= old.approximateSize
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
}

return result
}

// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
t.totalSize += region.approximateSize
Expand Down
2 changes: 1 addition & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
}

origin, err := bc.PreCheckPutRegion(region)
origin, _, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type RangeCluster struct {
func GenRangeCluster(cluster Cluster, startKey, endKey []byte) *RangeCluster {
subCluster := core.NewBasicCluster()
for _, r := range cluster.ScanRegions(startKey, endKey, -1) {
subCluster.Regions.SetRegion(r)
subCluster.Regions.SetRegion(r, false)
}
return &RangeCluster{
Cluster: cluster,
Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func TestScatterRangeBalance(t *testing.T) {
core.SetApproximateKeys(1),
core.SetApproximateSize(1),
)
tc.Regions.SetRegion(regionInfo)
tc.Regions.SetRegion(regionInfo, false)
}
for i := 0; i < 100; i++ {
_, err := tc.AllocPeer(1)
Expand Down Expand Up @@ -1371,7 +1371,7 @@ func TestBalanceLeaderLimit(t *testing.T) {
core.SetApproximateSize(96),
)

tc.Regions.SetRegion(regionInfo)
tc.Regions.SetRegion(regionInfo, false)
}

for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -1481,7 +1481,7 @@ func TestBalanceWhenRegionNotHeartbeat(t *testing.T) {
core.SetApproximateSize(96),
)

tc.Regions.SetRegion(regionInfo)
tc.Regions.SetRegion(regionInfo, false)
}

for i := 1; i <= 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestRejectLeader(t *testing.T) {
break
}
}
tc.Regions.SetRegion(region)
tc.Regions.SetRegion(region, false)
ops, _ = sl.Schedule(tc, false)
testutil.CheckTransferLeader(re, ops[0], operator.OpLeader, 1, 2)
}
Expand Down
Loading

0 comments on commit 5633494

Please sign in to comment.