Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Pipelined restore. #266

Merged
merged 69 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
0b07b64
restore: add pipelined CreateTable.
Apr 26, 2020
061b669
restore: add pipelined ValidateFileRanges.
Apr 26, 2020
d711ded
restore: pipelining restore process.
Apr 27, 2020
f258d99
restore, task: use batching when pipelining.
Apr 27, 2020
cefb696
restore: batcher split by range(instead of table).
Apr 28, 2020
7846324
restore,task: new way to for polling errCh.
Apr 28, 2020
c78a24b
restore, task: pipelining checksum.
Apr 28, 2020
93b5942
restore, task: cancel parallel DDL request.
Apr 29, 2020
f3ec5ee
restore: restore will now send batch periodly.
Apr 29, 2020
bb31ec0
restore: refactor batcher.
May 8, 2020
d54b99c
restore: add tests on batcher.
May 8, 2020
4c4a3d8
restore, task: make linter happy.
May 8, 2020
3cffa5d
*: add dep to multierr.
May 8, 2020
965779a
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
May 8, 2020
59e33b0
task: adjust to new function sig.
May 8, 2020
44d52be
task, restore: close updateCh until all task finish.
May 9, 2020
a49e1f0
task, restore: pipelined restore supports parition.
May 9, 2020
84e267c
backup: always wait worker to finish.
May 9, 2020
7fab3c3
backup, task: skip checksum when needed.
May 9, 2020
39d2312
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
May 9, 2020
ac6f5be
*: make linter happy.
May 9, 2020
25a9a3a
restore: move batcher test to restore_test package.
May 11, 2020
7f8251e
Apply suggestions from code review
YuJuncen May 12, 2020
4bf507e
restore, task: remove context on struct types.
May 12, 2020
c45c772
restore: batcher auto commit can be disabled now.
May 12, 2020
afea125
restore, task: fix typos.
May 12, 2020
3c7530c
recover: fix a bug about removing tiflash.
May 12, 2020
e164c4f
restore: MapTableToFiles issues Error log when key range not match.
May 13, 2020
2af022a
*: merge master.
May 13, 2020
2bc34d9
restore: fix test to match new change of master.
May 13, 2020
cee0fc3
Merge branch 'master' into pipelined-restore
YuJuncen May 13, 2020
acbb2fa
Merge branch 'master' into pipelined-restore
YuJuncen May 14, 2020
b96de5b
Apply suggestions from code review
YuJuncen May 17, 2020
8c9bf99
Merge branch 'master' into pipelined-restore
YuJuncen May 18, 2020
deb8848
restore: merge two progresses.
May 22, 2020
879b553
Merge branch 'pipelined-restore' of https://github.com/YuJuncen/br in…
May 22, 2020
30d22b4
Merge branch 'master' into pipelined-restore
YuJuncen May 22, 2020
4060eec
restore: fix a bug.
May 25, 2020
1027d28
Merge branch 'pipelined-restore' of https://github.com/YuJuncen/br in…
May 25, 2020
c1d2064
restore: extract batcher to another file
May 25, 2020
557e4e9
Merge branch 'master' into pipelined-restore
YuJuncen May 29, 2020
b0dd355
task: don't return imediately when files is empty.
May 29, 2020
7bca7ad
Merge branch 'master' into pipelined-restore
YuJuncen Jun 2, 2020
b53526e
restore,task: do some refactor
Jun 2, 2020
dd6af60
restore: fix a shaming bug... :|
Jun 2, 2020
f758994
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
Jun 5, 2020
3303cef
task,restore: panic on file broken
Jun 5, 2020
a31e44c
restore: record tiflash count to disk when removed
Jun 5, 2020
88c8117
restore,task: simplify some code,
Jun 9, 2020
cc43d9d
task,restore: fix a bug.
Jun 9, 2020
312039d
restore: some factory and fix
Jun 9, 2020
bbdbecd
tests: try to fix CI
Jun 9, 2020
2c269dc
tests: try to fix CI, again
Jun 9, 2020
22cf64c
Merge branch 'master' into pipelined-restore
YuJuncen Jun 9, 2020
4705a55
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
Jun 12, 2020
55d22e4
Apply suggestions from code review
YuJuncen Jun 12, 2020
c94435d
restore: change some log levels
Jun 12, 2020
fe59bc1
restore: merge joiner of sendWorker into messagebox
Jun 12, 2020
4d67e54
restore,task: run RemoveRestoreLabels at restore post work
Jun 12, 2020
dc1d293
task: adapt the remove-tiflash flag
Jun 12, 2020
5ac8cfe
restore,task: fetch new placement rules each time
Jun 12, 2020
4cbbff0
Apply suggestions from code review
YuJuncen Jun 15, 2020
460331f
restore,task: run Leave always, and modify some log level
Jun 15, 2020
b450532
Merge branch 'master' into pipelined-restore
YuJuncen Jun 15, 2020
0ee5223
restore: fix a bug that may cause checksum time incorrect
Jun 15, 2020
7437950
Merge branch 'pipelined-restore' of https://github.com/Yujuncen/br in…
Jun 15, 2020
8f96c30
Merge branch 'master' into pipelined-restore
3pointer Jun 15, 2020
77ab77f
restore: don't Leave if never Enter
Jun 15, 2020
0ecbcd6
Merge branch 'pipelined-restore' of https://github.com/Yujuncen/br in…
Jun 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ coverage.txt
docker/data/
docker/logs/
*.swp
.DS_Store
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.opencensus.io v0.22.2 // indirect
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.14.1
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
google.golang.org/api v0.14.0
Expand Down
221 changes: 221 additions & 0 deletions pkg/restore/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore_test

