Skip to content

Commit

Permalink
schedule: scatter regions to stores with the same engine (#2531) (#2706)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
ti-srebot authored Aug 3, 2020
1 parent 4520e3e commit 238b9c1
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 172 deletions.
17 changes: 15 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderID uint64, followerIds
return region
}

// AddRegionWithLearner adds region with specified leader, followers and learners.
func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderID uint64, followerIDs, learnerIDs []uint64) *core.RegionInfo {
origin := mc.MockRegionInfo(regionID, leaderID, followerIDs, learnerIDs, nil)
region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10))
mc.PutRegion(region)
return region
}

// AddLeaderRegionWithRange adds region with specified leader, followers and key range.
func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) {
o := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
Expand Down Expand Up @@ -508,7 +516,7 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
}

func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIDs ...uint64) *core.RegionInfo {
return mc.MockRegionInfo(regionID, leaderID, followerIDs, nil)
return mc.MockRegionInfo(regionID, leaderID, followerIDs, []uint64{}, nil)
}

// GetOpt mocks method.
Expand Down Expand Up @@ -588,7 +596,7 @@ func (mc *Cluster) RemoveScheduler(name string) error {

// MockRegionInfo returns a mock region
func (mc *Cluster) MockRegionInfo(regionID uint64, leaderID uint64,
followerIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
followerIDs, learnerIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {

region := &metapb.Region{
Id: regionID,
Expand All @@ -602,5 +610,10 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderID uint64,
peer, _ := mc.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
for _, id := range learnerIDs {
peer, _ := mc.AllocPeer(id)
peer.IsLearner = true
region.Peers = append(region.Peers, peer)
}
return core.NewRegionInfo(region, leader)
}
43 changes: 32 additions & 11 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,19 +598,11 @@ type engineFilter struct {
constraint placement.LabelConstraint
}

// NewEngineFilter creates a filter that filters out default engine stores.
// By default, all stores that are not marked with a special engine will be filtered out.
// Specify the special engine label if you want to include the special stores.
func NewEngineFilter(scope string, allowEngines ...string) Filter {
var values []string
for _, v := range allSpeicalEngines {
if slice.NoneOf(allowEngines, func(i int) bool { return allowEngines[i] == v }) {
values = append(values, v)
}
}
// NewEngineFilter creates a filter that only keeps allowedEngines.
func NewEngineFilter(scope string, allowedEngines ...string) Filter {
return &engineFilter{
scope: scope,
constraint: placement.LabelConstraint{Key: "engine", Op: "notIn", Values: values},
constraint: placement.LabelConstraint{Key: "engine", Op: "in", Values: allowedEngines},
}
}

Expand All @@ -630,6 +622,35 @@ func (f *engineFilter) Target(opt opt.Options, store *core.StoreInfo) bool {
return f.constraint.MatchStore(store)
}

type ordinaryEngineFilter struct {
scope string
constraint placement.LabelConstraint
}

// NewOrdinaryEngineFilter creates a filter that only keeps ordinary engine stores.
func NewOrdinaryEngineFilter(scope string) Filter {
return &ordinaryEngineFilter{
scope: scope,
constraint: placement.LabelConstraint{Key: "engine", Op: "notIn", Values: allSpeicalEngines},
}
}

func (f *ordinaryEngineFilter) Scope() string {
return f.scope
}

func (f *ordinaryEngineFilter) Type() string {
return "ordinary-engine-filter"
}

func (f *ordinaryEngineFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
return f.constraint.MatchStore(store)
}

func (f *ordinaryEngineFilter) Target(opt opt.Options, store *core.StoreInfo) bool {
return f.constraint.MatchStore(store)
}

type specialUseFilter struct {
scope string
constraint placement.LabelConstraint
Expand Down
6 changes: 4 additions & 2 deletions server/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ func isRegionMatch(a, b *core.RegionInfo) bool {
func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer) (*Operator, error) {
// randomly pick a leader.
var ids []uint64
for id := range targetPeers {
ids = append(ids, id)
for id, peer := range targetPeers {
if !peer.IsLearner {
ids = append(ids, id)
}
}
var leader uint64
if len(ids) > 0 {
Expand Down
8 changes: 4 additions & 4 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,15 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) {
c.Assert(len(stream.MsgCh()), Equals, 1)

// report the result of transferring leader
region := cluster.MockRegionInfo(1, 2, []uint64{1, 2},
region := cluster.MockRegionInfo(1, 2, []uint64{1, 2}, []uint64{},
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 2)

// report the result of removing peer
region = cluster.MockRegionInfo(1, 2, []uint64{2},
region = cluster.MockRegionInfo(1, 2, []uint64{2}, []uint64{},
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
Expand All @@ -446,7 +446,7 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) {
c.Assert(len(stream.MsgCh()), Equals, 3)

// report region with an abnormal confver
region = cluster.MockRegionInfo(1, 1, []uint64{1, 2},
region = cluster.MockRegionInfo(1, 1, []uint64{1, 2}, []uint64{},
&metapb.RegionEpoch{ConfVer: 1, Version: 0})
controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(region), Equals, 0)
Expand All @@ -464,7 +464,7 @@ func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) {
// so the two peers are {peerid: 1, storeid: 1}, {peerid: 2, storeid: 2}
// The peer on store 1 is the leader
epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0}
region := cluster.MockRegionInfo(1, 1, []uint64{2}, epoch)
region := cluster.MockRegionInfo(1, 1, []uint64{2}, []uint64{}, epoch)
// Put region into cluster, otherwise, AddOperator will fail because of
// missing region
cluster.PutRegion(region)
Expand Down
98 changes: 67 additions & 31 deletions server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,32 @@ func (s *selectedStores) newFilter(scope string) filter.Filter {

// RegionScatterer scatters regions.
type RegionScatterer struct {
name string
cluster opt.Cluster
filters []filter.Filter
selected *selectedStores
name string
cluster opt.Cluster
ordinaryEngine engineContext
specialEngines map[string]engineContext
}

// NewRegionScatterer creates a region scatterer.
// RegionScatter is used for the `Lightning`, it will scatter the specified regions before import data.
func NewRegionScatterer(cluster opt.Cluster) *RegionScatterer {
return &RegionScatterer{
name: regionScatterName,
cluster: cluster,
filters: []filter.Filter{
filter.StoreStateFilter{ActionScope: regionScatterName},
filter.NewEngineFilter(regionScatterName),
},
name: regionScatterName,
cluster: cluster,
ordinaryEngine: newEngineContext(filter.NewOrdinaryEngineFilter(regionScatterName)),
specialEngines: make(map[string]engineContext),
}
}

type engineContext struct {
filters []filter.Filter
selected *selectedStores
}

func newEngineContext(filters ...filter.Filter) engineContext {
filters = append(filters, filter.StoreStateFilter{ActionScope: regionScatterName})
return engineContext{
filters: filters,
selected: newSelectedStores(),
}
}
Expand All @@ -102,30 +112,56 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*operator.Operator,
}

func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *operator.Operator {
stores := r.collectAvailableStores(region)
targetPeers := make(map[uint64]*metapb.Peer)
ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
var ordinaryPeers []*metapb.Peer
specialPeers := make(map[string][]*metapb.Peer)
// Group peers by the engine of their stores
for _, peer := range region.GetPeers() {
if len(stores) == 0 {
// Reset selected stores if we have no available stores.
r.selected.reset()
stores = r.collectAvailableStores(region)
store := r.cluster.GetStore(peer.GetStoreId())
if ordinaryFilter.Target(r.cluster, store) {
ordinaryPeers = append(ordinaryPeers, peer)
} else {
engine := store.GetLabelValue(filter.EngineKey)
specialPeers[engine] = append(specialPeers[engine], peer)
}
}

if r.selected.put(peer.GetStoreId()) {
delete(stores, peer.GetStoreId())
targetPeers[peer.GetStoreId()] = peer
continue
targetPeers := make(map[uint64]*metapb.Peer)

scatterWithSameEngine := func(peers []*metapb.Peer, context engineContext) {
stores := r.collectAvailableStores(region, context)
for _, peer := range peers {
if len(stores) == 0 {
context.selected.reset()
stores = r.collectAvailableStores(region, context)
}
if context.selected.put(peer.GetStoreId()) {
delete(stores, peer.GetStoreId())
targetPeers[peer.GetStoreId()] = peer
continue
}
newPeer := r.selectPeerToReplace(stores, region, peer)
if newPeer == nil {
targetPeers[peer.GetStoreId()] = peer
continue
}
// Remove it from stores and mark it as selected.
delete(stores, newPeer.GetStoreId())
context.selected.put(newPeer.GetStoreId())
targetPeers[newPeer.GetStoreId()] = newPeer
}
newPeer := r.selectPeerToReplace(stores, region, peer)
if newPeer == nil {
targetPeers[peer.GetStoreId()] = peer
continue
}

scatterWithSameEngine(ordinaryPeers, r.ordinaryEngine)
for engine, peers := range specialPeers {
context, ok := r.specialEngines[engine]
if !ok {
context = newEngineContext(filter.NewEngineFilter(r.name, engine))
r.specialEngines[engine] = context
}
// Remove it from stores and mark it as selected.
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())
targetPeers[newPeer.GetStoreId()] = newPeer
scatterWithSameEngine(peers, context)
}

op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers)
if err != nil {
log.Debug("fail to create scatter region operator", zap.Error(err))
Expand Down Expand Up @@ -169,12 +205,12 @@ func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo,
}
}

func (r *RegionScatterer) collectAvailableStores(region *core.RegionInfo) map[uint64]*core.StoreInfo {
func (r *RegionScatterer) collectAvailableStores(region *core.RegionInfo, context engineContext) map[uint64]*core.StoreInfo {
filters := []filter.Filter{
r.selected.newFilter(r.name),
context.selected.newFilter(r.name),
filter.NewExcludedFilter(r.name, nil, region.GetStoreIds()),
}
filters = append(filters, r.filters...)
filters = append(filters, context.filters...)

stores := r.cluster.GetStores()
targets := make(map[uint64]*core.StoreInfo, len(stores))
Expand Down
Loading

0 comments on commit 238b9c1

Please sign in to comment.