Skip to content

Commit

Permalink
log-backup: Refactor daemon (#36763)
Browse files Browse the repository at this point in the history
close #36762
  • Loading branch information
YuJuncen authored Sep 23, 2022
1 parent 15b63e5 commit 4e1860b
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 79 deletions.
17 changes: 0 additions & 17 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -461,22 +460,6 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
return nil
}

// OnTick advances the inner logic clock for the advancer.
// It is synchronous: it only returns after all events triggered by the clock have been handled.
// A panic raised while ticking is recovered here, logged together with its value and
// the stack, and converted into the returned error, so callers generally do not need
// to recover by themselves.
func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
	defer c.recordTimeCost("tick")()
	defer func() {
		if e := recover(); e != nil {
			// At this point `err` has not been assigned yet (the panic interrupted the tick),
			// so log the recovered value itself instead of the still-nil error.
			log.Error("panic during handing tick", zap.Stack("stack"), zap.Any("panic", e))
			// The recovered value can be of any type, hence %v rather than %s.
			err = errors.Annotatef(berrors.ErrUnknown, "panic during handling tick: %v", e)
		}
	}()
	err = c.tick(ctx)
	return
}

func (c *CheckpointAdvancer) onConsistencyCheckTick(s *updateSmallTree) error {
if s.consistencyCheckTick > 0 {
s.consistencyCheckTick--
Expand Down
82 changes: 25 additions & 57 deletions br/pkg/streamhelper/advancer_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,48 @@ package streamhelper

import (
"context"
"time"

"github.com/google/uuid"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

const (
// ownerPrompt is passed to owner.NewOwnerManager by OwnerManagerForLogBackup
// (presumably as the prompt labelling the log backup owner manager; verify against the owner package).
ownerPrompt = "log-backup"
// ownerPath is passed to owner.NewOwnerManager by OwnerManagerForLogBackup
// (presumably the etcd key used for the owner election; verify against the owner package).
ownerPath = "/tidb/br-stream/owner"
)

// AdvancerDaemon is a "high-availability" version of advancer.
// It involves the manager for electing an owner; only the owner does the actual work.
// You can embed it into your code by simply call:
//
// ad := NewAdvancerDaemon(adv, mgr)
// loop, err := ad.Begin(ctx)
//
// if err != nil {
// return err
// }
//
// loop()
type AdvancerDaemon struct {
adv *CheckpointAdvancer
manager owner.Manager
// OnTick drives one step of the advancer's inner logic clock.
// The call is synchronous: it returns only after everything triggered by this tick has finished.
// The address of the returned error is handed to utils.PanicToErr in a deferred call,
// so callers generally do not need to recover from panics by themselves.
func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
	// Start measuring right away; the returned closure records the cost on exit.
	finishTiming := c.recordTimeCost("tick")
	defer finishTiming()
	defer utils.PanicToErr(&err)
	err = c.tick(ctx)
	return err
}

func NewAdvancerDaemon(adv *CheckpointAdvancer, manager owner.Manager) *AdvancerDaemon {
return &AdvancerDaemon{
adv: adv,
manager: manager,
}
// OnStart implements daemon.Interface.
// It marks this advancer as the owner in the metrics, starts the task listener with ctx,
// and spawns a goroutine that calls onStop as soon as ctx is done.
func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
	metrics.AdvancerOwner.Set(1.0)
	c.StartTaskListener(ctx)

	done := ctx.Done()
	go func() {
		// Block until the context handed to OnStart is canceled.
		<-done
		c.onStop()
	}()
}

func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
id := uuid.New()
return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, id.String(), ownerPath)
// Name implements daemon.Interface.
// It returns the fixed name of the checkpoint advancer daemon.
func (c *CheckpointAdvancer) Name() string {
	const daemonName = "LogBackup::Advancer"
	return daemonName
}

// Begin starts the daemon.
// It would do some bootstrap task, and return a closure that would begin the main loop.
func (ad *AdvancerDaemon) Begin(ctx context.Context) (func(), error) {
log.Info("begin advancer daemon", zap.String("id", ad.manager.ID()))
if err := ad.manager.CampaignOwner(); err != nil {
return nil, err
}
// onStop sets the advancer-owner metric back to zero.
func (c *CheckpointAdvancer) onStop() {
	const notOwner = 0.0
	metrics.AdvancerOwner.Set(notOwner)
}

ad.adv.StartTaskListener(ctx)
tick := time.NewTicker(ad.adv.cfg.TickDuration)
loop := func() {
log.Info("begin advancer daemon loop", zap.String("id", ad.manager.ID()))
for {
select {
case <-ctx.Done():
log.Info("advancer loop exits", zap.String("id", ad.manager.ID()))
return
case <-tick.C:
log.Debug("deamon tick start", zap.Bool("is-owner", ad.manager.IsOwner()))
if ad.manager.IsOwner() {
metrics.AdvancerOwner.Set(1.0)
if err := ad.adv.OnTick(ctx); err != nil {
log.Warn("failed on tick", logutil.ShortError(err))
}
} else {
metrics.AdvancerOwner.Set(0.0)
}
}
}
}
return loop, nil
// OwnerManagerForLogBackup creates the owner manager used by log backup.
// Every call generates a fresh UUID and uses its string form as the manager ID.
func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
	managerID := uuid.New().String()
	return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, managerID, ownerPath)
}
17 changes: 17 additions & 0 deletions br/pkg/streamhelper/daemon/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package daemon

import "context"

// Interface describes the lifetime hooks of a daemon application.
type Interface interface {
// OnStart is called once the daemon becomes the owner.
// The context passed in is canceled once it is no longer the owner.
OnStart(ctx context.Context)
// OnTick is called periodically.
// The returned error can be recorded by the caller.
OnTick(ctx context.Context) error
// Name returns the name which is used for tracing the daemon.
Name() string
}
94 changes: 94 additions & 0 deletions br/pkg/streamhelper/daemon/owner_daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package daemon

import (
"context"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/owner"
"go.uber.org/zap"
)

// OwnerDaemon is a wrapper for running a "daemon" in the TiDB cluster.
// Generally, it uses the etcd election API (wrapped in the `owner.Manager` interface),
// and shares nothing between nodes.
// Please make sure the daemon is "stateless" (i.e. it doesn't depend on local storage or in-memory state).
// This struct is "synchronous": its fields carry no locking and are meant to be accessed without races.
type OwnerDaemon struct {
// daemon provides the hooks (OnStart, OnTick, Name) driven by this wrapper.
daemon Interface
// manager is used for campaigning and for checking the ownership.
manager owner.Manager
// tickInterval is the period of the ticker created in Begin.
tickInterval time.Duration

// When not `nil`, implies the daemon is running.
// Calling it cancels the context that was handed to the daemon's OnStart.
cancel context.CancelFunc
}

// New builds an owner daemon that drives `daemon`, uses `manager` for the
// ownership, and ticks every `tickInterval`. The returned daemon is not running yet.
func New(daemon Interface, manager owner.Manager, tickInterval time.Duration) *OwnerDaemon {
	od := new(OwnerDaemon)
	od.daemon = daemon
	od.manager = manager
	od.tickInterval = tickInterval
	return od
}

// Running reports whether the daemon is currently running,
// i.e. whether a cancel function for the current run is being held.
func (od *OwnerDaemon) Running() bool {
	idle := od.cancel == nil
	return !idle
}

// cancelRun stops the current run, if any: it cancels the context handed to
// the daemon's OnStart and marks the daemon as not running. It is a no-op otherwise.
func (od *OwnerDaemon) cancelRun() {
	if od.cancel == nil {
		// Not running, nothing to cancel.
		return
	}
	log.Info("cancel running daemon", zap.String("daemon", od.daemon.Name()))
	od.cancel()
	od.cancel = nil
}

// ownerTick handles one tick while this node is the owner.
// On the first such tick it starts the daemon with a cancelable child context
// (kept in od.cancel); then it always forwards the tick to the daemon and
// logs, rather than returns, any error from it.
func (od *OwnerDaemon) ownerTick(ctx context.Context) {
	if alreadyRunning := od.Running(); !alreadyRunning {
		// Just became the owner: switch to the running state.
		runCtx, stop := context.WithCancel(ctx)
		od.cancel = stop
		log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
		// Note: maybe save the context so we can cancel the tick when we are not owner?
		od.daemon.OnStart(runCtx)
	}

	// The tick itself happens regardless of whether the daemon was just started.
	err := od.daemon.OnTick(ctx)
	if err == nil {
		return
	}
	log.Warn("failed on tick", logutil.ShortError(err))
}

// Begin starts the daemon.
// It campaigns for the ownership through the manager and, on success, returns
// a closure that runs the main loop. The loop blocks until ctx is done; on
// every tick it either drives the daemon (when this node is the owner) or
// cancels the current run (when it is not).
// If campaigning fails, the error is returned and no closure is provided.
func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) {
	log.Info("begin advancer daemon", zap.String("daemon-id", od.daemon.Name()))
	if err := od.manager.CampaignOwner(); err != nil {
		return nil, err
	}

	tick := time.NewTicker(od.tickInterval)
	loop := func() {
		// Release the ticker once the loop exits; nobody reads from it afterwards.
		defer tick.Stop()
		log.Info("begin running daemon", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
		for {
			select {
			case <-ctx.Done():
				log.Info("daemon loop exits", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
				return
			case <-tick.C:
				// Query the ownership once per tick, so the logged value always
				// agrees with the branch that is actually taken.
				isOwner := od.manager.IsOwner()
				log.Debug("daemon tick start", zap.Bool("is-owner", isOwner), zap.String("daemon-id", od.daemon.Name()))
				if isOwner {
					od.ownerTick(ctx)
				} else {
					od.cancelRun()
				}
			}
		}
	}
	return loop, nil
}
144 changes: 144 additions & 0 deletions br/pkg/streamhelper/daemon/owner_daemon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package daemon_test

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper/daemon"
"github.com/pingcap/tidb/owner"
"github.com/stretchr/testify/require"
)

// anApp is a test daemon: it records its start/tick/stop transitions and
// exposes them through channels so that the test can wait for them.
// The embedded mutex guards all of the fields below.
type anApp struct {
sync.Mutex
// begun is set by OnStart and cleared once the context passed to OnStart is done.
begun bool

// tickingMessenger is closed by the first OnTick after a start.
tickingMessenger chan struct{}
// tickingMessengerOnce makes sure tickingMessenger is closed only once.
tickingMessengerOnce *sync.Once
// stopMessenger is closed after the context passed to OnStart is done.
stopMessenger chan struct{}
// startMessenger is closed at the end of OnStart.
startMessenger chan struct{}

// tCtx is the test through which failures are reported.
tCtx *testing.T
}

// newTestApp creates a not-yet-started test app that reports failures to t.
// Only the start messenger is prepared here; the others are created by OnStart.
func newTestApp(t *testing.T) *anApp {
	app := new(anApp)
	app.tCtx = t
	app.startMessenger = make(chan struct{})
	return app
}

// OnStart is called once the app becomes the owner.
// The context passed in is canceled once it is no longer the owner; a
// goroutine spawned here then resets the state and closes the stop messenger.
// Being started twice without a stop in between is reported as a test failure.
func (a *anApp) OnStart(ctx context.Context) {
	a.Lock()
	defer a.Unlock()
	if a.begun {
		// This hook is invoked by the daemon loop, which the test runs in its own
		// goroutine. Fatalf (FailNow) must only be called from the test goroutine,
		// so mark the test as failed with Errorf and bail out instead.
		a.tCtx.Errorf("failed: an app is started twice")
		return
	}
	a.begun = true
	a.tickingMessenger = make(chan struct{})
	a.tickingMessengerOnce = new(sync.Once)
	a.stopMessenger = make(chan struct{})
	go func() {
		<-ctx.Done()

		a.Lock()
		defer a.Unlock()

		a.begun = false
		a.tickingMessenger = nil
		// Prepare a fresh start messenger for the next start before announcing the stop.
		a.startMessenger = make(chan struct{})
		close(a.stopMessenger)
	}()
	close(a.startMessenger)
}

// OnTick is called periodically.
// The first tick after a start closes the ticking messenger. Ticking while the
// app is not started is reported as a test failure. It always returns nil.
func (a *anApp) OnTick(ctx context.Context) error {
	log.Info("tick")
	a.Lock()
	defer a.Unlock()
	if !a.begun {
		// This hook is invoked by the daemon loop, which the test runs in its own
		// goroutine. Fatal (FailNow) must only be called from the test goroutine,
		// so mark the test as failed with Error and bail out instead.
		a.tCtx.Error("failed: an app is ticking before start")
		return nil
	}
	a.tickingMessengerOnce.Do(func() {
		log.Info("close")
		close(a.tickingMessenger)
	})
	return nil
}

// Name returns the fixed name of the test app, used for tracing the daemon.
func (a *anApp) Name() string {
	const appName = "testing"
	return appName
}

// Running reports whether the app has been started and not yet stopped.
func (a *anApp) Running() bool {
	a.Lock()
	started := a.begun
	a.Unlock()

	return started
}

// AssertTick fails the test unless the app has ticked (since its latest start)
// within the timeout. It must be called from the test goroutine.
func (a *anApp) AssertTick(timeout time.Duration) {
	// Attribute a failure to the caller's line rather than to this helper.
	a.tCtx.Helper()
	// Take a snapshot of the channel under the lock; wait without holding it.
	a.Lock()
	messenger := a.tickingMessenger
	a.Unlock()
	log.Info("waiting")
	select {
	case <-messenger:
	case <-time.After(timeout):
		a.tCtx.Fatalf("tick not triggered after %s", timeout)
	}
}

// AssertNotRunning fails the test unless the app has been stopped within the
// timeout. It must be called from the test goroutine.
func (a *anApp) AssertNotRunning(timeout time.Duration) {
	// Attribute a failure to the caller's line rather than to this helper.
	a.tCtx.Helper()
	// Take a snapshot of the channel under the lock; wait without holding it.
	a.Lock()
	messenger := a.stopMessenger
	a.Unlock()
	select {
	case <-messenger:
	case <-time.After(timeout):
		a.tCtx.Fatalf("stop not triggered after %s", timeout)
	}
}

// AssertStart fails the test unless the app has been started within the
// timeout. It must be called from the test goroutine.
func (a *anApp) AssertStart(timeout time.Duration) {
	// Attribute a failure to the caller's line rather than to this helper.
	a.tCtx.Helper()
	// Take a snapshot of the channel under the lock; wait without holding it.
	a.Lock()
	messenger := a.startMessenger
	a.Unlock()
	select {
	case <-messenger:
	case <-time.After(timeout):
		a.tCtx.Fatalf("start not triggered after %s", timeout)
	}
}

// TestDaemon checks the life cycle of an owner daemon against a mock owner
// manager: the app starts and ticks while being the owner, stops after the
// ownership is retired, and starts ticking again once the ownership is regained.
func TestDaemon(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req := require.New(t)
	app := newTestApp(t)
	ow := owner.NewMockManager(ctx, "owner_daemon_test")
	d := daemon.New(app, ow, 100*time.Millisecond)

	f, err := d.Begin(ctx)
	req.NoError(err)
	// The daemon loop runs in the background until ctx is canceled on return.
	go f()
	app.AssertStart(1 * time.Second)
	app.AssertTick(1 * time.Second)
	ow.RetireOwner()
	req.False(ow.IsOwner())
	app.AssertNotRunning(1 * time.Second)
	// A failed campaign must fail the test right here instead of being silently dropped.
	req.NoError(ow.CampaignOwner())
	req.True(ow.IsOwner())
	app.AssertStart(1 * time.Second)
	app.AssertTick(1 * time.Second)
}
Loading

0 comments on commit 4e1860b

Please sign in to comment.