import (
"context"
"time"

"github.com/pingcap/br/pkg/restore"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/utils"
)

// testBatcherSuite is the receiver type that groups the batcher tests in this
// file; it carries no state.
type testBatcherSuite struct{}

// drySender is a fake sender handed to restore.NewBatcher by the tests in this
// file. Instead of restoring anything it records what it was asked to restore
// so that the tests can inspect it afterwards.
type drySender struct {
	// nBatch is incremented once per RestoreBatch call.
	nBatch int

	// tbls receives every table passed to RestoreBatch.
	tbls chan restore.CreatedTable
	// ranges receives every range passed to RestoreBatch.
	ranges chan rtree.Range
}

// RestoreBatch pretends to restore one batch. It bumps the batch counter,
// logs each table ID, and pushes every table and range it was given into the
// sender's channels (tables first, then ranges). rewriteRules is ignored and
// the returned error is always nil.
func (d *drySender) RestoreBatch(
	ranges []rtree.Range,
	rewriteRules *restore.RewriteRules,
	tbs []restore.CreatedTable,
) error {
	d.nBatch++

	for i := range tbs {
		created := tbs[i]
		log.Info("dry restore", zap.Int64("table ID", created.Table.ID))
		d.tbls <- created
	}

	for i := range ranges {
		d.ranges <- ranges[i]
	}

	return nil
}

// Close closes the table channel and then the range channel. Closing them is
// what allows the range loops in exhaust to terminate.
func (d *drySender) Close() {
	tableCh, rangeCh := d.tbls, d.ranges
	close(tableCh)
	close(rangeCh)
}

// exhaust drains the table channel and then the range channel, returning
// everything that was received in arrival order. Each loop only ends when the
// corresponding channel has been closed, so this blocks until Close is called.
// Both results are nil when nothing was received.
func (d *drySender) exhaust() (tbls []restore.CreatedTable, rngs []rtree.Range) {
	for {
		created, open := <-d.tbls
		if !open {
			break
		}
		tbls = append(tbls, created)
	}

	for {
		r, open := <-d.ranges
		if !open {
			break
		}
		rngs = append(rngs, r)
	}

	return tbls, rngs
}

