Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add back table empty check and add a switch config (#30887) #30926

Merged
merged 19 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
cc839fe
cherry pick #30887 to release-5.3
glorv Dec 22, 2021
6b9679f
cherrypick #31798
glorv Feb 21, 2022
32f6287
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 21, 2022
e766e2c
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 21, 2022
a236f7b
fix
glorv Feb 21, 2022
26337bb
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
e0343d5
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
38c03aa
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
6d01bc8
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
68bb5c0
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
39689e6
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
5d540fa
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
9e89999
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
31db680
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
e797f74
Merge branch 'release-5.3' into release-5.3-393415782452
glorv Feb 22, 2022
4edee00
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
ab95bda
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
0138948
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
599d321
Merge branch 'release-5.3' into release-5.3-393415782452
ti-chi-bot Feb 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
100 changes: 97 additions & 3 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -38,15 +44,13 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table/tables"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"

"go.uber.org/zap"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -865,3 +869,93 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
return nil
}
db, _ := rc.tidbGlue.GetDB()

tableCount := 0
for _, db := range rc.dbMetas {
tableCount += len(db.Tables)
}

var lock sync.Mutex
tableNames := make([]string, 0)
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
// skip tables that have checkpoint
if rc.cfg.Checkpoint.Enable {
_, err := rc.checkpointsDB.Get(gCtx, tblName)
switch {
case err == nil:
continue
case errors.IsNotFound(err):
default:
return errors.Trace(err)
}
}

hasData, err1 := tableContainsData(gCtx, db, tblName)
if err1 != nil {
return err1
}
if hasData {
lock.Lock()
tableNames = append(tableNames, tblName)
lock.Unlock()
}
}
return nil
})
}
loop:
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
select {
case ch <- common.UniqueTable(tbl.DB, tbl.Name):
case <-gCtx.Done():
break loop
}
}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Annotate(err, "check table contains data failed")
}

if len(tableNames) > 0 {
// sort the failed names
sort.Strings(tableNames)
msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
rc.checkTemplate.Collect(Critical, false, msg)
}
return nil
}

func tableContainsData(ctx context.Context, db utils.DBExecutor, tableName string) (bool, error) {
exec := common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
query := "select 1 from " + tableName + " limit 1"
var dump int
err := exec.QueryRow(ctx, "check table empty", query, &dump)

switch {
case errors.ErrorEqual(err, sql.ErrNoRows):
return false, nil
case err != nil:
return false, errors.Trace(err)
default:
return true, nil
}
}
143 changes: 143 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ package restore

import (
"context"
"database/sql"
"path/filepath"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
)
Expand All @@ -29,6 +36,142 @@ var _ = Suite(&checkInfoSuite{})

type checkInfoSuite struct{}

func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
dir := c.MkDir()
cfg := config.NewConfig()
cfg.Checkpoint.Enable = false
dbMetas := []*mydump.MDDatabaseMeta{
{
Name: "test1",
Tables: []*mydump.MDTableMeta{
{
DB: "test1",
Name: "tbl1",
},
{
DB: "test1",
Name: "tbl2",
},
},
},
{
Name: "test2",
Tables: []*mydump.MDTableMeta{
{
DB: "test2",
Name: "tbl1",
},
},
},
}

rc := &Controller{
cfg: cfg,
dbMetas: dbMetas,
checkpointsDB: checkpoints.NewNullCheckpointsDB(),
}

ctx := context.Background()

// test tidb will do nothing
rc.cfg.TikvImporter.Backend = config.BackendTiDB
err := rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)

// test incremental mode
rc.cfg.TikvImporter.Backend = config.BackendLocal
rc.cfg.TikvImporter.IncrementalImport = true
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)

rc.cfg.TikvImporter.IncrementalImport = false
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
mock.MatchExpectationsInOrder(false)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
// not error, need not to init check template
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

// single table contains data
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
tmpl := rc.checkTemplate.(*SimpleTemplate)
c.Assert(len(tmpl.criticalMsgs), Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test2`.`tbl1`\\] are not empty")

// multi tables contains data
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
tmpl = rc.checkTemplate.(*SimpleTemplate)
c.Assert(len(tmpl.criticalMsgs), Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test1`.`tbl1`, `test2`.`tbl1`\\] are not empty")

// init checkpoint with only two of the three tables
dbInfos := map[string]*checkpoints.TidbDBInfo{
"test1": {
Name: "test1",
Tables: map[string]*checkpoints.TidbTableInfo{
"tbl1": {
Name: "tbl1",
},
},
},
"test2": {
Name: "test2",
Tables: map[string]*checkpoints.TidbTableInfo{
"tbl1": {
Name: "tbl1",
},
},
},
}
rc.cfg.Checkpoint.Enable = true
rc.checkpointsDB = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
err = rc.checkpointsDB.Initialize(ctx, cfg, dbInfos)
c.Check(err, IsNil)
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
// only need to check the one that is not in checkpoint
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *checkInfoSuite) TestLocalResource(c *C) {
dir := c.MkDir()
mockStore, err := storage.NewLocalStorage(dir)
Expand Down
58 changes: 57 additions & 1 deletion br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,9 +1027,65 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum
}

func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) {
return false, false, nil, nil
return true, true, &verify.KVChecksum{}, nil
}

func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

type singleMgrBuilder struct{}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
pd: pd,
}
}

func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
return noopTableMetaMgr{}
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
_, err := action(nil)
return err
}

func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
return true, true, nil
}

func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) Close() {
}
Loading