br: pipeline wait tiflash synced (#43726) (#45018)
close #43828
ti-chi-bot authored Aug 16, 2023
1 parent 3af566e commit 9d2e744
Showing 7 changed files with 237 additions and 113 deletions.
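The heart of this commit is a channel-chained pipeline for post-restore work: checksum validation, stats/meta loading, and waiting for TiFlash replicas each become a stage that consumes *CreatedTable from the previous stage and feeds the next (see concurrentHandleTablesCh in the client.go diff below). The following is a minimal, self-contained sketch of that stage pattern; the table type, the runStage helper, and the use of errgroup.SetLimit in place of BR's utils.WorkerPool are illustrative assumptions, not code from this commit.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type table struct{ name string }

// runStage consumes items from in, processes each one on a bounded pool of
// goroutines, and forwards successful items to the returned channel. The
// output channel is closed only after the input is drained and every worker
// has finished, so several stages can be chained back to back.
func runStage(
	ctx context.Context,
	in <-chan *table,
	errCh chan<- error,
	concurrency int,
	process func(context.Context, *table) error,
) <-chan *table {
	out := make(chan *table, 1024)
	go func() {
		eg, ectx := errgroup.WithContext(ctx)
		eg.SetLimit(concurrency) // plays the role of utils.WorkerPool
		defer func() {
			if err := eg.Wait(); err != nil {
				errCh <- err
			}
			close(out)
		}()
		for {
			select {
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			case tbl, ok := <-in:
				if !ok {
					return
				}
				eg.Go(func() error {
					if err := process(ectx, tbl); err != nil {
						return err
					}
					out <- tbl
					return nil
				})
			}
		}
	}()
	return out
}

func main() {
	ctx := context.Background()
	errCh := make(chan error, 8)

	in := make(chan *table, 4)
	go func() {
		for _, n := range []string{"t1", "t2", "t3"} {
			in <- &table{name: n}
		}
		close(in)
	}()

	// Chain three stages, mirroring checksum -> stats/meta -> TiFlash wait.
	checksummed := runStage(ctx, in, errCh, 4, func(_ context.Context, t *table) error {
		fmt.Println("validate checksum:", t.name)
		return nil
	})
	statsLoaded := runStage(ctx, checksummed, errCh, 1, func(_ context.Context, t *table) error {
		fmt.Println("update meta, load stats:", t.name)
		return nil
	})
	synced := runStage(ctx, statsLoaded, errCh, 4, func(_ context.Context, t *table) error {
		fmt.Println("wait tiflash synced:", t.name)
		return nil
	})
	for t := range synced {
		fmt.Println("restore finished for", t.name)
	}
}
```

Closing the output channel only after eg.Wait() returns is what makes the chaining safe: a downstream stage's receive loop ends only when every upstream worker has finished.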
2 changes: 2 additions & 0 deletions br/pkg/restore/BUILD.bazel
@@ -49,12 +49,14 @@ go_library(
"//ddl",
"//ddl/util",
"//domain",
"//domain/infosync",
"//kv",
"//meta",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//statistics/handle",
"//store/helper",
"//store/pdtypes",
"//tablecodec",
"//util",
13 changes: 7 additions & 6 deletions br/pkg/restore/batcher.go
@@ -48,7 +48,7 @@ type Batcher struct {
// sendCh is for communicating with sendWorker.
sendCh chan<- SendType
// outCh is for outputting the restored tables, so they can be sent on for work such as checksum validation.
outCh chan<- CreatedTable
outCh chan<- *CreatedTable

sender BatchSender
manager ContextManager
@@ -86,7 +86,8 @@ func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTab
return
}
for _, tbl := range tbls {
b.outCh <- tbl
cloneTable := tbl
b.outCh <- &cloneTable
}
}
}
@@ -102,13 +103,13 @@ func NewBatcher(
sender BatchSender,
manager ContextManager,
errCh chan<- error,
) (*Batcher, <-chan CreatedTable) {
output := make(chan CreatedTable, defaultChannelSize)
) (*Batcher, chan *CreatedTable) {
outCh := DefaultOutputTableChan()
sendChan := make(chan SendType, 2)
b := &Batcher{
rewriteRules: EmptyRewriteRule(),
sendErr: errCh,
outCh: output,
outCh: outCh,
sender: sender,
manager: manager,
sendCh: sendChan,
@@ -122,7 +123,7 @@
go b.contextCleaner(ctx, restoredTables)
sink := chanTableSink{restoredTables, errCh}
sender.PutSink(sink)
return b, output
return b, outCh
}

// EnableAutoCommit enables the batcher to commit a batch periodically even if the batch size isn't big enough.
263 changes: 174 additions & 89 deletions br/pkg/restore/client.go
@@ -43,11 +43,13 @@ import (
"github.com/pingcap/tidb/config"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/pdtypes"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
@@ -1325,77 +1327,83 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
return nil
}

func concurrentHandleTablesCh(
ctx context.Context,
inCh <-chan *CreatedTable,
outCh chan<- *CreatedTable,
errCh chan<- error,
workers *utils.WorkerPool,
processFun func(context.Context, *CreatedTable) error,
deferFun func()) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
if err := eg.Wait(); err != nil {
errCh <- err
}
close(outCh)
deferFun()
}()

for {
select {
// if we used ectx here, a cancellation might mask the real error.
case <-ctx.Done():
errCh <- ctx.Err()
case tbl, ok := <-inCh:
if !ok {
return
}
cloneTable := tbl
worker := workers.ApplyWorker()
eg.Go(func() error {
defer workers.RecycleWorker(worker)
err := processFun(ectx, cloneTable)
if err != nil {
return err
}
outCh <- cloneTable
return nil
})
}
}
}

// GoValidateChecksum forks a goroutine to validate checksum after restore.
// it returns a channel that outputs each restored table once its checksum has been validated.
func (rc *Client) GoValidateChecksum(
ctx context.Context,
tableStream <-chan CreatedTable,
inCh <-chan *CreatedTable,
kvClient kv.Client,
errCh chan<- error,
updateCh glue.Progress,
concurrency uint,
) <-chan struct{} {
) <-chan *CreatedTable {
log.Info("Start to validate checksum")
outCh := make(chan struct{}, 1)
wg := new(sync.WaitGroup)
wg.Add(2)
loadStatCh := make(chan *CreatedTable, 1024)
// run the stat loader
go func() {
defer wg.Done()
rc.updateMetaAndLoadStats(ctx, loadStatCh)
}()
outCh := DefaultOutputTableChan()
workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go func() {
eg, ectx := errgroup.WithContext(ctx)
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
start := time.Now()
defer func() {
if err := eg.Wait(); err != nil {
errCh <- err
}
close(loadStatCh)
wg.Done()
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()

for {
select {
// if we used ectx here, a cancellation might mask the real error.
case <-ctx.Done():
errCh <- ctx.Err()
case tbl, ok := <-tableStream:
if !ok {
return
}

workers.ApplyOnErrorGroup(eg, func() error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execChecksum(ectx, tbl, kvClient, concurrency, loadStatCh)
if err != nil {
return errors.Trace(err)
}
updateCh.Inc()
return nil
})
}
err := rc.execChecksum(c, tbl, kvClient, concurrency)
if err != nil {
return errors.Trace(err)
}
}()
go func() {
wg.Wait()
updateCh.Inc()
return nil
}, func() {
log.Info("all checksum ended")
close(outCh)
}()
})
return outCh
}

func (rc *Client) execChecksum(
ctx context.Context,
tbl CreatedTable,
tbl *CreatedTable,
kvClient kv.Client,
concurrency uint,
loadStatCh chan<- *CreatedTable,
) error {
logger := log.With(
zap.String("db", tbl.OldTable.DB.Name.O),
@@ -1446,48 +1454,109 @@ func (rc *Client) execChecksum(
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}

loadStatCh <- &tbl
logger.Info("success in validate checksum")
return nil
}

func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) {
for {
select {
case <-ctx.Done():
return
case tbl, ok := <-input:
if !ok {
return
func (rc *Client) GoUpdateMetaAndLoadStats(ctx context.Context, inCh <-chan *CreatedTable, errCh chan<- error) chan *CreatedTable {
log.Info("Start to update meta then load stats")
outCh := DefaultOutputTableChan()
workers := utils.NewWorkerPool(1, "UpdateStats")
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
oldTable := tbl.OldTable
// No need to return an error when updating the analysis meta fails
restoreTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
log.Info("start update metas",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name))
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, oldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
}

// No need to return an error when updating the analysis meta fails
restoreTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
if oldTable.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", oldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), oldTable.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", oldTable.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name),
zap.Duration("cost", time.Since(start)))
}
return nil
}, func() {
log.Info("all stats updated")
})
return outCh
}

table := tbl.OldTable
if table.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
func (rc *Client) GoWaitTiFlashReady(ctx context.Context, inCh <-chan *CreatedTable, updateCh glue.Progress, errCh chan<- error) chan *CreatedTable {
log.Info("Start to wait tiflash replica sync")
outCh := DefaultOutputTableChan()
workers := utils.NewWorkerPool(4, "WaitForTiflashReady")
// TODO support tiflash store changes
tikvStats, err := infosync.GetTiFlashStoresStat(context.Background())
if err != nil {
errCh <- err
}
tiFlashStores := make(map[int64]helper.StoreStat)
for _, store := range tikvStats.Stores {
for _, l := range store.Store.Labels {
if l.Key == "engine" && l.Value == "tiflash" {
tiFlashStores[store.Store.ID] = store
}
}
}
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
if tbl.Table != nil && tbl.Table.TiFlashReplica == nil {
log.Info("table has no tiflash replica",
zap.Stringer("table", tbl.OldTable.Info.Name),
zap.Stringer("db", tbl.OldTable.DB.Name))
updateCh.Inc()
return nil
}
if rc.dom != nil {
log.Info("table has tiflash replica, start sync..",
zap.Stringer("table", tbl.OldTable.Info.Name),
zap.Stringer("db", tbl.OldTable.DB.Name))
for {
progress, err := infosync.CalculateTiFlashProgress(tbl.Table.ID, tbl.Table.TiFlashReplica.Count, tiFlashStores)
if err != nil {
log.Warn("failed to get tiflash replica progress, wait for next retry", zap.Error(err))
time.Sleep(time.Second)
continue
}
// check until progress is 1
if progress == 1 {
log.Info("tiflash replica synced",
zap.Stringer("table", tbl.OldTable.Info.Name),
zap.Stringer("db", tbl.OldTable.DB.Name))
break
}
// just wait for the next check;
// tiflash refreshes the progress every 2s,
// so waiting 2.5x that interval is enough
time.Sleep(5 * time.Second)
}
} else {
// unreachable: currently the domain is always initialized in mgr.
log.Fatal("unreachable, domain is nil")
}
updateCh.Inc()
return nil
}, func() {
log.Info("all tiflash replica synced")
})
return outCh
}

const (
@@ -1714,15 +1783,31 @@ func (rc *Client) PreCheckTableTiFlashReplica(
return err
}
for _, table := range tables {
if recorder != nil ||
(table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > tiFlashStoreCount) {
if recorder != nil && table.Info.TiFlashReplica != nil {
if table.Info.TiFlashReplica != nil {
// we should not set available to true, because we cannot guarantee the raft log lag of tiflash when the restore finishes.
// just let the tiflash ticker set it after checking the lag of all related regions.
table.Info.TiFlashReplica.Available = false
table.Info.TiFlashReplica.AvailablePartitionIDs = nil
if recorder != nil {
recorder.AddTable(table.Info.ID, *table.Info.TiFlashReplica)
log.Info("record tiflash replica for table, to reset it by ddl later",
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
)
table.Info.TiFlashReplica = nil
} else if table.Info.TiFlashReplica.Count > tiFlashStoreCount {
// we cannot satisfy the TiFlash replica count in the restore cluster, so we set
// TiFlashReplica to unavailable in tableInfo; otherwise TiDB could not tell that TiFlash is unavailable and might still plan queries to it
// see details at https://github.com/pingcap/br/issues/931
// TODO maybe set table.Info.TiFlashReplica.Count to tiFlashStoreCount, but we need more tests about it.
log.Warn("table does not satisfy tiflash replica requirements, set tiflash replcia to unavaiable",
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Uint64("expect tiflash replica", table.Info.TiFlashReplica.Count),
zap.Uint64("actual tiflash store", tiFlashStoreCount),
)
table.Info.TiFlashReplica = nil
}
// we cannot satisfy the TiFlash replica count in the restore cluster, so we set
// TiFlashReplica to unavailable in tableInfo; otherwise TiDB could not tell that TiFlash is unavailable and might still plan queries to it
// see details at https://github.com/pingcap/br/issues/931
table.Info.TiFlashReplica = nil
}
}
return nil
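The wait loop in GoWaitTiFlashReady above polls TiFlash replica progress until it reaches 1, retrying quickly on transient errors and sleeping between successful checks (TiFlash itself refreshes progress roughly every 2s, so the code waits 5s). Below is a standalone sketch of that polling pattern; waitReplicaSynced and the fake progress function are illustrative stand-ins for the call into infosync.CalculateTiFlashProgress, not BR code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitReplicaSynced polls progress until it reports 1 (fully synced),
// retrying quickly on transient errors and sleeping between successful
// checks, much like the loop inside GoWaitTiFlashReady.
func waitReplicaSynced(
	ctx context.Context,
	tableID int64,
	progress func(tableID int64) (float64, error),
	checkInterval time.Duration,
) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		p, err := progress(tableID)
		if err != nil {
			// transient failure: wait a moment and retry
			fmt.Println("progress check failed, retrying:", err)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		if p >= 1 {
			fmt.Printf("table %d tiflash replica synced\n", tableID)
			return nil
		}
		time.Sleep(checkInterval)
	}
}

func main() {
	// Fake progress source: fails once, then converges to 1.
	calls := 0
	fakeProgress := func(int64) (float64, error) {
		calls++
		if calls == 1 {
			return 0, errors.New("stores not ready")
		}
		return float64(calls) / 4, nil
	}
	if err := waitReplicaSynced(context.Background(), 101, fakeProgress, 50*time.Millisecond); err != nil {
		fmt.Println("wait failed:", err)
	}
}
```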
2 changes: 1 addition & 1 deletion br/pkg/restore/client_test.go
@@ -433,7 +433,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
}

tables[i] = &metautil.Table{
DB: nil,
DB: &model.DBInfo{},
Info: &model.TableInfo{
ID: int64(i),
Name: model.NewCIStr("test" + strconv.Itoa(i)),
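The reworked branch in PreCheckTableTiFlashReplica (exercised by the test fixture tweak above) boils down to a per-table decision: always clear the availability flags, hand the replica to the recorder, when one is supplied, so a later DDL can restore it, and otherwise drop the replica when the cluster has fewer TiFlash stores than the replica count requires. Here is a self-contained sketch of that decision logic with stand-in types; names such as tiFlashReplicaInfo, recorder, and preCheck are illustrative, not BR's.

```go
package main

import "fmt"

// Stand-ins for the metautil/model types used by PreCheckTableTiFlashReplica.
type tiFlashReplicaInfo struct {
	Count     uint64
	Available bool
}

type tableInfo struct {
	ID      int64
	Replica *tiFlashReplicaInfo
}

// recorder remembers stripped replicas so they can be re-added by DDL later.
type recorder struct{ recorded map[int64]tiFlashReplicaInfo }

func (r *recorder) AddTable(id int64, rep tiFlashReplicaInfo) { r.recorded[id] = rep }

// preCheck mirrors the branch structure introduced in this commit: never keep
// the replica marked available right after restore (the real code also clears
// AvailablePartitionIDs); if a recorder is supplied, record the replica and
// strip it so DDL can reset it later; otherwise drop it when the cluster has
// fewer TiFlash stores than the replica count requires.
func preCheck(tbl *tableInfo, rec *recorder, tiFlashStoreCount uint64) {
	if tbl.Replica == nil {
		return
	}
	tbl.Replica.Available = false
	if rec != nil {
		rec.AddTable(tbl.ID, *tbl.Replica)
		tbl.Replica = nil
	} else if tbl.Replica.Count > tiFlashStoreCount {
		tbl.Replica = nil // unsatisfiable: avoid TiDB planning reads onto missing TiFlash
	}
}

func main() {
	rec := &recorder{recorded: map[int64]tiFlashReplicaInfo{}}
	t1 := &tableInfo{ID: 1, Replica: &tiFlashReplicaInfo{Count: 2, Available: true}}
	preCheck(t1, rec, 1)
	fmt.Println("t1 replica stripped:", t1.Replica == nil, "recorded:", rec.recorded)

	t2 := &tableInfo{ID: 2, Replica: &tiFlashReplicaInfo{Count: 3}}
	preCheck(t2, nil, 1) // no recorder and only 1 TiFlash store: replica dropped
	fmt.Println("t2 replica stripped:", t2.Replica == nil)
}
```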