// newDrySender builds a drySender whose table and range channels are each
// buffered to hold 4096 items, so RestoreBatch does not block on send until
// that many items are queued without being drained.
func newDrySender() *drySender {
	const chanCap = 4096

	sender := new(drySender)
	sender.tbls = make(chan restore.CreatedTable, chanCap)
	sender.ranges = make(chan rtree.Range, chanCap)
	return sender
}

// RangeLen reports how many ranges are currently queued in the range channel,
// i.e. sent by RestoreBatch but not yet drained.
func (d *drySender) RangeLen() int {
	queued := len(d.ranges)
	return queued
}

// TableLen reports how many tables are currently queued in the table channel,
// i.e. sent by RestoreBatch but not yet drained.
func (d *drySender) TableLen() int {
	queued := len(d.tbls)
	return queued
}

// BatchCount reports how many times RestoreBatch has been called on this
// sender.
func (d *drySender) BatchCount() int {
	calls := d.nBatch
	return calls
}

// Hand the batcher suite to Suite (from the dot-imported check package) so
// that its Test* methods are run.
var _ = Suite(&testBatcherSuite{})

// fakeTableWithRange builds a minimal restore.TableWithRange for tests.
// The table info carries only the given id, the old table wraps that same
// info together with an empty DBInfo, the rewrite rule comes from
// restore.EmptyRewriteRule, and rngs is used as-is (not copied) for Range.
func fakeTableWithRange(id int64, rngs []rtree.Range) restore.TableWithRange {
	info := &model.TableInfo{ID: id}
	oldTable := &utils.Table{
		Db:   &model.DBInfo{},
		Info: info,
	}

	return restore.TableWithRange{
		CreatedTable: restore.CreatedTable{
			RewriteRule: restore.EmptyRewriteRule(),
			Table:       info,
			OldTable:    oldTable,
		},
		Range: rngs,
	}
}

// fakeRange builds an rtree.Range whose StartKey and EndKey are the bytes of
// the given strings; all other fields keep their zero values.
func fakeRange(startKey, endKey string) rtree.Range {
	var rng rtree.Range
	rng.StartKey = []byte(startKey)
	rng.EndKey = []byte(endKey)
	return rng
}

// TestBasic tests the basic workflow of the batcher: three tables (five ranges
// in total) are added with a batch size threshold of 2, the batcher is closed,
// and everything that reached the sender must equal the input, in input order.
// It also checks that nothing was reported on the error channel.
func (*testBatcherSuite) TestBasic(c *C) {
	errCh := make(chan error, 8)
	sender := newDrySender()
	// The second value returned by NewBatcher is not needed by this test.
	batcher, _ := restore.NewBatcher(context.Background(), sender, errCh)
	batcher.BatchSizeThreshold = 2

	simpleTables := []restore.TableWithRange{
		fakeTableWithRange(1, []rtree.Range{fakeRange("aaa", "aab")}),
		fakeTableWithRange(2, []rtree.Range{fakeRange("baa", "bab"), fakeRange("bac", "bad")}),
		fakeTableWithRange(3, []rtree.Range{fakeRange("caa", "cab"), fakeRange("cac", "cad")}),
	}

	for _, tbl := range simpleTables {
		batcher.Add(tbl)
	}

	// exhaust only returns once the sender's channels have been closed.
	// NOTE(review): this relies on batcher.Close closing the sender — confirm.
	batcher.Close()
	tbls, rngs := sender.exhaust()
	totalRngs := []rtree.Range{}

	c.Assert(len(tbls), Equals, len(simpleTables))
	for i, tbl := range simpleTables {
		c.Assert(tbls[i], DeepEquals, tbl.CreatedTable)
		totalRngs = append(totalRngs, tbl.Range...)
	}

	// Assert takes the obtained value first and the expected value last, as
	// the other tests in this file do; keeping that order makes a failure
	// report label the two values correctly.
	c.Assert(rngs, DeepEquals, totalRngs)

	// Non-blocking poll: any error already queued on errCh fails the test.
	select {
	case err := <-errCh:
		c.Fatal(errors.Trace(err))
	default:
	}
}

