Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto/lightning: do remote checksum via sql #44803

Merged
merged 27 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions br/pkg/lightning/backend/local/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ const (
var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds

minDistSQLScanConcurrency = 4
// MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
MinDistSQLScanConcurrency = 4
)

// RemoteChecksum represents a checksum result got from tidb.
Expand Down Expand Up @@ -111,7 +112,11 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi
// +---------+------------+---------------------+-----------+-------------+

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
exec := common.SQLWithRetry{DB: e.db, Logger: task.Logger}
if err := exec.Exec(ctx, "increase tidb_backoff_weight", "SET SESSION tidb_backoff_weight = '6';"); err != nil {
return nil, errors.Trace(err)
}
err = exec.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
Expand Down Expand Up @@ -286,8 +291,8 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
if !common.IsRetryableError(err) {
break
}
if distSQLScanConcurrency > minDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, minDistSQLScanConcurrency)
if distSQLScanConcurrency > MinDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, MinDistSQLScanConcurrency)
}
}

Expand Down
5 changes: 4 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,7 +1534,10 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
manager, err := NewChecksumManager(ctx, rc, kvStore)
var manager local.ChecksumManager
if rc.cfg.TikvImporter.Backend == config.BackendLocal && rc.cfg.PostRestore.Checksum != config.OpLevelOff {
manager = local.NewTiDBChecksumExecutor(rc.db)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a hidden switch to choose between checksum via SQL and checksum via TiKV?

}
if err != nil {
return errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func SetTaskManager(is *TaskManager) {
taskManagerInstance.Store(is)
}

// execSQL executes the sql and returns the result.
// ExecSQL executes the sql and returns the result.
// TODO: consider retry.
func execSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
func ExecSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
rs, err := se.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql, args...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,7 +125,7 @@ func (stm *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) err
// WithNewTxn executes the fn in a new transaction.
func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error {
return stm.WithNewSession(func(se sessionctx.Context) (err error) {
_, err = execSQL(stm.ctx, se, "begin")
_, err = ExecSQL(stm.ctx, se, "begin")
if err != nil {
return err
}
Expand All @@ -136,7 +136,7 @@ func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error {
if success {
sql = "commit"
}
_, commitErr := execSQL(stm.ctx, se, sql)
_, commitErr := ExecSQL(stm.ctx, se, sql)
if err == nil && commitErr != nil {
err = commitErr
}
Expand All @@ -153,7 +153,7 @@ func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error {

func (stm *TaskManager) executeSQLWithNewSession(ctx context.Context, sql string, args ...interface{}) (rs []chunk.Row, err error) {
err = stm.WithNewSession(func(se sessionctx.Context) error {
rs, err = execSQL(ctx, se, sql, args...)
rs, err = ExecSQL(ctx, se, sql, args...)
return err
})

Expand All @@ -176,15 +176,15 @@ func (stm *TaskManager) AddNewGlobalTask(key, tp string, concurrency int, meta [

// AddGlobalTaskWithSession adds a new task to global task table with session.
func (stm *TaskManager) AddGlobalTaskWithSession(se sessionctx.Context, key, tp string, concurrency int, meta []byte) (taskID int64, err error) {
_, err = execSQL(stm.ctx, se,
_, err = ExecSQL(stm.ctx, se,
`insert into mysql.tidb_global_task(task_key, type, state, concurrency, step, meta, state_update_time)
values (%?, %?, %?, %?, %?, %?, %?)`,
key, tp, proto.TaskStatePending, concurrency, proto.StepInit, meta, time.Now().UTC().String())
if err != nil {
return 0, err
}

rs, err := execSQL(stm.ctx, se, "select @@last_insert_id")
rs, err := ExecSQL(stm.ctx, se, "select @@last_insert_id")
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -406,7 +406,7 @@ func (stm *TaskManager) GetSchedulerIDsByTaskID(taskID int64) ([]string, error)
// UpdateGlobalTaskAndAddSubTasks update the global task and add new subtasks
func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtasks []*proto.Subtask, isSubtaskRevert bool) error {
return stm.WithNewTxn(func(se sessionctx.Context) error {
_, err := execSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? where id = %?",
_, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? where id = %?",
gTask.State, gTask.DispatcherID, gTask.Step, gTask.StateUpdateTime.UTC().String(), gTask.Concurrency, gTask.Meta, gTask.Error, gTask.ID)
if err != nil {
return err
Expand All @@ -425,7 +425,7 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas

for _, subtask := range subtasks {
// TODO: insert subtasks in batch
_, err = execSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)",
_, err = ExecSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)",
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{})
if err != nil {
return err
Expand All @@ -446,7 +446,7 @@ func (stm *TaskManager) CancelGlobalTask(taskID int64) error {

// CancelGlobalTaskByKeySession cancels global task by key using input session
func (stm *TaskManager) CancelGlobalTaskByKeySession(se sessionctx.Context, taskKey string) error {
_, err := execSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)",
_, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)",
proto.TaskStateCancelling, taskKey, proto.TaskStatePending, proto.TaskStateRunning)
return err
}
Expand Down
5 changes: 5 additions & 0 deletions disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
Expand All @@ -33,15 +34,19 @@ go_library(
"//parser/ast",
"//parser/mysql",
"//sessionctx",
"//sessionctx/variable",
"//table/tables",
"//util/chunk",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
"//util/mathutil",
"//util/sqlexec",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//kv",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
37 changes: 0 additions & 37 deletions disttask/importinto/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,43 +365,6 @@ func preProcess(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, t
return updateMeta(gTask, taskMeta)
}

// postProcess does the post-processing for the task.
// It builds a controller from taskMeta via buildController and then hands the
// checksum stored in subtaskMeta to verifyChecksum. An error from either step
// is returned to the caller unchanged.
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
// Failpoint hook: when "syncBeforePostProcess" is enabled, send on TestSyncChan
// and then block until a value is received back from it before continuing.
failpoint.Inject("syncBeforePostProcess", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
})
// TODO: create table indexes depends on the option.
// globalTaskManager, err := storage.GetTaskManager()
// if err != nil {
// return err
// }
// create table indexes even if the post process is failed.
// defer func() {
// err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger)
// err = multierr.Append(err, err2)
// }()

controller, err := buildController(taskMeta)
if err != nil {
return err
}
// no need and should not call controller.InitDataFiles, files might not exist on this instance.

logger.Info("post process")

// The only post-processing step currently performed is checksum verification.
return verifyChecksum(ctx, controller, subtaskMeta.Checksum, logger)
}

// verifyChecksum rebuilds the local KV checksum from the given size, KV count
// and sum, logs it, and passes it to controller.VerifyChecksum. Nothing is
// done and nil is returned when the controller's checksum level is
// config.OpLevelOff.
func verifyChecksum(ctx context.Context, controller *importer.LoadDataController, checksum Checksum, logger *zap.Logger) error {
	if controller.Checksum != config.OpLevelOff {
		kvChecksum := verify.MakeKVChecksum(checksum.Size, checksum.KVs, checksum.Sum)
		logger.Info("local checksum", zap.Object("checksum", &kvChecksum))
		return controller.VerifyChecksum(ctx, kvChecksum)
	}
	return nil
}

// nolint:deadcode
func dropTableIndexes(ctx context.Context, handle dispatcher.TaskHandle, taskMeta *TaskMeta, logger *zap.Logger) error {
tblInfo := taskMeta.Plan.TableInfo
Expand Down
120 changes: 120 additions & 0 deletions disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,25 @@ package importinto

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
tikvstore "github.com/tikv/client-go/v2/kv"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -73,6 +84,115 @@ func (e *postProcessMinimalTaskExecutor) Run(ctx context.Context) error {
return postProcess(ctx, mTask.taskMeta, &mTask.meta, mTask.logger)
}

// postProcess does the post-processing for the task.
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add more unit tests for this function, or for the checksum-via-SQL path.

failpoint.Inject("syncBeforePostProcess", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
})

logger.Info("post process")

// TODO: create table indexes depends on the option.
// create table indexes even if the post process is failed.
// defer func() {
// err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger)
// err = multierr.Append(err, err2)
// }()

return verifyChecksum(ctx, taskMeta, subtaskMeta, logger)
}

// verifyChecksum compares the checksum accumulated locally (carried in
// subtaskMeta) with the checksum obtained from checksumTable.
//
// It returns nil immediately when the plan's checksum level is
// config.OpLevelOff. On a mismatch it returns common.ErrChecksumMismatch,
// unless the level is config.OpLevelOptional, in which case the mismatch is
// only logged as a warning and nil is returned. Errors from obtaining the task
// manager or from checksumTable are returned as-is.
func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) error {
	if taskMeta.Plan.Checksum == config.OpLevelOff {
		return nil
	}

	expected := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum)
	logger.Info("local checksum", zap.Object("checksum", &expected))

	// Failpoint hook: when "waitCtxDone" is enabled, block here until ctx is done.
	failpoint.Inject("waitCtxDone", func() {
		<-ctx.Done()
	})

	taskManager, err := storage.GetTaskManager()
	if err != nil {
		return err
	}
	actual, err := checksumTable(ctx, taskManager, taskMeta, logger)
	if err != nil {
		return err
	}

	if actual.IsEqual(&expected) {
		logger.Info("checksum pass", zap.Object("local", &expected))
		return nil
	}

	mismatchErr := common.ErrChecksumMismatch.GenWithStackByArgs(
		actual.Checksum, expected.Sum(),
		actual.TotalKVs, expected.SumKVS(),
		actual.TotalBytes, expected.SumSize(),
	)
	if taskMeta.Plan.Checksum == config.OpLevelOptional {
		// An optional checksum downgrades a mismatch to a warning.
		logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(mismatchErr))
		return nil
	}
	return mismatchErr
}

func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) {
var (
tableName = common.UniqueTable(taskMeta.Plan.DBName, taskMeta.Plan.TableInfo.Name.L)
sql = "ADMIN CHECKSUM TABLE " + tableName
remoteChecksum *local.RemoteChecksum
maxErrorRetryCount = 3
distSQLScanConcurrency int
rs []chunk.Row
execErr error
)

err := executor.WithNewSession(func(se sessionctx.Context) error {
if err := se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(3*tikvstore.DefBackOffWeight)); err != nil {
return err
}
distSQLScanConcurrency = se.GetSessionVars().DistSQLScanConcurrency()

for i := 0; i < maxErrorRetryCount; i++ {
rs, execErr = storage.ExecSQL(ctx, se, sql)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be executed inside an explicit transaction?

if execErr == nil {
if len(rs) < 1 {
return errors.New("empty checksum result")
}
// ADMIN CHECKSUM TABLE <schema>.<table> example.
// mysql> admin checksum table test.t;
// +---------+------------+---------------------+-----------+-------------+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
// +---------+------------+---------------------+-----------+-------------+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
// +---------+------------+-------------
remoteChecksum = &local.RemoteChecksum{
Schema: rs[0].GetString(0),
Table: rs[0].GetString(1),
Checksum: rs[0].GetUint64(2),
TotalKVs: rs[0].GetUint64(3),
TotalBytes: rs[0].GetUint64(4),
}
return nil
}
if !common.IsRetryableError(execErr) {
return execErr
}

logger.Warn("remote checksum failed", zap.String("sql", sql), zap.Error(execErr),
zap.Int("concurrency", distSQLScanConcurrency), zap.Int("retry", i))
if distSQLScanConcurrency > local.MinDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, local.MinDistSQLScanConcurrency)
se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we revert it after the statement executes? The session is taken from a resource pool.

}
}
return execErr
})
return remoteChecksum, err
}

func init() {
scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepImport,
// The order of the subtask executors is the same as the order of the subtasks.
Expand Down
1 change: 0 additions & 1 deletion executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ go_library(
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
Expand Down
Loading