Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: actively push operator (#1536) #1686

Merged
merged 4 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return errors.Errorf("invalid region, zero region peer count: %v", core.RegionToHexMeta(region.GetMeta()))
}

c.coordinator.dispatch(region)
c.coordinator.dispatch(region, DispatchFromHeartBeat)
return nil
}

Expand Down
112 changes: 105 additions & 7 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"container/heap"
"container/list"
"context"
"fmt"
Expand All @@ -33,6 +34,7 @@ import (
"go.uber.org/zap"
)

//
const (
runSchedulerCheckInterval = 3 * time.Second
collectFactor = 0.8
Expand All @@ -44,6 +46,18 @@ const (
hotRegionScheduleName = "balance-hot-region-scheduler"

patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.

slowNotifyInterval = 5 * time.Second
fastNotifyInterval = 2 * time.Second
// PushOperatorTickInterval is the interval try to push the operator.
PushOperatorTickInterval = 500 * time.Millisecond
)

// The source of dispatched region.
const (
DispatchFromHeartBeat = "heartbeat"
DispatchFromNotifierQueue = "active push"
DispatchFromCreate = "create"
)

var (
Expand All @@ -70,6 +84,7 @@ type coordinator struct {
histories *list.List
hbStreams *heartbeatStreams
opRecords *OperatorRecords
opNotifierQueue operatorQueue
}

func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifier namespace.Classifier) *coordinator {
Expand All @@ -89,16 +104,17 @@ func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifie
histories: list.New(),
hbStreams: hbStreams,
opRecords: NewOperatorRecords(),
opNotifierQueue: make(operatorQueue, 0),
}
}

func (c *coordinator) dispatch(region *core.RegionInfo) {
func (c *coordinator) dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := c.getOperator(region.GetID()); op != nil {
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
c.sendScheduleCommand(region, step)
c.sendScheduleCommand(region, step, source)
return
}
if op.IsFinish() {
Expand Down Expand Up @@ -164,6 +180,25 @@ func (c *coordinator) patrolRegions() {
}
}

// drivePushOperator is used to push the unfinished operator to the excutor.
func (c *coordinator) drivePushOperator() {
defer logutil.LogPanic()

defer c.wg.Done()
log.Info("coordinator begins to actively drive push operator")
ticker := time.NewTicker(PushOperatorTickInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("drive push operator has been stopped")
return
case <-ticker.C:
c.PushOperators()
}
}
}

func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
// If PD has restarted, it need to check learners added before and promote them.
// Don't check isRaftLearnerEnabled cause it may be disable learner feature but still some learners to promote.
Expand Down Expand Up @@ -258,8 +293,10 @@ func (c *coordinator) run() {
log.Error("cannot persist schedule config", zap.Error(err))
}

c.wg.Add(1)
c.wg.Add(2)
// Starts to patrol regions.
go c.patrolRegions()
go c.drivePushOperator()
}

func (c *coordinator) stop() {
Expand Down Expand Up @@ -465,12 +502,14 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
c.operators[regionID] = op
c.limiter.UpdateCounts(c.operators)

var step schedule.OperatorStep
if region := c.cluster.GetRegion(op.RegionID()); region != nil {
if step := op.Check(region); step != nil {
c.sendScheduleCommand(region, step)
if step = op.Check(region); step != nil {
c.sendScheduleCommand(region, step, DispatchFromCreate)
}
}

heap.Push(&c.opNotifierQueue, &operatorWithTime{op, c.getNextPushOperatorTime(step, time.Now())})
operatorCounter.WithLabelValues(op.Desc(), "create").Inc()
return true
}
Expand Down Expand Up @@ -578,8 +617,8 @@ func (c *coordinator) getHistory(start time.Time) []schedule.OperatorHistory {
return histories
}

func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step))
func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep, source string) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source))
switch s := step.(type) {
case schedule.TransferLeader:
cmd := &pdpb.RegionHeartbeatResponse{
Expand Down Expand Up @@ -674,6 +713,65 @@ func (c *coordinator) GetOperatorStatus(id uint64) *OperatorWithStatus {
return c.opRecords.Get(id)
}

func (c *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now time.Time) time.Time {
nextTime := slowNotifyInterval
switch step.(type) {
case schedule.TransferLeader, schedule.PromoteLearner:
nextTime = fastNotifyInterval
}
return now.Add(nextTime)
}

// pollNeedDispatchRegion returns the region need to dispatch,
// "next" is true to indicate that it may exist in next attempt,
// and false is the end for the poll.
func (c *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
c.Lock()
defer c.Unlock()
if c.opNotifierQueue.Len() == 0 {
return nil, false
}
item := heap.Pop(&c.opNotifierQueue).(*operatorWithTime)
regionID := item.op.RegionID()
op, ok := c.operators[regionID]
if !ok || op == nil {
return nil, true
}
r = c.cluster.GetRegion(regionID)
if r == nil {
return nil, true
}
step := op.Check(r)
if step == nil {
return nil, true
}
now := time.Now()
if now.Before(item.time) {
heap.Push(&c.opNotifierQueue, item)
return nil, false
}

// pushes with new notify time.
item.time = c.getNextPushOperatorTime(step, now)
heap.Push(&c.opNotifierQueue, item)
return r, true
}

// PushOperators periodically pushes the unfinished operator to the executor(TiKV).
func (c *coordinator) PushOperators() {
for {
r, next := c.pollNeedDispatchRegion()
if !next {
break
}
if r == nil {
continue
}

c.dispatch(r, DispatchFromNotifierQueue)
}
}

type scheduleController struct {
schedule.Scheduler
cluster *clusterInfo
Expand Down
6 changes: 3 additions & 3 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) {
co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream)
co.cluster.putRegion(region.Clone())
co.dispatch(region)
co.dispatch(region, DispatchFromHeartBeat)
}

func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
Expand Down Expand Up @@ -842,11 +842,11 @@ func (s *testScheduleControllerSuite) TestOperatorStatus(c *C) {
region := tc.GetRegion(1)
co.addOperator(op)
c.Assert(co.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING)
co.dispatch(region)
co.dispatch(region, DispatchFromCreate)
c.Assert(co.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING)
// apply the operator
region = region.Clone(core.WithRemoveStorePeer(2))
co.dispatch(region)
co.dispatch(region, DispatchFromCreate)
c.Assert(co.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}

Expand Down
52 changes: 52 additions & 0 deletions server/operator_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"github.com/pingcap/pd/server/schedule"
"time"
nolouch marked this conversation as resolved.
Show resolved Hide resolved
)

type operatorWithTime struct {
op *schedule.Operator
time time.Time
}

type operatorQueue []*operatorWithTime

func (opn operatorQueue) Len() int { return len(opn) }

func (opn operatorQueue) Less(i, j int) bool {
return opn[i].time.Before(opn[j].time)
}

func (opn operatorQueue) Swap(i, j int) {
opn[i], opn[j] = opn[j], opn[i]
}

func (opn *operatorQueue) Push(x interface{}) {
item := x.(*operatorWithTime)
*opn = append(*opn, item)
}

func (opn *operatorQueue) Pop() interface{} {
old := *opn
n := len(old)
if n == 0 {
return nil
}
item := old[n-1]
*opn = old[0 : n-1]
return item
}