diff --git a/.gitignore b/.gitignore index cf88895a3..66fe8f9aa 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ coverage.txt docker/data/ docker/logs/ *.swp +.DS_Store diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go new file mode 100644 index 000000000..1a4c1256d --- /dev/null +++ b/pkg/restore/batcher.go @@ -0,0 +1,353 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package restore + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/rtree" +) + +// SendType is the 'type' of a send. +// when we make a 'send' command to worker, we may want to flush all pending ranges (when auto commit enabled), +// or, we just want to clean overflowing ranges(when just adding a table to batcher). +type SendType int + +const ( + // SendUntilLessThanBatch will make the batcher send batch until + // its remaining range is less than its batchSizeThreshold. + SendUntilLessThanBatch SendType = iota + // SendAll will make the batcher send all pending ranges. + SendAll + // SendAllThenClose will make the batcher send all pending ranges and then close itself. + SendAllThenClose +) + +// Batcher collects ranges to restore and send batching split/ingest request. +type Batcher struct { + cachedTables []TableWithRange + cachedTablesMu *sync.Mutex + rewriteRules *RewriteRules + + // autoCommitJoiner is for joining the background batch sender. + autoCommitJoiner chan<- struct{} + // everythingIsDone is for waiting for worker done: that is, after we send a + // signal to autoCommitJoiner, we must give it enough time to get things done. + // Then, it should notify us by this waitgroup. + // Use waitgroup instead of a trivial channel for further extension. + everythingIsDone *sync.WaitGroup + // sendErr is for output error information. + sendErr chan<- error + // sendCh is for communiate with sendWorker. + sendCh chan<- SendType + // outCh is for output the restored table, so it can be sent to do something like checksum. + outCh chan<- CreatedTable + + sender BatchSender + manager ContextManager + batchSizeThreshold int + size int32 +} + +// Len calculate the current size of this batcher. +func (b *Batcher) Len() int { + return int(atomic.LoadInt32(&b.size)) +} + +// NewBatcher creates a new batcher by a sender and a context manager. +// the former defines how the 'restore' a batch(i.e. send, or 'push down' the task to where). +// the context manager defines the 'lifetime' of restoring tables(i.e. how to enter 'restore' mode, and how to exit). +// this batcher will work background, send batches per second, or batch size reaches limit. +// and it will emit full-restored tables to the output channel returned. +func NewBatcher( + ctx context.Context, + sender BatchSender, + manager ContextManager, + errCh chan<- error, +) (*Batcher, <-chan CreatedTable) { + output := make(chan CreatedTable, defaultBatcherOutputChannelSize) + sendChan := make(chan SendType, 2) + b := &Batcher{ + rewriteRules: EmptyRewriteRule(), + sendErr: errCh, + outCh: output, + sender: sender, + manager: manager, + sendCh: sendChan, + cachedTablesMu: new(sync.Mutex), + everythingIsDone: new(sync.WaitGroup), + batchSizeThreshold: 1, + } + b.everythingIsDone.Add(1) + go b.sendWorker(ctx, sendChan) + return b, output +} + +// EnableAutoCommit enables the batcher commit batch periodically even batcher size isn't big enough. +// we make this function for disable AutoCommit in some case. 
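+//
+// A rough usage sketch, mirroring how task/restore.go drives the batcher
+// (the one-second period and the `tables` slice are just example values):
+//
+//	batcher, outCh := NewBatcher(ctx, sender, manager, errCh)
+//	batcher.SetThreshold(batchSize)            // set before anything starts
+//	batcher.EnableAutoCommit(ctx, time.Second) // flush pending ranges periodically
+//	for _, tbl := range tables {               // tables is the caller's []TableWithRange
+//		batcher.Add(tbl)
+//	}
+//	batcher.Close() // sends the remaining ranges and closes outCh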
+func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) { + if b.autoCommitJoiner != nil { + // IMO, making two auto commit goroutine wouldn't be a good idea. + // If desire(e.g. change the peroid of auto commit), please disable auto commit firstly. + log.L().DPanic("enabling auto commit on a batcher that auto commit has been enabled, which isn't allowed") + } + joiner := make(chan struct{}) + go b.autoCommitWorker(ctx, joiner, delay) + b.autoCommitJoiner = joiner +} + +// DisableAutoCommit blocks the current goroutine until the worker can gracefully stop, +// and then disable auto commit. +func (b *Batcher) DisableAutoCommit() { + b.joinWorker() + b.autoCommitJoiner = nil +} + +func (b *Batcher) waitUntilSendDone() { + b.sendCh <- SendAllThenClose + b.everythingIsDone.Wait() +} + +// joinWorker blocks the current goroutine until the worker can gracefully stop. +// return immediately when auto commit disabled. +func (b *Batcher) joinWorker() { + if b.autoCommitJoiner != nil { + log.Debug("gracefully stopping worker goroutine") + b.autoCommitJoiner <- struct{}{} + close(b.autoCommitJoiner) + log.Debug("gracefully stopped worker goroutine") + } +} + +// sendWorker is the 'worker' that send all ranges to TiKV. +func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) { + sendUntil := func(lessOrEqual int) { + for b.Len() > lessOrEqual { + tbls, err := b.Send(ctx) + if err != nil { + b.sendErr <- err + return + } + for _, t := range tbls { + b.outCh <- t + } + } + } + + for sendType := range send { + switch sendType { + case SendUntilLessThanBatch: + sendUntil(b.batchSizeThreshold) + case SendAll: + sendUntil(0) + case SendAllThenClose: + sendUntil(0) + b.everythingIsDone.Done() + return + } + } +} + +func (b *Batcher) autoCommitWorker(ctx context.Context, joiner <-chan struct{}, delay time.Duration) { + tick := time.NewTicker(delay) + defer tick.Stop() + for { + select { + case <-joiner: + log.Debug("graceful stop signal received") + return + case <-ctx.Done(): + b.sendErr <- ctx.Err() + return + case <-tick.C: + if b.Len() > 0 { + log.Debug("sending batch because time limit exceed", zap.Int("size", b.Len())) + b.asyncSend(SendAll) + } + } + } +} + +func (b *Batcher) asyncSend(t SendType) { + // add a check here so we won't replica sending. + if len(b.sendCh) == 0 { + b.sendCh <- t + } +} + +type drainResult struct { + // TablesToSend are tables that would be send at this batch. + TablesToSend []CreatedTable + // BlankTablesAfterSend are tables that will be full-restored after this batch send. + BlankTablesAfterSend []CreatedTable + RewriteRules *RewriteRules + Ranges []rtree.Range +} + +func newDrainResult() drainResult { + return drainResult{ + TablesToSend: make([]CreatedTable, 0), + BlankTablesAfterSend: make([]CreatedTable, 0), + RewriteRules: EmptyRewriteRule(), + Ranges: make([]rtree.Range, 0), + } +} + +// drainRanges 'drains' ranges from current tables. 
+// for example, let a '-' character be a range, assume we have: +// |---|-----|-------| +// |t1 |t2 |t3 | +// after we run drainRanges() with batchSizeThreshold = 6, let '*' be the ranges will be sent this batch : +// |***|***--|-------| +// |t1 |t2 |-------| +// +// drainRanges() will return: +// TablesToSend: [t1, t2] (so we can make them enter restore mode) +// BlankTableAfterSend: [t1] (so we can make them leave restore mode after restoring this batch) +// RewriteRules: rewrite rules for [t1, t2] (so we can restore them) +// Ranges: those stared ranges (so we can restore them) +// +// then, it will leaving the batcher's cachedTables like this: +// |--|-------| +// |t2|t3 | +// as you can see, all restored ranges would be removed. +func (b *Batcher) drainRanges() drainResult { + result := newDrainResult() + + b.cachedTablesMu.Lock() + defer b.cachedTablesMu.Unlock() + + for offset, thisTable := range b.cachedTables { + thisTableLen := len(thisTable.Range) + collected := len(result.Ranges) + + result.RewriteRules.Append(*thisTable.RewriteRule) + result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable) + + // the batch is full, we should stop here! + // we use strictly greater than because when we send a batch at equal, the offset should plus one. + // (because the last table is sent, we should put it in emptyTables), and this will intrduce extra complex. + if thisTableLen+collected > b.batchSizeThreshold { + drainSize := b.batchSizeThreshold - collected + thisTableRanges := thisTable.Range + + var drained []rtree.Range + drained, b.cachedTables[offset].Range = thisTableRanges[:drainSize], thisTableRanges[drainSize:] + log.Debug("draining partial table to batch", + zap.Stringer("db", thisTable.OldTable.Db.Name), + zap.Stringer("table", thisTable.Table.Name), + zap.Int("size", thisTableLen), + zap.Int("drained", drainSize), + ) + result.Ranges = append(result.Ranges, drained...) + b.cachedTables = b.cachedTables[offset:] + atomic.AddInt32(&b.size, -int32(len(drained))) + return result + } + + result.BlankTablesAfterSend = append(result.BlankTablesAfterSend, thisTable.CreatedTable) + // let's 'drain' the ranges of current table. This op must not make the batch full. + result.Ranges = append(result.Ranges, thisTable.Range...) + atomic.AddInt32(&b.size, -int32(len(thisTable.Range))) + // clear the table length. + b.cachedTables[offset].Range = []rtree.Range{} + log.Debug("draining table to batch", + zap.Stringer("db", thisTable.OldTable.Db.Name), + zap.Stringer("table", thisTable.Table.Name), + zap.Int("size", thisTableLen), + ) + } + + // all tables are drained. + b.cachedTables = []TableWithRange{} + return result +} + +// Send sends all pending requests in the batcher. +// returns tables sent FULLY in the current batch. 
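+// A table whose ranges are only partially drained by this batch is not returned
+// here; it will show up in the result of a later Send once its remaining ranges
+// have been flushed (see drainRanges above).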
+func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) { + drainResult := b.drainRanges() + tbs := drainResult.TablesToSend + ranges := drainResult.Ranges + + log.Info("restore batch start", + append( + ZapRanges(ranges), + ZapTables(tbs), + )..., + ) + + if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil { + return nil, err + } + defer func() { + if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil { + log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode", + append( + ZapRanges(ranges), + ZapTables(tbs), + zap.Error(err))..., + ) + } + if len(drainResult.BlankTablesAfterSend) > 0 { + log.Debug("table fully restored", + ZapTables(drainResult.BlankTablesAfterSend), + zap.Int("ranges", len(ranges)), + ) + } + }() + + if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil { + return nil, err + } + return drainResult.BlankTablesAfterSend, nil +} + +func (b *Batcher) sendIfFull() { + if b.Len() >= b.batchSizeThreshold { + log.Debug("sending batch because batcher is full", zap.Int("size", b.Len())) + b.asyncSend(SendUntilLessThanBatch) + } +} + +// Add adds a task to the Batcher. +func (b *Batcher) Add(tbs TableWithRange) { + b.cachedTablesMu.Lock() + log.Debug("adding table to batch", + zap.Stringer("db", tbs.OldTable.Db.Name), + zap.Stringer("table", tbs.Table.Name), + zap.Int64("old id", tbs.OldTable.Info.ID), + zap.Int64("new id", tbs.Table.ID), + zap.Int("table size", len(tbs.Range)), + zap.Int("batch size", b.Len()), + ) + b.cachedTables = append(b.cachedTables, tbs) + b.rewriteRules.Append(*tbs.RewriteRule) + atomic.AddInt32(&b.size, int32(len(tbs.Range))) + b.cachedTablesMu.Unlock() + + b.sendIfFull() +} + +// Close closes the batcher, sending all pending requests, close updateCh. +func (b *Batcher) Close() { + log.Info("sending batch lastly on close", zap.Int("size", b.Len())) + b.DisableAutoCommit() + b.waitUntilSendDone() + close(b.outCh) + close(b.sendCh) + b.sender.Close() +} + +// SetThreshold sets the threshold that how big the batch size reaching need to send batch. +// note this function isn't goroutine safe yet, +// just set threshold before anything starts(e.g. EnableAutoCommit), please. +func (b *Batcher) SetThreshold(newThreshold int) { + b.batchSizeThreshold = newThreshold +} diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go new file mode 100644 index 000000000..53a9fbbaa --- /dev/null +++ b/pkg/restore/batcher_test.go @@ -0,0 +1,359 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package restore_test + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/log" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/restore" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + + "github.com/pingcap/br/pkg/rtree" + "github.com/pingcap/br/pkg/utils" +) + +type testBatcherSuite struct{} + +type drySender struct { + mu *sync.Mutex + + rewriteRules *restore.RewriteRules + ranges []rtree.Range + nBatch int +} + +func (d *drySender) RestoreBatch( + _ctx context.Context, + ranges []rtree.Range, + rewriteRules *restore.RewriteRules, +) error { + d.mu.Lock() + defer d.mu.Unlock() + log.Info("fake restore range", restore.ZapRanges(ranges)...) + d.nBatch++ + d.rewriteRules.Append(*rewriteRules) + d.ranges = append(d.ranges, ranges...) 
+ return nil +} + +func (d *drySender) Close() {} + +func waitForSend() { + time.Sleep(10 * time.Millisecond) +} + +func (d *drySender) Ranges() []rtree.Range { + return d.ranges +} + +func newDrySender() *drySender { + return &drySender{ + rewriteRules: restore.EmptyRewriteRule(), + ranges: []rtree.Range{}, + mu: new(sync.Mutex), + } +} + +type recordCurrentTableManager map[int64]bool + +func newMockManager() recordCurrentTableManager { + return make(recordCurrentTableManager) +} + +func (manager recordCurrentTableManager) Enter(_ context.Context, tables []restore.CreatedTable) error { + for _, t := range tables { + log.Info("entering", zap.Int64("table ID", t.Table.ID)) + manager[t.Table.ID] = true + } + return nil +} + +func (manager recordCurrentTableManager) Leave(_ context.Context, tables []restore.CreatedTable) error { + for _, t := range tables { + if !manager[t.Table.ID] { + return errors.Errorf("Table %d is removed before added", t.Table.ID) + } + log.Info("leaving", zap.Int64("table ID", t.Table.ID)) + manager[t.Table.ID] = false + } + return nil +} + +func (manager recordCurrentTableManager) Has(tables ...restore.TableWithRange) bool { + ids := make([]int64, 0, len(tables)) + currentIDs := make([]int64, 0, len(manager)) + for _, t := range tables { + ids = append(ids, t.Table.ID) + } + for id, contains := range manager { + if contains { + currentIDs = append(currentIDs, id) + } + } + log.Info("testing", zap.Int64s("should has ID", ids), zap.Int64s("has ID", currentIDs)) + for _, i := range ids { + if !manager[i] { + return false + } + } + return true +} + +func (d *drySender) HasRewriteRuleOfKey(prefix string) bool { + for _, rule := range d.rewriteRules.Table { + if bytes.Equal([]byte(prefix), rule.OldKeyPrefix) { + return true + } + } + return false +} + +func (d *drySender) RangeLen() int { + return len(d.ranges) +} + +func (d *drySender) BatchCount() int { + return d.nBatch +} + +var ( + _ = Suite(&testBatcherSuite{}) +) + +func fakeTableWithRange(id int64, rngs []rtree.Range) restore.TableWithRange { + tbl := &utils.Table{ + Db: &model.DBInfo{}, + Info: &model.TableInfo{ + ID: id, + }, + } + tblWithRng := restore.TableWithRange{ + CreatedTable: restore.CreatedTable{ + RewriteRule: restore.EmptyRewriteRule(), + Table: tbl.Info, + OldTable: tbl, + }, + Range: rngs, + } + return tblWithRng +} + +func fakeRewriteRules(oldPrefix string, newPrefix string) *restore.RewriteRules { + return &restore.RewriteRules{ + Table: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: []byte(oldPrefix), + NewKeyPrefix: []byte(newPrefix), + }, + }, + } +} + +func fakeRange(startKey, endKey string) rtree.Range { + return rtree.Range{ + StartKey: []byte(startKey), + EndKey: []byte(endKey), + } +} + +func join(nested [][]rtree.Range) (plain []rtree.Range) { + for _, ranges := range nested { + plain = append(plain, ranges...) + } + return plain +} + +// TestBasic tests basic workflow of batcher. 
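+// It feeds three small tables through a batcher with threshold 2 and checks that
+// the sender receives every range exactly once, in the original order, with no
+// error reported on errCh.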
+func (*testBatcherSuite) TestBasic(c *C) { + ctx := context.Background() + errCh := make(chan error, 8) + sender := newDrySender() + manager := newMockManager() + batcher, _ := restore.NewBatcher(ctx, sender, manager, errCh) + batcher.SetThreshold(2) + + tableRanges := [][]rtree.Range{ + {fakeRange("aaa", "aab")}, + {fakeRange("baa", "bab"), fakeRange("bac", "bad")}, + {fakeRange("caa", "cab"), fakeRange("cac", "cad")}, + } + + simpleTables := []restore.TableWithRange{} + for i, ranges := range tableRanges { + simpleTables = append(simpleTables, fakeTableWithRange(int64(i), ranges)) + } + for _, tbl := range simpleTables { + batcher.Add(tbl) + } + + batcher.Close() + rngs := sender.Ranges() + + c.Assert(join(tableRanges), DeepEquals, rngs) + select { + case err := <-errCh: + c.Fatal(errors.Trace(err)) + default: + } +} + +func (*testBatcherSuite) TestAutoSend(c *C) { + ctx := context.Background() + errCh := make(chan error, 8) + sender := newDrySender() + manager := newMockManager() + batcher, _ := restore.NewBatcher(ctx, sender, manager, errCh) + batcher.SetThreshold(1024) + + simpleTable := fakeTableWithRange(1, []rtree.Range{fakeRange("caa", "cab"), fakeRange("cac", "cad")}) + + batcher.Add(simpleTable) + c.Assert(batcher.Len(), Greater, 0) + + // enable auto commit. + batcher.EnableAutoCommit(ctx, 100*time.Millisecond) + time.Sleep(200 * time.Millisecond) + + c.Assert(sender.RangeLen(), Greater, 0) + c.Assert(batcher.Len(), Equals, 0) + + batcher.Close() + + rngs := sender.Ranges() + c.Assert(rngs, DeepEquals, simpleTable.Range) + select { + case err := <-errCh: + c.Fatal(errors.Trace(err)) + default: + } +} + +func (*testBatcherSuite) TestSplitRangeOnSameTable(c *C) { + ctx := context.Background() + errCh := make(chan error, 8) + sender := newDrySender() + manager := newMockManager() + batcher, _ := restore.NewBatcher(ctx, sender, manager, errCh) + batcher.SetThreshold(2) + + simpleTable := fakeTableWithRange(1, []rtree.Range{ + fakeRange("caa", "cab"), fakeRange("cac", "cad"), + fakeRange("cae", "caf"), fakeRange("cag", "cai"), + fakeRange("caj", "cak"), fakeRange("cal", "cam"), + fakeRange("can", "cao"), fakeRange("cap", "caq")}) + + batcher.Add(simpleTable) + batcher.Close() + c.Assert(sender.BatchCount(), Equals, 4) + + rngs := sender.Ranges() + c.Assert(rngs, DeepEquals, simpleTable.Range) + select { + case err := <-errCh: + c.Fatal(errors.Trace(err)) + default: + } +} + +func (*testBatcherSuite) TestRewriteRules(c *C) { + tableRanges := [][]rtree.Range{ + {fakeRange("aaa", "aab")}, + {fakeRange("baa", "bab"), fakeRange("bac", "bad")}, + {fakeRange("caa", "cab"), fakeRange("cac", "cad"), + fakeRange("cae", "caf"), fakeRange("cag", "cai"), + fakeRange("caj", "cak"), fakeRange("cal", "cam"), + fakeRange("can", "cao"), fakeRange("cap", "caq")}, + } + rewriteRules := []*restore.RewriteRules{ + fakeRewriteRules("a", "ada"), + fakeRewriteRules("b", "bob"), + fakeRewriteRules("c", "cpp"), + } + + tables := make([]restore.TableWithRange, 0, len(tableRanges)) + for i, ranges := range tableRanges { + table := fakeTableWithRange(int64(i), ranges) + table.RewriteRule = rewriteRules[i] + tables = append(tables, table) + } + + ctx := context.Background() + errCh := make(chan error, 8) + sender := newDrySender() + manager := newMockManager() + batcher, _ := restore.NewBatcher(ctx, sender, manager, errCh) + batcher.SetThreshold(2) + + batcher.Add(tables[0]) + waitForSend() + c.Assert(sender.RangeLen(), Equals, 0) + + batcher.Add(tables[1]) + waitForSend() + 
c.Assert(sender.HasRewriteRuleOfKey("a"), IsTrue) + c.Assert(sender.HasRewriteRuleOfKey("b"), IsTrue) + c.Assert(manager.Has(tables[1]), IsTrue) + c.Assert(sender.RangeLen(), Equals, 2) + + batcher.Add(tables[2]) + batcher.Close() + c.Assert(sender.HasRewriteRuleOfKey("c"), IsTrue) + c.Assert(sender.Ranges(), DeepEquals, join(tableRanges)) + + select { + case err := <-errCh: + c.Fatal(errors.Trace(err)) + default: + } +} + +func (*testBatcherSuite) TestBatcherLen(c *C) { + ctx := context.Background() + errCh := make(chan error, 8) + sender := newDrySender() + manager := newMockManager() + batcher, _ := restore.NewBatcher(ctx, sender, manager, errCh) + batcher.SetThreshold(15) + + simpleTable := fakeTableWithRange(1, []rtree.Range{ + fakeRange("caa", "cab"), fakeRange("cac", "cad"), + fakeRange("cae", "caf"), fakeRange("cag", "cai"), + fakeRange("caj", "cak"), fakeRange("cal", "cam"), + fakeRange("can", "cao"), fakeRange("cap", "caq")}) + + simpleTable2 := fakeTableWithRange(2, []rtree.Range{ + fakeRange("caa", "cab"), fakeRange("cac", "cad"), + fakeRange("cae", "caf"), fakeRange("cag", "cai"), + fakeRange("caj", "cak"), fakeRange("cal", "cam"), + fakeRange("can", "cao"), fakeRange("cap", "caq")}) + + batcher.Add(simpleTable) + waitForSend() + c.Assert(batcher.Len(), Equals, 8) + c.Assert(manager.Has(simpleTable), IsFalse) + c.Assert(manager.Has(simpleTable2), IsFalse) + + batcher.Add(simpleTable2) + waitForSend() + c.Assert(batcher.Len(), Equals, 1) + c.Assert(manager.Has(simpleTable2), IsTrue) + c.Assert(manager.Has(simpleTable), IsFalse) + batcher.Close() + c.Assert(batcher.Len(), Equals, 0) + + select { + case err := <-errCh: + c.Fatal(errors.Trace(err)) + default: + } +} diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 77e7ac258..4e1f855c0 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" + "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -65,9 +66,16 @@ type Client struct { isOnline bool noSchema bool hasSpeedLimited bool + // Those fields should be removed after we have FULLY supportted TiFlash. + // we place this field here to make a 'good' memory align, but mainly make golang-ci happy :) + tiFlashRecordUpdated bool restoreStores []uint64 + // tables that has TiFlash and those TiFlash have been removed, should be written to disk. + // Those fields should be removed after we have FULLY supportted TiFlash. 
+ tablesRemovedTiFlash []*backup.Schema + storage storage.ExternalStorage backend *backup.StorageBackend } @@ -320,93 +328,196 @@ func (rc *Client) CreateTables( Data: make([]*import_sstpb.RewriteRule, 0), } newTables := make([]*model.TableInfo, 0, len(tables)) - for _, table := range tables { - if rc.IsSkipCreateSQL() { - log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) - } else { - err := rc.db.CreateTable(rc.ctx, table) - if err != nil { - return nil, nil, err - } - } - newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name) - if err != nil { - return nil, nil, err - } - rules := GetRewriteRules(newTableInfo, table.Info, newTS) + errCh := make(chan error, 1) + tbMapping := map[string]int{} + for i, t := range tables { + tbMapping[t.Info.Name.String()] = i + } + dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh) + for et := range dataCh { + rules := et.RewriteRule rewriteRules.Table = append(rewriteRules.Table, rules.Table...) rewriteRules.Data = append(rewriteRules.Data, rules.Data...) - newTables = append(newTables, newTableInfo) + newTables = append(newTables, et.Table) + } + // Let's ensure that it won't break the original order. + sort.Slice(newTables, func(i, j int) bool { + return tbMapping[newTables[i].Name.String()] < tbMapping[newTables[j].Name.String()] + }) + + select { + case err, ok := <-errCh: + if ok { + return nil, nil, err + } + default: } return rewriteRules, newTables, nil } -// RemoveTiFlashReplica removes all the tiflash replicas of a table. -// TODO: remove this after tiflash supports restore. -func (rc *Client) RemoveTiFlashReplica( - tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error { - schemas := make([]*backup.Schema, 0, len(tables)) - var updateReplica bool - // must use new table id to search placement rules - // here newTables and tables must have same order - for i, table := range tables { - if rule := utils.SearchPlacementRule(newTables[i].ID, placementRules, placement.Learner); rule != nil { - table.TiFlashReplicas = rule.Count - updateReplica = true - } - tableData, err := json.Marshal(newTables[i]) - if err != nil { - return errors.Trace(err) - } - dbData, err := json.Marshal(table.Db) +func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) { + if rc.IsSkipCreateSQL() { + log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) + } else { + err := rc.db.CreateTable(rc.ctx, table) if err != nil { - return errors.Trace(err) + return CreatedTable{}, err } - schemas = append(schemas, &backup.Schema{ - Db: dbData, - Table: tableData, - Crc64Xor: table.Crc64Xor, - TotalKvs: table.TotalKvs, - TotalBytes: table.TotalBytes, - TiflashReplicas: uint32(table.TiFlashReplicas), - }) } + newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name) + if err != nil { + return CreatedTable{}, err + } + rules := GetRewriteRules(newTableInfo, table.Info, newTS) + et := CreatedTable{ + RewriteRule: rules, + Table: newTableInfo, + OldTable: table, + } + return et, nil +} - if updateReplica { - // Update backup meta - rc.backupMeta.Schemas = schemas - backupMetaData, err := proto.Marshal(rc.backupMeta) - if err != nil { - return errors.Trace(err) +// GoCreateTables create tables, and generate their information. 
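+// Tables are created in a background goroutine and emitted one by one on the
+// returned channel; failures are reported through errCh, so errCh should be
+// buffered (or drained concurrently) to avoid blocking the worker. A rough call
+// shape, as used by RunRestore:
+//
+//	tableStream := client.GoCreateTables(ctx, dom, tables, newTS, errCh)
+//	for ct := range tableStream {
+//		// ct.Table, ct.OldTable and ct.RewriteRule are ready to use here.
+//	}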
+func (rc *Client) GoCreateTables( + ctx context.Context, + dom *domain.Domain, + tables []*utils.Table, + newTS uint64, + errCh chan<- error, +) <-chan CreatedTable { + // Could we have a smaller size of tables? + outCh := make(chan CreatedTable, len(tables)) + createOneTable := func(t *utils.Table) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: } - backendURL := storage.FormatBackendURL(rc.backend) - log.Info("update backup meta", zap.Stringer("path", &backendURL)) - err = rc.storage.Write(rc.ctx, utils.SavedMetaFile, backupMetaData) + rt, err := rc.createTable(dom, t, newTS) if err != nil { - return errors.Trace(err) + log.Error("create table failed", + zap.Error(err), + zap.Stringer("db", t.Db.Name), + zap.Stringer("table", t.Info.Name)) + return err } + log.Debug("table created and send to next", + zap.Int("output chan size", len(outCh)), + zap.Stringer("table", t.Info.Name), + zap.Stringer("database", t.Db.Name)) + outCh <- rt + return nil } + go func() { + defer close(outCh) + defer log.Info("all tables created") - for _, table := range tables { - if table.TiFlashReplicas > 0 { - err := rc.db.AlterTiflashReplica(rc.ctx, table, 0) + for _, table := range tables { + if err := createOneTable(table); err != nil { + errCh <- err + return + } + } + }() + return outCh +} + +// makeTiFlashOfTableRecord make a 'record' repsenting TiFlash of a table that has been removed. +// We doesn't record table ID here because when restore TiFlash replicas, +// we use `ALTER TABLE db.tbl SET TIFLASH_REPLICA = xxx` DDL, instead of use some internal TiDB API. +func makeTiFlashOfTableRecord(table *utils.Table, replica int) (*backup.Schema, error) { + tableData, err := json.Marshal(table.Info) + if err != nil { + return nil, errors.Trace(err) + } + dbData, err := json.Marshal(table.Db) + if err != nil { + return nil, errors.Trace(err) + } + result := &backup.Schema{ + Db: dbData, + Table: tableData, + Crc64Xor: table.Crc64Xor, + TotalKvs: table.TotalKvs, + TotalBytes: table.TotalBytes, + TiflashReplicas: uint32(replica), + } + return result, nil +} + +// RemoveTiFlashOfTable removes TiFlash replica of some table, +// returns the removed count of TiFlash nodes. +// TODO: save the removed TiFlash information into disk. +// TODO: remove this after tiflash supports restore. +func (rc *Client) RemoveTiFlashOfTable(table CreatedTable, rule []placement.Rule) (int, error) { + if rule := utils.SearchPlacementRule(table.Table.ID, rule, placement.Learner); rule != nil { + if rule.Count > 0 { + log.Info("remove TiFlash of table", zap.Int64("table ID", table.Table.ID), zap.Int("count", rule.Count)) + err := multierr.Combine( + rc.db.AlterTiflashReplica(rc.ctx, table.OldTable, 0), + rc.removeTiFlashOf(table.OldTable, rule.Count), + rc.flushTiFlashRecord(), + ) if err != nil { - return errors.Trace(err) + return 0, errors.Trace(err) } + return rule.Count, nil } } + return 0, nil +} + +func (rc *Client) removeTiFlashOf(table *utils.Table, replica int) error { + tableRecord, err := makeTiFlashOfTableRecord(table, replica) + if err != nil { + return err + } + rc.tablesRemovedTiFlash = append(rc.tablesRemovedTiFlash, tableRecord) + rc.tiFlashRecordUpdated = true return nil } -// RecoverTiFlashReplica recovers all the tiflash replicas of a table. +func (rc *Client) flushTiFlashRecord() error { + // Today nothing to do :D + if !rc.tiFlashRecordUpdated { + return nil + } + + // should we make a deep copy here? + // currently, write things directly to backup meta is OK since there seems nobody uses it. 
+ // But would it be better if we don't do it? + rc.backupMeta.Schemas = rc.tablesRemovedTiFlash + backupMetaData, err := proto.Marshal(rc.backupMeta) + if err != nil { + return errors.Trace(err) + } + backendURL := storage.FormatBackendURL(rc.backend) + log.Info("update backup meta", zap.Stringer("path", &backendURL)) + err = rc.storage.Write(rc.ctx, utils.SavedMetaFile, backupMetaData) + if err != nil { + return errors.Trace(err) + } + return nil +} + +// RecoverTiFlashOfTable recovers TiFlash replica of some table. +// TODO: remove this after tiflash supports restore. +func (rc *Client) RecoverTiFlashOfTable(table *utils.Table) error { + if table.TiFlashReplicas > 0 { + err := rc.db.AlterTiflashReplica(rc.ctx, table, table.TiFlashReplicas) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +// RecoverTiFlashReplica recovers all the tiflash replicas of a table // TODO: remove this after tiflash supports restore. func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error { for _, table := range tables { - if table.TiFlashReplicas > 0 { - err := rc.db.AlterTiflashReplica(rc.ctx, table, table.TiFlashReplicas) - if err != nil { - return errors.Trace(err) - } + if err := rc.RecoverTiFlashOfTable(table); err != nil { + return err } } return nil @@ -620,89 +731,94 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo return nil } -// ValidateChecksum validate checksum after restore. -func (rc *Client) ValidateChecksum( +// GoValidateChecksum forks a goroutine to validate checksum after restore. +// it returns a channel fires a struct{} when all things get done. +func (rc *Client) GoValidateChecksum( ctx context.Context, + tableStream <-chan CreatedTable, kvClient kv.Client, - tables []*utils.Table, - newTables []*model.TableInfo, + errCh chan<- error, updateCh glue.Progress, -) error { - start := time.Now() - defer func() { - elapsed := time.Since(start) - summary.CollectDuration("restore checksum", elapsed) - }() - +) <-chan struct{} { log.Info("Start to validate checksum") - wg := new(sync.WaitGroup) - errCh := make(chan error) + outCh := make(chan struct{}, 1) workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") go func() { - for i, t := range tables { - table := t - newTable := newTables[i] - wg.Add(1) - workers.Apply(func() { - defer wg.Done() - - if table.NoChecksum() { - log.Info("table doesn't have checksum, skipping checksum", - zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Info.Name)) - updateCh.Inc() - return - } - - startTS, err := rc.GetTS(ctx) - if err != nil { - errCh <- errors.Trace(err) - return - } - exe, err := checksum.NewExecutorBuilder(newTable, startTS). - SetOldTable(table). - Build() - if err != nil { - errCh <- errors.Trace(err) + start := time.Now() + wg := new(sync.WaitGroup) + defer func() { + log.Info("all checksum ended") + wg.Wait() + elapsed := time.Since(start) + summary.CollectDuration("restore checksum", elapsed) + outCh <- struct{}{} + close(outCh) + }() + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + case tbl, ok := <-tableStream: + if !ok { return } - checksumResp, err := exe.Execute(ctx, kvClient, func() { - // TODO: update progress here. 
+ wg.Add(1) + workers.Apply(func() { + err := rc.execChecksum(ctx, tbl, kvClient) + if err != nil { + errCh <- err + } + updateCh.Inc() + wg.Done() }) - if err != nil { - errCh <- errors.Trace(err) - return - } - - if checksumResp.Checksum != table.Crc64Xor || - checksumResp.TotalKvs != table.TotalKvs || - checksumResp.TotalBytes != table.TotalBytes { - log.Error("failed in validate checksum", - zap.String("database", table.Db.Name.L), - zap.String("table", table.Info.Name.L), - zap.Uint64("origin tidb crc64", table.Crc64Xor), - zap.Uint64("calculated crc64", checksumResp.Checksum), - zap.Uint64("origin tidb total kvs", table.TotalKvs), - zap.Uint64("calculated total kvs", checksumResp.TotalKvs), - zap.Uint64("origin tidb total bytes", table.TotalBytes), - zap.Uint64("calculated total bytes", checksumResp.TotalBytes), - ) - errCh <- errors.New("failed to validate checksum") - return - } - - updateCh.Inc() - }) + } } - wg.Wait() - close(errCh) }() - for err := range errCh { - if err != nil { - return err - } + return outCh +} + +func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client) error { + if tbl.OldTable.NoChecksum() { + log.Warn("table has no checksum, skipping checksum", + zap.Stringer("table", tbl.OldTable.Info.Name), + zap.Stringer("database", tbl.OldTable.Db.Name), + ) + return nil + } + + startTS, err := rc.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + exe, err := checksum.NewExecutorBuilder(tbl.Table, startTS). + SetOldTable(tbl.OldTable). + Build() + if err != nil { + return errors.Trace(err) + } + checksumResp, err := exe.Execute(ctx, kvClient, func() { + // TODO: update progress here. + }) + if err != nil { + return errors.Trace(err) + } + + table := tbl.OldTable + if checksumResp.Checksum != table.Crc64Xor || + checksumResp.TotalKvs != table.TotalKvs || + checksumResp.TotalBytes != table.TotalBytes { + log.Error("failed in validate checksum", + zap.String("database", table.Db.Name.L), + zap.String("table", table.Info.Name.L), + zap.Uint64("origin tidb crc64", table.Crc64Xor), + zap.Uint64("calculated crc64", checksumResp.Checksum), + zap.Uint64("origin tidb total kvs", table.TotalKvs), + zap.Uint64("calculated total kvs", checksumResp.TotalKvs), + zap.Uint64("origin tidb total bytes", table.TotalBytes), + zap.Uint64("calculated total bytes", checksumResp.TotalBytes), + ) + return errors.New("failed to validate checksum") } - log.Info("validate checksum passed!!") return nil } diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go new file mode 100644 index 000000000..d99f5bd8e --- /dev/null +++ b/pkg/restore/pipeline_items.go @@ -0,0 +1,200 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package restore + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/backup" + "github.com/pingcap/log" + "github.com/pingcap/parser/model" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/rtree" + "github.com/pingcap/br/pkg/utils" +) + +const ( + defaultBatcherOutputChannelSize = 1024 +) + +// ContextManager is the struct to manage a TiKV 'context' for restore. +// Batcher will call Enter when any table should be restore on batch, +// so you can do some prepare work here(e.g. set placement rules for online restore). +type ContextManager interface { + // Enter make some tables 'enter' this context(a.k.a., prepare for restore). 
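+	// In the BR implementation (NewBRContextManager), Enter sets placement rules
+	// for tables it hasn't seen yet and waits for the placement schedule to finish;
+	// a no-op implementation is also valid (see the mock manager in the tests).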
+ Enter(ctx context.Context, tables []CreatedTable) error + // Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works). + Leave(ctx context.Context, tables []CreatedTable) error +} + +// NewBRContextManager makes a BR context manager, that is, +// set placement rules for online restore when enter(see ), +// unset them when leave. +func NewBRContextManager(client *Client) ContextManager { + return &brContextManager{ + client: client, + + hasTable: make(map[int64]bool), + } +} + +type brContextManager struct { + client *Client + + // This 'set' of table ID allow us handle each table just once. + hasTable map[int64]bool +} + +func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error { + placementRuleTables := make([]*model.TableInfo, 0, len(tables)) + + for _, tbl := range tables { + if !manager.hasTable[tbl.Table.ID] { + placementRuleTables = append(placementRuleTables, tbl.Table) + } + manager.hasTable[tbl.Table.ID] = true + } + + return splitPrepareWork(ctx, manager.client, placementRuleTables) +} + +func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error { + placementRuleTables := make([]*model.TableInfo, 0, len(tables)) + + for _, table := range tables { + placementRuleTables = append(placementRuleTables, table.Table) + } + + splitPostWork(ctx, manager.client, placementRuleTables) + log.Info("restore table done", ZapTables(tables)) + return nil +} + +func splitPostWork(ctx context.Context, client *Client, tables []*model.TableInfo) { + err := client.ResetPlacementRules(ctx, tables) + if err != nil { + log.Warn("reset placement rules failed", zap.Error(err)) + return + } +} + +func splitPrepareWork(ctx context.Context, client *Client, tables []*model.TableInfo) error { + err := client.SetupPlacementRules(ctx, tables) + if err != nil { + log.Error("setup placement rules failed", zap.Error(err)) + return errors.Trace(err) + } + + err = client.WaitPlacementSchedule(ctx, tables) + if err != nil { + log.Error("wait placement schedule failed", zap.Error(err)) + return errors.Trace(err) + } + return nil +} + +// CreatedTable is a table created on restore process, +// but not yet filled with data. +type CreatedTable struct { + RewriteRule *RewriteRules + Table *model.TableInfo + OldTable *utils.Table +} + +// TableWithRange is a CreatedTable that has been bind to some of key ranges. +type TableWithRange struct { + CreatedTable + + Range []rtree.Range +} + +// Exhaust drains all remaining errors in the channel, into a slice of errors. +func Exhaust(ec <-chan error) []error { + out := make([]error, 0, len(ec)) + for { + select { + case err := <-ec: + out = append(out, err) + default: + // errCh will NEVER be closed(ya see, it has multi sender-part), + // so we just consume the current backlog of this channel, then return. + return out + } + } +} + +// BatchSender is the abstract of how the batcher send a batch. +type BatchSender interface { + // RestoreBatch will send the restore request. + RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error + Close() +} + +type tikvSender struct { + client *Client + updateCh glue.Progress + rejectStoreMap map[uint64]bool +} + +// NewTiKVSender make a sender that send restore requests to TiKV. +func NewTiKVSender( + ctx context.Context, + cli *Client, + updateCh glue.Progress, + // TODO remove this field after we support TiFlash. 
+ removeTiFlash bool, +) (BatchSender, error) { + rejectStoreMap := make(map[uint64]bool) + if removeTiFlash { + tiflashStores, err := conn.GetAllTiKVStores(ctx, cli.GetPDClient(), conn.TiFlashOnly) + if err != nil { + log.Error("failed to get and remove TiFlash replicas", zap.Error(err)) + return nil, err + } + for _, store := range tiflashStores { + rejectStoreMap[store.GetId()] = true + } + } + + return &tikvSender{ + client: cli, + updateCh: updateCh, + rejectStoreMap: rejectStoreMap, + }, nil +} + +func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error { + if err := SplitRanges(ctx, b.client, ranges, rewriteRules, b.updateCh); err != nil { + log.Error("failed on split range", + zap.Any("ranges", ranges), + zap.Error(err), + ) + return err + } + + files := []*backup.File{} + for _, fs := range ranges { + files = append(files, fs.Files...) + } + + if err := b.client.RestoreFiles(files, rewriteRules, b.rejectStoreMap, b.updateCh); err != nil { + return err + } + + log.Info("restore batch done", + append( + ZapRanges(ranges), + zap.Int("file count", len(files)), + )..., + ) + + return nil +} + +func (b *tikvSender) Close() { + // don't close update channel here, since we may need it then. +} diff --git a/pkg/restore/range.go b/pkg/restore/range.go index 6459b0284..0e016dbde 100644 --- a/pkg/restore/range.go +++ b/pkg/restore/range.go @@ -78,3 +78,17 @@ type RewriteRules struct { Table []*import_sstpb.RewriteRule Data []*import_sstpb.RewriteRule } + +// Append append its argument to this rewrite rules. +func (r *RewriteRules) Append(other RewriteRules) { + r.Data = append(r.Data, other.Data...) + r.Table = append(r.Table, other.Table...) +} + +// EmptyRewriteRule make a new, empty rewrite rule. +func EmptyRewriteRule() *RewriteRules { + return &RewriteRules{ + Table: []*import_sstpb.RewriteRule{}, + Data: []*import_sstpb.RewriteRule{}, + } +} diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 7c6ec37a9..5970c0add 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "encoding/hex" + "fmt" "strings" "time" @@ -19,6 +20,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" @@ -139,6 +141,17 @@ func GetSSTMetaFromFile( } } +// EstimateRangeSize estimates the total range count by file. +func EstimateRangeSize(files []*backup.File) int { + result := 0 + for _, f := range files { + if strings.HasSuffix(f.GetName(), "_write.sst") { + result++ + } + } + return result +} + // ValidateFileRanges checks and returns the ranges of the files. func ValidateFileRanges( files []*backup.File, @@ -150,29 +163,113 @@ func ValidateFileRanges( for _, file := range files { // We skips all default cf files because we don't range overlap. 
if !fileAppended[file.GetName()] && strings.Contains(file.GetName(), "write") { - err := ValidateFileRewriteRule(file, rewriteRules) + rng, err := validateAndGetFileRange(file, rewriteRules) if err != nil { return nil, err } - startID := tablecodec.DecodeTableID(file.GetStartKey()) - endID := tablecodec.DecodeTableID(file.GetEndKey()) - if startID != endID { - log.Error("table ids dont match", - zap.Int64("startID", startID), - zap.Int64("endID", endID), - zap.Stringer("file", file)) - return nil, errors.New("table ids dont match") - } - ranges = append(ranges, rtree.Range{ - StartKey: file.GetStartKey(), - EndKey: file.GetEndKey(), - }) + ranges = append(ranges, rng) fileAppended[file.GetName()] = true } } return ranges, nil } +// MapTableToFiles makes a map that mapping table ID to its backup files. +// aware that one file can and only can hold one table. +func MapTableToFiles(files []*backup.File) map[int64][]*backup.File { + result := map[int64][]*backup.File{} + for _, file := range files { + tableID := tablecodec.DecodeTableID(file.GetStartKey()) + tableEndID := tablecodec.DecodeTableID(file.GetEndKey()) + if tableID != tableEndID { + log.Panic("key range spread between many files.", + zap.String("file name", file.Name), + zap.Binary("start key", file.GetStartKey()), + zap.Binary("end key", file.GetEndKey())) + } + if tableID == 0 { + log.Panic("invalid table key of file", + zap.String("file name", file.Name), + zap.Binary("start key", file.GetStartKey()), + zap.Binary("end key", file.GetEndKey())) + } + result[tableID] = append(result[tableID], file) + } + return result +} + +// GoValidateFileRanges validate files by a stream of tables and yields tables with range. +func GoValidateFileRanges( + ctx context.Context, + tableStream <-chan CreatedTable, + fileOfTable map[int64][]*backup.File, + errCh chan<- error, +) <-chan TableWithRange { + // Could we have a smaller outCh size? + outCh := make(chan TableWithRange, len(fileOfTable)) + go func() { + defer close(outCh) + defer log.Info("all range generated") + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + return + case t, ok := <-tableStream: + if !ok { + return + } + files := fileOfTable[t.OldTable.Info.ID] + if partitions := t.OldTable.Info.Partition; partitions != nil { + log.Debug("table partition", + zap.Stringer("database", t.OldTable.Db.Name), + zap.Stringer("table", t.Table.Name), + zap.Any("partition info", partitions), + ) + for _, partition := range partitions.Definitions { + files = append(files, fileOfTable[partition.ID]...) + } + } + ranges, err := ValidateFileRanges(files, t.RewriteRule) + if err != nil { + errCh <- err + return + } + tableWithRange := TableWithRange{ + CreatedTable: t, + Range: AttachFilesToRanges(files, ranges), + } + log.Debug("sending range info", + zap.Stringer("table", t.Table.Name), + zap.Int("files", len(files)), + zap.Int("range size", len(ranges)), + zap.Int("output channel size", len(outCh))) + outCh <- tableWithRange + } + } + }() + return outCh +} + +// validateAndGetFileRange validates a file, if success, return the key range of this file. 
+func validateAndGetFileRange(file *backup.File, rules *RewriteRules) (rtree.Range, error) { + err := ValidateFileRewriteRule(file, rules) + if err != nil { + return rtree.Range{}, err + } + startID := tablecodec.DecodeTableID(file.GetStartKey()) + endID := tablecodec.DecodeTableID(file.GetEndKey()) + if startID != endID { + log.Error("table ids mismatch", + zap.Int64("startID", startID), + zap.Int64("endID", endID), + zap.Stringer("file", file)) + return rtree.Range{}, errors.New("table ids mismatch") + } + r := rtree.Range{StartKey: file.GetStartKey(), EndKey: file.GetEndKey()} + return r, nil +} + // AttachFilesToRanges attach files to ranges. // Panic if range is overlapped or no range for files. func AttachFilesToRanges( @@ -442,3 +539,30 @@ func waitForRemoveRejectStores( return false } + +// ZapTables make zap field of table for debuging, including table names. +func ZapTables(tables []CreatedTable) zapcore.Field { + tableNames := make([]string, 0, len(tables)) + for _, t := range tables { + tableNames = append(tableNames, fmt.Sprintf("%s.%s", t.OldTable.Db.Name, t.OldTable.Info.Name)) + } + return zap.Strings("tables", tableNames) +} + +// ZapRanges make zap fields for debuging, which contains kv, size and count of ranges. +func ZapRanges(ranges []rtree.Range) []zapcore.Field { + totalKV := uint64(0) + totalSize := uint64(0) + for _, r := range ranges { + for _, f := range r.Files { + totalKV += f.GetTotalKvs() + totalSize += f.GetTotalBytes() + } + } + + return []zap.Field{ + zap.Int("ranges", len(ranges)), + zap.Uint64("total kv", totalKV), + zap.Uint64("total size", totalSize), + } +} diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go index 6b0d760f2..8e6e42c26 100644 --- a/pkg/restore/util_test.go +++ b/pkg/restore/util_test.go @@ -40,6 +40,32 @@ func (s *testRestoreUtilSuite) TestGetSSTMetaFromFile(c *C) { c.Assert(string(sstMeta.GetRange().GetEnd()), Equals, "t2\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff") } +func (s *testRestoreUtilSuite) TestMapTableToFiles(c *C) { + filesOfTable1 := []*backup.File{ + {Name: "table1-1.sst", + StartKey: tablecodec.EncodeTablePrefix(1), + EndKey: tablecodec.EncodeTablePrefix(1)}, + {Name: "table1-2.sst", + StartKey: tablecodec.EncodeTablePrefix(1), + EndKey: tablecodec.EncodeTablePrefix(1)}, + {Name: "table1-3.sst", + StartKey: tablecodec.EncodeTablePrefix(1), + EndKey: tablecodec.EncodeTablePrefix(1)}} + filesOfTable2 := []*backup.File{ + {Name: "table2-1.sst", + StartKey: tablecodec.EncodeTablePrefix(2), + EndKey: tablecodec.EncodeTablePrefix(2)}, + {Name: "table2-2.sst", + StartKey: tablecodec.EncodeTablePrefix(2), + EndKey: tablecodec.EncodeTablePrefix(2)}, + } + + result := restore.MapTableToFiles(append(filesOfTable2, filesOfTable1...)) + + c.Assert(result[1], DeepEquals, filesOfTable1) + c.Assert(result[2], DeepEquals, filesOfTable2) +} + func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { rules := &restore.RewriteRules{ Table: []*import_sstpb.RewriteRule{&import_sstpb.RewriteRule{ @@ -94,7 +120,7 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { }}, rules, ) - c.Assert(err, ErrorMatches, "table ids dont match") + c.Assert(err, ErrorMatches, "table ids mismatch") // Add a bad rule for end key, after rewrite start key > end key. 
rules.Table = append(rules.Table[:1], &import_sstpb.RewriteRule{ diff --git a/pkg/task/restore.go b/pkg/task/restore.go index a176c505e..684665135 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -5,19 +5,20 @@ package task import ( "context" "math" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" - "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" "github.com/spf13/pflag" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/restore" - "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" @@ -185,36 +186,23 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } - rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) - if err != nil { - return err - } + // We make bigger errCh so we won't block on multi-part failed. + errCh := make(chan error, 32) + tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh) if len(files) == 0 { log.Info("no files, empty databases and tables are restored") summary.SetSuccessStatus(true) - return nil + // don't return immediately, wait all pipeline done. } - ranges, err := restore.ValidateFileRanges(files, rewriteRules) - if err != nil { - return err - } - summary.CollectInt("restore ranges", len(ranges)) + tableFileMap := restore.MapTableToFiles(files) + log.Debug("mapped table to files", zap.Any("result map", tableFileMap)) - if err = splitPrepareWork(ctx, client, newTables); err != nil { - return err - } - defer splitPostWork(ctx, client, newTables) - - ranges = restore.AttachFilesToRanges(files, ranges) + rangeStream := restore.GoValidateFileRanges(ctx, tableStream, tableFileMap, errCh) - // Redirect to log if there is no log file to avoid unreadable output. - updateCh := g.StartProgress( - ctx, - cmdName, - // Split/Scatter + Download/Ingest - int64(len(ranges)+len(files)), - !cfg.LogProgress) + rangeSize := restore.EstimateRangeSize(files) + summary.CollectInt("restore ranges", rangeSize) + log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize)) clusterCfg, err := restorePreWork(ctx, client, mgr) if err != nil { @@ -222,14 +210,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers - shouldRestorePostWork := true - restorePostWork := func() { - if shouldRestorePostWork { - shouldRestorePostWork = false - restorePostWork(ctx, client, mgr, clusterCfg) - } - } - defer restorePostWork() + defer restorePostWork(ctx, client, mgr, clusterCfg) // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. @@ -242,78 +223,48 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // Restore sst files in batch. 
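+	// The restore path is now a pipeline: GoCreateTables -> GoValidateFileRanges
+	// -> Batcher (driven by a TiKV BatchSender) -> checksum (or a black hole when
+	// checksum is disabled). Every stage reports failures through the shared errCh.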
batchSize := utils.ClampInt(int(cfg.Concurrency), defaultRestoreConcurrency, maxRestoreBatchSizeLimit) + failpoint.Inject("small-batch-size", func(v failpoint.Value) { + log.Info("failpoint small batch size is on", zap.Int("size", v.(int))) + batchSize = v.(int) + }) - rejectStoreMap := make(map[uint64]bool) - if cfg.RemoveTiFlash { - placementRules, err := client.GetPlacementRules(cfg.PD) - if err != nil { - return err - } - - err = client.RemoveTiFlashReplica(tables, newTables, placementRules) - if err != nil { - return err - } - - defer func() { - _ = client.RecoverTiFlashReplica(tables) - }() - - tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) - if err != nil { - return errors.Trace(err) - } - for _, store := range tiflashStores { - rejectStoreMap[store.GetId()] = true - } + // Redirect to log if there is no log file to avoid unreadable output. + updateCh := g.StartProgress( + ctx, + cmdName, + // Split/Scatter + Download/Ingest + Checksum + int64(rangeSize+len(files)+len(tables)), + !cfg.LogProgress) + defer updateCh.Close() + sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.RemoveTiFlash) + if err != nil { + return err } + manager := restore.NewBRContextManager(client) + batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh) + batcher.SetThreshold(batchSize) + batcher.EnableAutoCommit(ctx, time.Second) + go restoreTableStream(ctx, rangeStream, cfg.RemoveTiFlash, cfg.PD, client, batcher, errCh) - for { - if len(ranges) == 0 { - break - } - batchSize = utils.MinInt(batchSize, len(ranges)) - var rangeBatch []rtree.Range - ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize] - - // Split regions by the given rangeBatch. - err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh) - if err != nil { - log.Error("split regions failed", zap.Error(err)) - // If any error happened, return now, don't execute checksum. - return err - } - - // Collect related files in the given rangeBatch. - fileBatch := make([]*backup.File, 0, 2*len(rangeBatch)) - for _, rg := range rangeBatch { - fileBatch = append(fileBatch, rg.Files...) - } - - // After split, we can restore backup files. - err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh) - if err != nil { - // If any error happened, return now, don't execute checksum. - return err - } + var finish <-chan struct{} + // Checksum + if cfg.Checksum { + finish = client.GoValidateChecksum( + ctx, afterRestoreStream, mgr.GetTiKV().GetClient(), errCh, updateCh) + } else { + // when user skip checksum, just collect tables, and drop them. + finish = dropToBlackhole(ctx, afterRestoreStream, errCh, updateCh) } - // Restore has finished. - updateCh.Close() - - // Restore TiKV/PD config before validating checksum. - restorePostWork() + select { + case err = <-errCh: + err = multierr.Append(err, multierr.Combine(restore.Exhaust(errCh)...)) + case <-finish: + } - // Checksum - if cfg.Checksum { - updateCh = g.StartProgress( - ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) - err = client.ValidateChecksum( - ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) - if err != nil { - return err - } - updateCh.Close() + // If any error happened, return now. + if err != nil { + return err } // Set task summary to success status. @@ -321,6 +272,35 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return nil } +// dropToBlackhole drop all incoming tables into black hole, +// i.e. 
don't execute checksum, just increase the process anyhow. +func dropToBlackhole( + ctx context.Context, + tableStream <-chan restore.CreatedTable, + errCh chan<- error, + updateCh glue.Progress, +) <-chan struct{} { + outCh := make(chan struct{}, 1) + go func() { + defer func() { + outCh <- struct{}{} + }() + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + return + case _, ok := <-tableStream: + if !ok { + return + } + updateCh.Inc() + } + } + }() + return outCh +} + func filterRestoreFiles( client *restore.Client, cfg *RestoreConfig, @@ -441,6 +421,7 @@ func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers } // restorePostWork executes some post work after restore. +// TODO: aggregate all lifetime manage methods into batcher's context manager field. func restorePostWork( ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig, ) { @@ -478,6 +459,9 @@ func restorePostWork( if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { log.Warn("fail to update PD schedule config") } + if err := client.ResetRestoreLabels(ctx); err != nil { + log.Warn("reset store labels failed", zap.Error(err)) + } } func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error { @@ -490,34 +474,6 @@ func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers return nil } -func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error { - err := client.SetupPlacementRules(ctx, tables) - if err != nil { - log.Error("setup placement rules failed", zap.Error(err)) - return errors.Trace(err) - } - - err = client.WaitPlacementSchedule(ctx, tables) - if err != nil { - log.Error("wait placement schedule failed", zap.Error(err)) - return errors.Trace(err) - } - return nil -} - -func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) { - err := client.ResetPlacementRules(ctx, tables) - if err != nil { - log.Warn("reset placement rules failed", zap.Error(err)) - return - } - - err = client.ResetRestoreLabels(ctx) - if err != nil { - log.Warn("reset store labels failed", zap.Error(err)) - } -} - // RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore. func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { defer summary.Summary(cmdName) @@ -585,3 +541,61 @@ func enableTiDBConfig() { config.StoreGlobalConfig(conf) } + +// restoreTableStream blocks current goroutine and restore a stream of tables, +// by send tables to batcher. +func restoreTableStream( + ctx context.Context, + inputCh <-chan restore.TableWithRange, + // TODO: remove this field and rules field after we support TiFlash + removeTiFlashReplica bool, + pdAddr []string, + client *restore.Client, + batcher *restore.Batcher, + errCh chan<- error, +) { + // We cache old tables so that we can 'batch' recover TiFlash and tables. + oldTables := []*utils.Table{} + defer func() { + // when things done, we must clean pending requests. 
+ batcher.Close() + log.Info("doing postwork", + zap.Int("table count", len(oldTables)), + ) + if err := client.RecoverTiFlashReplica(oldTables); err != nil { + log.Error("failed on recover TiFlash replicas", zap.Error(err)) + errCh <- err + } + }() + + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + return + case t, ok := <-inputCh: + if !ok { + return + } + if removeTiFlashReplica { + rules, err := client.GetPlacementRules(pdAddr) + if err != nil { + errCh <- err + return + } + log.Debug("get rules", zap.Any("rules", rules)) + log.Debug("try to remove tiflash of table", zap.Stringer("table name", t.Table.Name)) + tiFlashRep, err := client.RemoveTiFlashOfTable(t.CreatedTable, rules) + if err != nil { + log.Error("failed on remove TiFlash replicas", zap.Error(err)) + errCh <- err + return + } + t.OldTable.TiFlashReplicas = tiFlashRep + } + oldTables = append(oldTables, t.OldTable) + + batcher.Add(t) + } + } +} diff --git a/tests/br_db_online_newkv/run.sh b/tests/br_db_online_newkv/run.sh index d8c3f15ff..528fed852 100755 --- a/tests/br_db_online_newkv/run.sh +++ b/tests/br_db_online_newkv/run.sh @@ -66,6 +66,7 @@ sleep 5 echo "restore start..." run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --online +# TODO we should check whether the restore RPCs are send to the new TiKV. table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l) if [ "$table_count" -ne "2" ];then echo "TEST: [$TEST_NAME] failed!" diff --git a/tests/br_small_batch_size/run.sh b/tests/br_small_batch_size/run.sh new file mode 100755 index 000000000..aea469fac --- /dev/null +++ b/tests/br_small_batch_size/run.sh @@ -0,0 +1,76 @@ +#!/bin/sh +# +# Copyright 2020 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +random_values() { + length=$1 + count=$2 + python -c " +import random +import string +for ignored in range($count): + print(''.join(random.choice(string.ascii_letters) for _ in range($length)))" | + awk '{print "(1" $1 "1)"}' | + tr "\n1" ",'" | + sed 's/,$//' +} + +create_and_insert() { + table_name=$1 + record_count=$2 + run_sql "CREATE TABLE $DB.$table_name(k varchar(256) primary key)" + stmt="INSERT INTO $DB.$table_name VALUES `random_values 255 $record_count`" + echo $stmt | mysql -uroot -h127.0.0.1 -P4000 +} + +check_size() { + table_name=$1 + record_count=$2 + + count=`run_sql 'select count(*) from $DB.$table_name' | awk '/count/{print $2}'` + + if [ $count -ne $record_count ]; then + echo "check size failed: $count vs $record_count" + fi +} + +set -eu +DB="$TEST_NAME" +TABLE="usertable" + +run_sql "CREATE DATABASE $DB;" + +record_counts=(10000 10010 10086) +for i in $record_counts; do + create_and_insert "t$i" $i +done +go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB + + +echo "backup start..." +backup_dir="$TEST_DIR/${TEST_NAME}_backup" +rm -rf $backup_dir +run_br backup full -s "local://$backup_dir" --pd $PD_ADDR + +run_sql "drop database $DB" + + +echo "restore start..." 
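+# The failpoint below maps to failpoint.Inject("small-batch-size", ...) in
+# pkg/task/restore.go and forces the restore batch size down to 2, so this test
+# exercises the batcher with many small batches.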
+GO_FAILPOINTS="github.com/pingcap/br/pkg/task/small-batch-size=return(2)" \ +run_br restore full -s "local://$backup_dir" --pd $PD_ADDR --ratelimit 1024 + +for i in $record_counts; do + check_size "t$i" $i +done +check_size $TABLE 10000 diff --git a/tests/br_small_batch_size/workload b/tests/br_small_batch_size/workload new file mode 100644 index 000000000..caba5e1ca --- /dev/null +++ b/tests/br_small_batch_size/workload @@ -0,0 +1,12 @@ +recordcount=30000 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file