// TestAutoSend checks that a table added to the batcher reaches the sender
// without an explicit flush: the threshold (1024) is far above the two ranges
// added, yet after waiting 1.3s the sender must hold tables and ranges and the
// batcher must be empty.
// NOTE(review): presumably this exercises a periodic auto-commit inside the
// batcher; the interval is not visible from this file — confirm.
func (*testBatcherSuite) TestAutoSend(c *C) {
	const autoSendWait = 1300 * time.Millisecond

	errs := make(chan error, 8)
	dry := newDrySender()
	b, _ := restore.NewBatcher(context.Background(), dry, errs)
	b.BatchSizeThreshold = 1024

	table := fakeTableWithRange(1, []rtree.Range{
		fakeRange("caa", "cab"),
		fakeRange("cac", "cad"),
	})
	b.Add(table)

	// Give the batcher time to send on its own.
	time.Sleep(autoSendWait)
	c.Assert(dry.RangeLen(), Greater, 0)
	c.Assert(dry.TableLen(), Greater, 0)
	c.Assert(b.Len(), Equals, 0)

	b.Close()

	gotTables, gotRanges := dry.exhaust()
	c.Assert(len(gotTables), Greater, 0)
	c.Assert(gotRanges, DeepEquals, table.Range)
	c.Assert(gotTables[0], DeepEquals, table.CreatedTable)

	// Non-blocking poll: any error already queued fails the test.
	select {
	case err := <-errs:
		c.Fatal(errors.Trace(err))
	default:
	}
}

// TestSplitRangeOnSameTable checks that the ranges of a single table are split
// across batches: with a threshold of 2, adding one table that has 8 ranges
// must have produced exactly 4 RestoreBatch calls by the time Add returns, and
// the ranges must still arrive complete and in order.
func (*testBatcherSuite) TestSplitRangeOnSameTable(c *C) {
	errs := make(chan error, 8)
	dry := newDrySender()
	b, _ := restore.NewBatcher(context.Background(), dry, errs)
	b.BatchSizeThreshold = 2

	ranges := []rtree.Range{
		fakeRange("caa", "cab"), fakeRange("cac", "cad"),
		fakeRange("cae", "caf"), fakeRange("cag", "cai"),
		fakeRange("caj", "cak"), fakeRange("cal", "cam"),
		fakeRange("can", "cao"), fakeRange("cap", "caq"),
	}
	table := fakeTableWithRange(1, ranges)

	b.Add(table)
	c.Assert(dry.BatchCount(), Equals, 4)

	b.Close()

	gotTables, gotRanges := dry.exhaust()
	c.Assert(len(gotTables), Greater, 0)
	c.Assert(gotRanges, DeepEquals, table.Range)
	c.Assert(gotTables[0], DeepEquals, table.CreatedTable)

	// Non-blocking poll: any error already queued fails the test.
	select {
	case err := <-errs:
		c.Fatal(errors.Trace(err))
	default:
	}
}

// TestBatcherLen checks the batcher's Len accounting: with a threshold (1024)
// well above the input size, Len must equal the 8 ranges just added, and must
// drop to 0 once the batcher has been closed.
func (*testBatcherSuite) TestBatcherLen(c *C) {
	errs := make(chan error, 8)
	dry := newDrySender()
	b, _ := restore.NewBatcher(context.Background(), dry, errs)
	b.BatchSizeThreshold = 1024

	ranges := []rtree.Range{
		fakeRange("caa", "cab"), fakeRange("cac", "cad"),
		fakeRange("cae", "caf"), fakeRange("cag", "cai"),
		fakeRange("caj", "cak"), fakeRange("cal", "cam"),
		fakeRange("can", "cao"), fakeRange("cap", "caq"),
	}
	table := fakeTableWithRange(1, ranges)

	b.Add(table)
	c.Assert(b.Len(), Equals, 8)
	b.Close()
	c.Assert(b.Len(), Equals, 0)

	// Non-blocking poll: any error already queued fails the test.
	select {
	case err := <-errs:
		c.Fatal(errors.Trace(err))
	default:
	}
}
Loading