From 66cedb17dd9bb89fac4a9d75c1d94ad5227941e1 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 6 Aug 2021 15:51:14 +0800 Subject: [PATCH] This is an automated cherry-pick of #1336 Signed-off-by: ti-chi-bot --- pkg/lightning/restore/meta_manager.go | 85 ++++++++++++++++++++++----- pkg/lightning/restore/restore.go | 28 ++++++++- 2 files changed, 95 insertions(+), 18 deletions(-) diff --git a/pkg/lightning/restore/meta_manager.go b/pkg/lightning/restore/meta_manager.go index c051c1ecc..eb766dd0f 100644 --- a/pkg/lightning/restore/meta_manager.go +++ b/pkg/lightning/restore/meta_manager.go @@ -462,7 +462,10 @@ func (m *dbTableMetaMgr) FinishTable(ctx context.Context) error { type taskMetaMgr interface { InitTask(ctx context.Context) error CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) - CheckAndFinishRestore(ctx context.Context) (bool, error) + // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata + // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) + // the second boolean indicates whether to clean up the metadata in tidb + CheckAndFinishRestore(ctx context.Context, finished bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) Cleanup(ctx context.Context) error CleanupAllMetas(ctx context.Context) error } @@ -485,6 +488,11 @@ const ( taskMetaStatusSwitchBack ) +const ( + taskStateNormal int = iota + taskStateExited +) + func (m taskMetaStatus) String() string { switch m { case taskMetaStatusInitial: @@ -526,8 +534,13 @@ func (m *dbTaskMetaMgr) InitTask(ctx context.Context) error { Logger: log.L(), } // avoid override existing metadata if the meta is already inserted. 
+<<<<<<< HEAD stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, status) values (?, ?)`, m.tableName) err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String()) +======= + stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, source_bytes) values (?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName) + err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), source, taskStateNormal) +>>>>>>> 1b0e54c2 (lightning: check and restore pd scheduler even if our task failed (#1336)) return errors.Trace(err) } @@ -551,7 +564,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U paused := false var pausedCfg storedCfgs err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -566,10 +579,11 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U taskID int64 cfg string statusValue string + state int ) var cfgStr string for rows.Next() { - if err = rows.Scan(&taskID, &cfg, &statusValue); err != nil { + if err = rows.Scan(&taskID, &cfg, &statusValue, &state); err != nil { return errors.Trace(err) } status, err := parseTaskMetaStatus(statusValue) @@ -643,10 +657,13 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U }, nil } -func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { +// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata +// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) +// the second boolean 
indicates whether to clean up the metadata in tidb +func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool) (bool, bool, error) { conn, err := m.session.Conn(ctx) if err != nil { - return false, errors.Trace(err) + return false, false, errors.Trace(err) } defer conn.Close() exec := &common.SQLWithRetry{ @@ -655,12 +672,13 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") if err != nil { - return false, errors.Annotate(err, "enable pessimistic transaction failed") + return false, false, errors.Annotate(err, "enable pessimistic transaction failed") } switchBack := true + allFinished := finished err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, status from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -674,10 +692,12 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) var ( taskID int64 statusValue string + state int ) - newStatus := taskMetaStatusSwitchBack + + taskStatus := taskMetaStatusInitial for rows.Next() { - if err = rows.Scan(&taskID, &statusValue); err != nil { + if err = rows.Scan(&taskID, &statusValue, &state); err != nil { return errors.Trace(err) } status, err := parseTaskMetaStatus(statusValue) @@ -686,13 +706,18 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } if taskID == m.taskID { + taskStatus = status continue } if status < taskMetaStatusSwitchSkipped { - newStatus = taskMetaStatusSwitchSkipped - switchBack = false - break + allFinished = false + // check if other task still running + if state == taskStateNormal { + log.L().Info("unfinished task 
found", zap.Int64("task_id", taskID), + zap.Stringer("status", status)) + switchBack = false + } } } if err = rows.Close(); err != nil { @@ -700,13 +725,28 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } closed = true - query = fmt.Sprintf("update %s set status = ? where task_id = ?", m.tableName) - _, err = tx.ExecContext(ctx, query, newStatus.String(), m.taskID) + if taskStatus < taskMetaStatusSwitchSkipped { + newStatus := taskMetaStatusSwitchBack + newState := taskStateNormal + if !finished { + newStatus = taskStatus + newState = taskStateExited + } else if !allFinished { + newStatus = taskMetaStatusSwitchSkipped + } - return errors.Trace(err) + query = fmt.Sprintf("update %s set status = ?, state = ? where task_id = ?", m.tableName) + if _, err = tx.ExecContext(ctx, query, newStatus.String(), newState, m.taskID); err != nil { + return errors.Trace(err) + } + } + + return nil }) + log.L().Info("check all task finish status", zap.Bool("task_finished", finished), + zap.Bool("all_finished", allFinished), zap.Bool("switch_back", switchBack)) - return switchBack, err + return switchBack, allFinished, err } func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error { @@ -773,8 +813,21 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil. 
}, nil } +<<<<<<< HEAD func (m noopTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { return false, nil +======= +func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { + return false, nil +} + +func (m noopTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { + return 0, nil +} + +func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) { + return false, true, nil +>>>>>>> 1b0e54c2 (lightning: check and restore pd scheduler even if our task failed (#1336)) } func (m noopTaskMetaMgr) Cleanup(ctx context.Context) error { diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index c897613a1..c76c35c9a 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -105,6 +105,11 @@ const ( task_id BIGINT(20) UNSIGNED NOT NULL, pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '', status VARCHAR(32) NOT NULL, +<<<<<<< HEAD +======= + state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish', + source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, +>>>>>>> 1b0e54c2 (lightning: check and restore pd scheduler even if our task failed (#1336)) PRIMARY KEY (task_id) );` @@ -1163,6 +1168,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error { // we do not do switch back automatically cleanupFunc := func() {} switchBack := false + taskFinished := false if rc.cfg.TikvImporter.Backend == config.BackendLocal { // disable some pd schedulers pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, @@ -1183,7 +1189,11 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if restoreFn != nil { // use context.Background to make sure this restore function can still be executed even if ctx is canceled restoreCtx := context.Background() +<<<<<<< HEAD needSwitchBack, err := mgr.CheckAndFinishRestore(restoreCtx) +======= + needSwitchBack, needCleanup, err := 
rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished) +>>>>>>> 1b0e54c2 (lightning: check and restore pd scheduler even if our task failed (#1336)) if err != nil { logTask.Warn("check restore pd schedulers failed", zap.Error(err)) return @@ -1193,7 +1203,10 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if restoreE := restoreFn(restoreCtx); restoreE != nil { logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } + + logTask.Info("add back PD leader&region schedulers") // clean up task metas +<<<<<<< HEAD if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil { logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) } @@ -1201,11 +1214,21 @@ func (rc *Controller) restoreTables(ctx context.Context) error { cleanupFunc = func() { if e := mgr.CleanupAllMetas(restoreCtx); err != nil { logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) +======= + if needCleanup { + logTask.Info("cleanup task metas") + if cleanupErr := rc.taskMgr.Cleanup(restoreCtx); cleanupErr != nil { + logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) + } + // cleanup table meta and schema db if needed. + cleanupFunc = func() { + if e := rc.taskMgr.CleanupAllMetas(restoreCtx); err != nil { + logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) + } +>>>>>>> 1b0e54c2 (lightning: check and restore pd scheduler even if our task failed (#1336)) } } } - - logTask.Info("add back PD leader&region schedulers") } pdController.Close() @@ -1403,6 +1426,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error { // finishSchedulers() // cancelFunc(switchBack) // finishFuncCalled = true + taskFinished = true close(postProcessTaskChan) // otherwise, we should run all tasks in the post-process task chan