Pipelined restore. (pingcap#266)
* restore: add pipelined CreateTable.

* restore: add pipelined ValidateFileRanges.

* restore: pipelining restore process.

* restore, task: use batching when pipelining.

* restore: batcher splits by range (instead of by table).

* restore,task: new way of polling errCh.

We use select instead of for range, so we can still report an error when the context is cancelled.
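
A minimal sketch of the idea (illustrative only, not the exact task-package code), assuming an errCh that the restore goroutines write to, a ctx from the caller, and the standard "context" package imported:

func pollErrCh(ctx context.Context, errCh <-chan error) error {
	for {
		select {
		case err, ok := <-errCh:
			if !ok {
				return nil // channel closed: all senders finished cleanly
			}
			if err != nil {
				return err
			}
		case <-ctx.Done():
			// with `for err := range errCh` we would block here forever;
			// select lets us report the cancellation instead.
			return ctx.Err()
		}
	}
}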

* restore, task: pipelining checksum.

* restore, task: cancel parallel DDL request.

* restore: restore will now send batches periodically.

* restore: refactor batcher.

* restore: add tests on batcher.

* restore, task: make linter happy.

* *: add dep to multierr.

* task: adjust to new function sig.

* task, restore: close updateCh only after all tasks finish.

* task, restore: pipelined restore supports partitions.

* backup: always wait for the worker to finish.

* backup, task: skip checksum when needed.

* *: make linter happy.

* restore: move batcher test to restore_test package.

* Apply suggestions from code review

Co-authored-by: kennytm <[email protected]>

* restore, task: remove context on struct types.

* restore: batcher auto commit can be disabled now.

* restore, task: fix typos.

* recover: fix a bug about removing tiflash.

* restore: MapTableToFiles issues an Error log when the key range doesn't match.

* *: merge master.

* restore: fix test to match new change of master.

* Apply suggestions from code review

* restore: merge two progresses.

* restore: fix a bug.

that is, when a table is too big or the batch size is too low,
we would fail to restore the head part of that table.

* restore: extract batcher to another file

* task: don't return immediately when files are empty.

* restore,task: do some refactoring

We move `splitPrepareWork` into a struct named `ContextManager`,
so that we can set placement rules in batches during online restore.
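
For readers of the commit message, a rough sketch of the contract ContextManager appears to provide, inferred from how the batcher calls it in pkg/restore/batcher.go below; the authoritative definition lives elsewhere in this PR and may differ:

// Inferred shape, not the real declaration.
type ContextManager interface {
	// Enter prepares the given tables for restore, e.g. switching them into
	// 'restore' mode and setting placement rules in batches.
	Enter(ctx context.Context, tables []CreatedTable) error
	// Leave undoes that preparation once the tables are fully restored.
	Leave(ctx context.Context, tables []CreatedTable) error
}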

* restore: fix an embarrassing bug... :|

* task,restore: panic on file broken

* restore: record tiflash count to disk when removed

* restore,task: simplify some code.

* task,restore: fix a bug.

The bug caused checksum to sometimes fail
when a single table was split into multiple batches.

* restore: some refactoring and fixes

1. give the batcher worker two send styles
2. add functions for debugging tables and ranges
3. rewrite a test case to adapt to the new batcher

* tests: try to fix CI

* tests: try to fix CI, again

* Apply suggestions from code review

Co-authored-by: 3pointer <[email protected]>

* restore: change some log levels

* restore: merge joiner of sendWorker into messagebox

... and some small changes:
- don't send a new sending request if one is already pending.
- move the log describing how a batch is sent down to debug level.

* restore,task: run RemoveRestoreLabels at restore post work

* task: adapt the remove-tiflash flag

* restore,task: fetch new placement rules each time

* Apply suggestions from code review

Co-authored-by: kennytm <[email protected]>

* restore,task: always run Leave, and adjust some log levels

* restore: fix a bug that may cause an incorrect checksum time

* restore: don't Leave if never Enter

Co-authored-by: kennytm <[email protected]>
Co-authored-by: 3pointer <[email protected]>
Co-authored-by: 3pointer <[email protected]>
4 people authored and Hillium committed Jun 16, 2020
1 parent b03f422 commit cfa7527
Showing 12 changed files with 1,571 additions and 275 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -10,3 +10,4 @@ coverage.txt
docker/data/
docker/logs/
*.swp
.DS_Store
353 changes: 353 additions & 0 deletions pkg/restore/batcher.go
@@ -0,0 +1,353 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
)

// SendType is the 'type' of a send.
// When we issue a 'send' command to the worker, we may want to flush all pending ranges (when auto commit is enabled),
// or we may just want to drain the overflowing ranges (when a table has just been added to the batcher).
type SendType int

const (
// SendUntilLessThanBatch will make the batcher send batches until
// its remaining ranges are fewer than its batchSizeThreshold.
SendUntilLessThanBatch SendType = iota
// SendAll will make the batcher send all pending ranges.
SendAll
// SendAllThenClose will make the batcher send all pending ranges and then close itself.
SendAllThenClose
)

// Batcher collects ranges to restore and sends batched split/ingest requests.
type Batcher struct {
cachedTables []TableWithRange
cachedTablesMu *sync.Mutex
rewriteRules *RewriteRules

// autoCommitJoiner is for joining the background batch sender.
autoCommitJoiner chan<- struct{}
// everythingIsDone is for waiting until the worker is done: that is, after we send a
// signal to autoCommitJoiner, we must give it enough time to get things done.
// Then, it notifies us through this WaitGroup.
// We use a WaitGroup instead of a plain channel to leave room for further extension.
everythingIsDone *sync.WaitGroup
// sendErr is for reporting errors.
sendErr chan<- error
// sendCh is for communicating with sendWorker.
sendCh chan<- SendType
// outCh is for emitting the restored tables, so they can be sent downstream for work such as checksum.
outCh chan<- CreatedTable

sender BatchSender
manager ContextManager
batchSizeThreshold int
size int32
}

// Len calculates the current size of this batcher.
func (b *Batcher) Len() int {
return int(atomic.LoadInt32(&b.size))
}

// NewBatcher creates a new batcher from a sender and a context manager.
// The sender defines how to 'restore' a batch (i.e. where to send, or 'push down', the task).
// The context manager defines the 'lifetime' of restoring tables (i.e. how to enter 'restore' mode, and how to exit it).
// This batcher works in the background, sending batches either periodically (when auto commit is enabled) or when the batch size reaches the limit,
// and it emits fully restored tables to the returned output channel.
func NewBatcher(
ctx context.Context,
sender BatchSender,
manager ContextManager,
errCh chan<- error,
) (*Batcher, <-chan CreatedTable) {
output := make(chan CreatedTable, defaultBatcherOutputChannelSize)
sendChan := make(chan SendType, 2)
b := &Batcher{
rewriteRules: EmptyRewriteRule(),
sendErr: errCh,
outCh: output,
sender: sender,
manager: manager,
sendCh: sendChan,
cachedTablesMu: new(sync.Mutex),
everythingIsDone: new(sync.WaitGroup),
batchSizeThreshold: 1,
}
b.everythingIsDone.Add(1)
go b.sendWorker(ctx, sendChan)
return b, output
}

// EnableAutoCommit makes the batcher commit batches periodically even if the batcher size isn't big enough.
// We keep this as a separate method so that auto commit can stay disabled in some cases.
func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) {
if b.autoCommitJoiner != nil {
// IMO, running two auto commit goroutines wouldn't be a good idea.
// If desired (e.g. to change the period of auto commit), please disable auto commit first.
log.L().DPanic("enabling auto commit on a batcher that auto commit has been enabled, which isn't allowed")
}
joiner := make(chan struct{})
go b.autoCommitWorker(ctx, joiner, delay)
b.autoCommitJoiner = joiner
}

// DisableAutoCommit blocks the current goroutine until the worker can gracefully stop,
// and then disables auto commit.
func (b *Batcher) DisableAutoCommit() {
b.joinWorker()
b.autoCommitJoiner = nil
}

func (b *Batcher) waitUntilSendDone() {
b.sendCh <- SendAllThenClose
b.everythingIsDone.Wait()
}

// joinWorker blocks the current goroutine until the worker can gracefully stop.
// It returns immediately when auto commit is disabled.
func (b *Batcher) joinWorker() {
if b.autoCommitJoiner != nil {
log.Debug("gracefully stopping worker goroutine")
b.autoCommitJoiner <- struct{}{}
close(b.autoCommitJoiner)
log.Debug("gracefully stopped worker goroutine")
}
}

// sendWorker is the 'worker' that sends all ranges to TiKV.
func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
sendUntil := func(lessOrEqual int) {
for b.Len() > lessOrEqual {
tbls, err := b.Send(ctx)
if err != nil {
b.sendErr <- err
return
}
for _, t := range tbls {
b.outCh <- t
}
}
}

for sendType := range send {
switch sendType {
case SendUntilLessThanBatch:
sendUntil(b.batchSizeThreshold)
case SendAll:
sendUntil(0)
case SendAllThenClose:
sendUntil(0)
b.everythingIsDone.Done()
return
}
}
}

func (b *Batcher) autoCommitWorker(ctx context.Context, joiner <-chan struct{}, delay time.Duration) {
tick := time.NewTicker(delay)
defer tick.Stop()
for {
select {
case <-joiner:
log.Debug("graceful stop signal received")
return
case <-ctx.Done():
b.sendErr <- ctx.Err()
return
case <-tick.C:
if b.Len() > 0 {
log.Debug("sending batch because time limit exceed", zap.Int("size", b.Len()))
b.asyncSend(SendAll)
}
}
}
}

func (b *Batcher) asyncSend(t SendType) {
// add a check here so we won't issue duplicate send requests.
if len(b.sendCh) == 0 {
b.sendCh <- t
}
}

type drainResult struct {
// TablesToSend are tables that will be sent in this batch.
TablesToSend []CreatedTable
// BlankTablesAfterSend are tables that will be fully restored after this batch is sent.
BlankTablesAfterSend []CreatedTable
RewriteRules *RewriteRules
Ranges []rtree.Range
}

func newDrainResult() drainResult {
return drainResult{
TablesToSend: make([]CreatedTable, 0),
BlankTablesAfterSend: make([]CreatedTable, 0),
RewriteRules: EmptyRewriteRule(),
Ranges: make([]rtree.Range, 0),
}
}

// drainRanges 'drains' ranges from the current tables.
// For example, let a '-' character be a range, and assume we have:
// |---|-----|-------|
// |t1 |t2 |t3 |
// After we run drainRanges() with batchSizeThreshold = 6, let '*' be the ranges that will be sent in this batch:
// |***|***--|-------|
// |t1 |t2 |t3 |
//
// drainRanges() will return:
// TablesToSend: [t1, t2] (so we can make them enter restore mode)
// BlankTablesAfterSend: [t1] (so we can make them leave restore mode after restoring this batch)
// RewriteRules: rewrite rules for [t1, t2] (so we can restore them)
// Ranges: those starred ranges (so we can restore them)
//
// Then, it leaves the batcher's cachedTables like this:
// |--|-------|
// |t2|t3 |
// As you can see, all ranges to be restored in this batch have been removed.
func (b *Batcher) drainRanges() drainResult {
result := newDrainResult()

b.cachedTablesMu.Lock()
defer b.cachedTablesMu.Unlock()

for offset, thisTable := range b.cachedTables {
thisTableLen := len(thisTable.Range)
collected := len(result.Ranges)

result.RewriteRules.Append(*thisTable.RewriteRule)
result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable)

// the batch is full, we should stop here!
// we use strictly greater than because if we sent the batch when the sizes were exactly equal, the offset would need to be increased by one
// (since the last table would be fully sent, we would also need to record it as blank), and this would introduce extra complexity.
if thisTableLen+collected > b.batchSizeThreshold {
drainSize := b.batchSizeThreshold - collected
thisTableRanges := thisTable.Range

var drained []rtree.Range
drained, b.cachedTables[offset].Range = thisTableRanges[:drainSize], thisTableRanges[drainSize:]
log.Debug("draining partial table to batch",
zap.Stringer("db", thisTable.OldTable.Db.Name),
zap.Stringer("table", thisTable.Table.Name),
zap.Int("size", thisTableLen),
zap.Int("drained", drainSize),
)
result.Ranges = append(result.Ranges, drained...)
b.cachedTables = b.cachedTables[offset:]
atomic.AddInt32(&b.size, -int32(len(drained)))
return result
}

result.BlankTablesAfterSend = append(result.BlankTablesAfterSend, thisTable.CreatedTable)
// let's 'drain' the ranges of the current table. This op must not make the batch full.
result.Ranges = append(result.Ranges, thisTable.Range...)
atomic.AddInt32(&b.size, -int32(len(thisTable.Range)))
// clear the table length.
b.cachedTables[offset].Range = []rtree.Range{}
log.Debug("draining table to batch",
zap.Stringer("db", thisTable.OldTable.Db.Name),
zap.Stringer("table", thisTable.Table.Name),
zap.Int("size", thisTableLen),
)
}

// all tables are drained.
b.cachedTables = []TableWithRange{}
return result
}

// Send sends all pending requests in the batcher.
// It returns the tables that were sent FULLY in the current batch.
func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) {
drainResult := b.drainRanges()
tbs := drainResult.TablesToSend
ranges := drainResult.Ranges

log.Info("restore batch start",
append(
ZapRanges(ranges),
ZapTables(tbs),
)...,
)

if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil {
return nil, err
}
defer func() {
if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil {
log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode",
append(
ZapRanges(ranges),
ZapTables(tbs),
zap.Error(err))...,
)
}
if len(drainResult.BlankTablesAfterSend) > 0 {
log.Debug("table fully restored",
ZapTables(drainResult.BlankTablesAfterSend),
zap.Int("ranges", len(ranges)),
)
}
}()

if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil {
return nil, err
}
return drainResult.BlankTablesAfterSend, nil
}

func (b *Batcher) sendIfFull() {
if b.Len() >= b.batchSizeThreshold {
log.Debug("sending batch because batcher is full", zap.Int("size", b.Len()))
b.asyncSend(SendUntilLessThanBatch)
}
}

// Add adds a task to the Batcher.
func (b *Batcher) Add(tbs TableWithRange) {
b.cachedTablesMu.Lock()
log.Debug("adding table to batch",
zap.Stringer("db", tbs.OldTable.Db.Name),
zap.Stringer("table", tbs.Table.Name),
zap.Int64("old id", tbs.OldTable.Info.ID),
zap.Int64("new id", tbs.Table.ID),
zap.Int("table size", len(tbs.Range)),
zap.Int("batch size", b.Len()),
)
b.cachedTables = append(b.cachedTables, tbs)
b.rewriteRules.Append(*tbs.RewriteRule)
atomic.AddInt32(&b.size, int32(len(tbs.Range)))
b.cachedTablesMu.Unlock()

b.sendIfFull()
}

// Close closes the batcher, sending all pending requests and closing the output channel.
func (b *Batcher) Close() {
log.Info("sending batch lastly on close", zap.Int("size", b.Len()))
b.DisableAutoCommit()
b.waitUntilSendDone()
close(b.outCh)
close(b.sendCh)
b.sender.Close()
}

// SetThreshold sets the batch size threshold at which the batcher sends a batch.
// Note that this function isn't goroutine safe yet,
// so please set the threshold before anything starts (e.g. before EnableAutoCommit).
func (b *Batcher) SetThreshold(newThreshold int) {
b.batchSizeThreshold = newThreshold
}
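
To make the intended call pattern of the new file easier to follow, here is a hedged usage sketch built only from the exported API above; the sender, manager, and table list are assumed to come from the surrounding task code, and the names below are illustrative rather than the real pkg/task wiring:

// runPipelinedRestore is a hypothetical driver, assuming it lives next to the
// batcher (package restore) and that "context" and "time" are imported.
func runPipelinedRestore(
	ctx context.Context,
	sender BatchSender,
	manager ContextManager,
	tables []TableWithRange,
) error {
	errCh := make(chan error, 8)
	batcher, outCh := NewBatcher(ctx, sender, manager, errCh)
	batcher.SetThreshold(128)                  // illustrative threshold, in ranges
	batcher.EnableAutoCommit(ctx, time.Second) // flush small leftovers periodically

	go func() {
		for _, tbl := range tables {
			batcher.Add(tbl)
		}
		// Close sends everything still pending and closes outCh.
		batcher.Close()
	}()

	for {
		select {
		case tbl, ok := <-outCh:
			if !ok {
				return nil // outCh closed: every table was fully restored
			}
			// A fully restored table arrives here; checksum would start at this point.
			_ = tbl
		case err := <-errCh:
			return err
		}
	}
}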