From 559000c10a6fef7a84ab870f89ec601d5ce38c2a Mon Sep 17 00:00:00 2001 From: ShuNing Date: Sun, 19 May 2019 15:16:47 +0800 Subject: [PATCH 1/4] schedule: actively push operator (#1536) * schedule: actively push operator Signed-off-by: nolouch --- server/cluster_worker.go | 2 +- server/coordinator.go | 107 ++++++++++++++++++++++++++++++++++--- server/coordinator_test.go | 8 +++ server/operator_queue.go | 52 ++++++++++++++++++ 4 files changed, 161 insertions(+), 8 deletions(-) create mode 100644 server/operator_queue.go diff --git a/server/cluster_worker.go b/server/cluster_worker.go index 1d381d711f5..75246696d28 100644 --- a/server/cluster_worker.go +++ b/server/cluster_worker.go @@ -37,7 +37,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return errors.Errorf("invalid region, zero region peer count: %v", core.RegionToHexMeta(region.GetMeta())) } - c.coordinator.dispatch(region) + c.coordinator.dispatch(region, DispatchFromHeartBeat) return nil } diff --git a/server/coordinator.go b/server/coordinator.go index 8cd15dfe306..2d5ff46f46a 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -14,6 +14,7 @@ package server import ( + "container/heap" "container/list" "context" "fmt" @@ -44,6 +45,14 @@ const ( hotRegionScheduleName = "balance-hot-region-scheduler" patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions. + + DispatchFromHeartBeat = "heartbeat" + DispatchFromNotifierQueue = "active push" + DispatchFromCreate = "create" + slowNotifyInterval = 5 * time.Second + fastNotifyInterval = 2 * time.Second + // PushOperatorTickInterval is the interval try to push the operator. 
+ PushOperatorTickInterval = 500 * time.Millisecond ) var ( @@ -70,6 +79,7 @@ type coordinator struct { histories *list.List hbStreams *heartbeatStreams opRecords *OperatorRecords + opNotifierQueue operatorQueue } func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifier namespace.Classifier) *coordinator { @@ -89,16 +99,17 @@ func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifie histories: list.New(), hbStreams: hbStreams, opRecords: NewOperatorRecords(), + opNotifierQueue: make(operatorQueue, 0), } } -func (c *coordinator) dispatch(region *core.RegionInfo) { +func (c *coordinator) dispatch(region *core.RegionInfo, source string) { // Check existed operator. if op := c.getOperator(region.GetID()); op != nil { timeout := op.IsTimeout() if step := op.Check(region); step != nil && !timeout { operatorCounter.WithLabelValues(op.Desc(), "check").Inc() - c.sendScheduleCommand(region, step) + c.sendScheduleCommand(region, step, source) return } if op.IsFinish() { @@ -164,6 +175,25 @@ func (c *coordinator) patrolRegions() { } } +// drivePushOperator is used to push the unfinished operator to the executor. +func (c *coordinator) drivePushOperator() { + defer logutil.LogPanic() + + defer c.wg.Done() + log.Info("coordinator begins to actively drive push operator") + ticker := time.NewTicker(PushOperatorTickInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("drive push operator has been stopped") + return + case <-ticker.C: + c.PushOperators() + } + } +} + func (c *coordinator) checkRegion(region *core.RegionInfo) bool { // If PD has restarted, it need to check learners added before and promote them. // Don't check isRaftLearnerEnabled cause it may be disable learner feature but still some learners to promote. @@ -258,8 +288,10 @@ func (c *coordinator) run() { log.Error("cannot persist schedule config", zap.Error(err)) } - c.wg.Add(1) + c.wg.Add(2) + // Starts to patrol regions. 
go c.patrolRegions() + go c.drivePushOperator() } func (c *coordinator) stop() { @@ -465,12 +497,14 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool { c.operators[regionID] = op c.limiter.UpdateCounts(c.operators) + var step schedule.OperatorStep if region := c.cluster.GetRegion(op.RegionID()); region != nil { - if step := op.Check(region); step != nil { - c.sendScheduleCommand(region, step) + if step = op.Check(region); step != nil { + c.sendScheduleCommand(region, step, DispatchFromCreate) } } + heap.Push(&c.opNotifierQueue, &operatorWithTime{op, c.getNextPushOperatorTime(step, time.Now())}) operatorCounter.WithLabelValues(op.Desc(), "create").Inc() return true } @@ -578,8 +612,8 @@ func (c *coordinator) getHistory(start time.Time) []schedule.OperatorHistory { return histories } -func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep) { - log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step)) +func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep, source string) { + log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source)) switch s := step.(type) { case schedule.TransferLeader: cmd := &pdpb.RegionHeartbeatResponse{ @@ -674,6 +708,65 @@ func (c *coordinator) GetOperatorStatus(id uint64) *OperatorWithStatus { return c.opRecords.Get(id) } +func (oc *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now time.Time) time.Time { + nextTime := slowNotifyInterval + switch step.(type) { + case schedule.TransferLeader, schedule.PromoteLearner: + nextTime = fastNotifyInterval + } + return now.Add(nextTime) +} + +// pollNeedDispatchRegion returns the region need to dispatch, +// "next" is true to indicate that it may exist in next attempt, +// and false is the end for the poll. 
+func (oc *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { + oc.Lock() + defer oc.Unlock() + if oc.opNotifierQueue.Len() == 0 { + return nil, false + } + item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime) + regionID := item.op.RegionID() + op, ok := oc.operators[regionID] + if !ok || op == nil { + return nil, true + } + r = oc.cluster.GetRegion(regionID) + if r == nil { + return nil, true + } + step := op.Check(r) + if step == nil { + return nil, true + } + now := time.Now() + if now.Before(item.time) { + heap.Push(&oc.opNotifierQueue, item) + return nil, false + } + + // pushes with new notify time. + item.time = oc.getNextPushOperatorTime(step, now) + heap.Push(&oc.opNotifierQueue, item) + return r, true +} + +// PushOperators periodically pushes the unfinished operator to the executor(TiKV). +func (oc *coordinator) PushOperators() { + for { + r, next := oc.pollNeedDispatchRegion() + if !next { + break + } + if r == nil { + continue + } + + oc.dispatch(r, DispatchFromNotifierQueue) + } +} + type scheduleController struct { schedule.Scheduler cluster *clusterInfo diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 5aec1fafb80..405a3315dab 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -243,8 +243,16 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) { co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream) +<<<<<<< HEAD co.cluster.putRegion(region.Clone()) co.dispatch(region) +======= + if err := co.cluster.putRegion(region.Clone()); err != nil { + return err + } + co.opController.Dispatch(region, schedule.DispatchFromHeartBeat) + return nil +>>>>>>> b6150ca1... 
schedule: actively push operator (#1536) } func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { diff --git a/server/operator_queue.go b/server/operator_queue.go new file mode 100644 index 00000000000..1c1fd552c96 --- /dev/null +++ b/server/operator_queue.go @@ -0,0 +1,52 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "github.com/pingcap/pd/server/schedule" + "time" +) + +type operatorWithTime struct { + op *schedule.Operator + time time.Time +} + +type operatorQueue []*operatorWithTime + +func (opn operatorQueue) Len() int { return len(opn) } + +func (opn operatorQueue) Less(i, j int) bool { + return opn[i].time.Before(opn[j].time) +} + +func (opn operatorQueue) Swap(i, j int) { + opn[i], opn[j] = opn[j], opn[i] +} + +func (opn *operatorQueue) Push(x interface{}) { + item := x.(*operatorWithTime) + *opn = append(*opn, item) +} + +func (opn *operatorQueue) Pop() interface{} { + old := *opn + n := len(old) + if n == 0 { + return nil + } + item := old[n-1] + *opn = old[0 : n-1] + return item +} From 6bd5fb2e50792f756dc0b0efc2a91d12e1ea6d3b Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 16 Aug 2019 11:51:31 +0800 Subject: [PATCH 2/4] fix test Signed-off-by: nolouch --- server/coordinator_test.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 405a3315dab..14dede5050d 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -243,16 +243,8 @@ func (s 
*testCoordinatorSuite) TestDispatch(c *C) { func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) { co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream) -<<<<<<< HEAD co.cluster.putRegion(region.Clone()) - co.dispatch(region) -======= - if err := co.cluster.putRegion(region.Clone()); err != nil { - return err - } - co.opController.Dispatch(region, schedule.DispatchFromHeartBeat) - return nil ->>>>>>> b6150ca1... schedule: actively push operator (#1536) + co.dispatch(region, DispatchFromHeartBeat) } func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { @@ -850,11 +842,11 @@ func (s *testScheduleControllerSuite) TestOperatorStatus(c *C) { region := tc.GetRegion(1) co.addOperator(op) c.Assert(co.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING) - co.dispatch(region) + co.dispatch(region, DispatchFromCreate) c.Assert(co.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING) // apply the operator region = region.Clone(core.WithRemoveStorePeer(2)) - co.dispatch(region) + co.dispatch(region, DispatchFromCreate) c.Assert(co.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_SUCCESS) } From acc1bcd537d8b9dd6ac5c4eef9be2a76f39ead08 Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 16 Aug 2019 12:06:32 +0800 Subject: [PATCH 3/4] fix test Signed-off-by: nolouch --- server/coordinator.go | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/server/coordinator.go b/server/coordinator.go index 2d5ff46f46a..b3f69e7c177 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -34,6 +34,7 @@ import ( "go.uber.org/zap" ) +// const ( runSchedulerCheckInterval = 3 * time.Second collectFactor = 0.8 @@ -46,13 +47,17 @@ const ( patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions. 
+ slowNotifyInterval = 5 * time.Second + fastNotifyInterval = 2 * time.Second + // PushOperatorTickInterval is the interval try to push the operator. + PushOperatorTickInterval = 500 * time.Millisecond +) + +// The source of dispatched region. +const ( DispatchFromHeartBeat = "heartbeat" DispatchFromNotifierQueue = "active push" DispatchFromCreate = "create" - slowNotifyInterval = 5 * time.Second - fastNotifyInterval = 2 * time.Second - // PushOperatorTickInterval is the interval try to push the operator. - PushOperatorTickInterval = 500 * time.Millisecond ) var ( @@ -708,7 +713,7 @@ func (c *coordinator) GetOperatorStatus(id uint64) *OperatorWithStatus { return c.opRecords.Get(id) } -func (oc *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now time.Time) time.Time { +func (c *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now time.Time) time.Time { nextTime := slowNotifyInterval switch step.(type) { case schedule.TransferLeader, schedule.PromoteLearner: @@ -720,19 +725,19 @@ func (oc *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now t // pollNeedDispatchRegion returns the region need to dispatch, // "next" is true to indicate that it may exist in next attempt, // and false is the end for the poll. 
-func (oc *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { - oc.Lock() - defer oc.Unlock() - if oc.opNotifierQueue.Len() == 0 { +func (c *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { + c.Lock() + defer c.Unlock() + if c.opNotifierQueue.Len() == 0 { return nil, false } - item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime) + item := heap.Pop(&c.opNotifierQueue).(*operatorWithTime) regionID := item.op.RegionID() - op, ok := oc.operators[regionID] + op, ok := c.operators[regionID] if !ok || op == nil { return nil, true } - r = oc.cluster.GetRegion(regionID) + r = c.cluster.GetRegion(regionID) if r == nil { return nil, true } @@ -742,20 +747,20 @@ func (oc *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) } now := time.Now() if now.Before(item.time) { - heap.Push(&oc.opNotifierQueue, item) + heap.Push(&c.opNotifierQueue, item) return nil, false } // pushes with new notify time. - item.time = oc.getNextPushOperatorTime(step, now) - heap.Push(&oc.opNotifierQueue, item) + item.time = c.getNextPushOperatorTime(step, now) + heap.Push(&c.opNotifierQueue, item) return r, true } // PushOperators periodically pushes the unfinished operator to the executor(TiKV). 
-func (oc *coordinator) PushOperators() { +func (c *coordinator) PushOperators() { for { - r, next := oc.pollNeedDispatchRegion() + r, next := c.pollNeedDispatchRegion() if !next { break } @@ -763,7 +768,7 @@ func (oc *coordinator) PushOperators() { continue } - oc.dispatch(r, DispatchFromNotifierQueue) + c.dispatch(r, DispatchFromNotifierQueue) } } From 0f05fe11e67b1da1498ca50e2a64e30162470c89 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 19 Aug 2019 15:45:21 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: nolouch --- server/operator_queue.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/operator_queue.go b/server/operator_queue.go index 1c1fd552c96..4d5811864a6 100644 --- a/server/operator_queue.go +++ b/server/operator_queue.go @@ -14,8 +14,9 @@ package server import ( - "github.com/pingcap/pd/server/schedule" "time" + + "github.com/pingcap/pd/server/schedule" ) type operatorWithTime struct {