Skip to content

Commit

Permalink
ddl: fix cleanup subtasks after cancel add index with dist-reorg (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and ghazalfamilyusa committed Feb 15, 2023
1 parent b290b0b commit d9dc46c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
5 changes: 5 additions & 0 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ type BackfillJob struct {
Meta *model.BackfillMeta
}

// PrefixKeyString builds the task-key prefix pattern for this BackfillJob.
// The result has the form "<JobID>_<hex(EleKey)>_<EleID>_%"; the trailing
// literal '%' lets callers embed it directly in a SQL LIKE condition.
func (bj *BackfillJob) PrefixKeyString() string {
	// The element key is raw bytes, so it is hex-encoded to keep the key printable.
	eleKeyHex := hex.EncodeToString(bj.EleKey)
	return fmt.Sprintf("%d_%s_%d_%%", bj.JobID, eleKeyHex, bj.EleID)
}

// AbbrStr returns the BackfillJob's info without the Meta info.
func (bj *BackfillJob) AbbrStr() string {
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
Expand Down
28 changes: 22 additions & 6 deletions ddl/dist_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -323,7 +324,14 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
ticker := time.NewTicker(CheckBackfillJobFinishInterval)
defer ticker.Stop()
for {
failpoint.Inject("MockCanceledErr", func() {
getReorgCtx(reorgCtxs, ddlJobID).notifyReorgCancel()
})
if getReorgCtx(reorgCtxs, ddlJobID).isReorgCanceled() {
err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID))
if err != nil {
return err
}
// Job is cancelled. So it can't be done.
return dbterror.ErrCancelledDDLJob
}
Expand Down Expand Up @@ -366,6 +374,10 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
}
}
case <-ctx.Done():
err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID))
if err != nil {
return err
}
return ctx.Err()
}
}
Expand Down Expand Up @@ -428,15 +440,20 @@ func checkAndHandleInterruptedBackfillJobs(sess *session, ddlJobID, currEleID in
return nil
}

return cleanupBackfillJobs(sess, bJobs[0].PrefixKeyString())
}

// cleanupBackfillJobs moves the backfill subtasks selected by prefixKey into
// the history table, retrying on failure.
//
// prefixKey is handed unchanged to MoveBackfillJobsToHistoryTable. The move is
// attempted up to retrySQLTimes times, sleeping RetrySQLInterval after each
// failed attempt. It returns nil as soon as one attempt succeeds, otherwise
// the error from the last attempt.
func cleanupBackfillJobs(sess *session, prefixKey string) error {
	var err error
	for i := 0; i < retrySQLTimes; i++ {
		err = MoveBackfillJobsToHistoryTable(sess, prefixKey)
		if err == nil {
			return nil
		}
		// A failed attempt is only logged; the loop decides whether to retry.
		logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err))
		time.Sleep(RetrySQLInterval)
	}
	return err
}

func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte, pTblID int64) (backfillJobCnt int, err error) {
Expand Down Expand Up @@ -496,16 +513,15 @@ func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey
}

// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.
func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, prefixKey string) error {
s, ok := sctx.(*session)
if !ok {
return errors.Errorf("sess ctx:%#v convert session failed", sctx)
}

return s.runInTxn(func(se *session) error {
// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID), "update_backfill_job")
bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like '%s'", prefixKey), "update_backfill_job")
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions ddl/failtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_test(
"//ddl/testutil",
"//ddl/util",
"//domain",
"//errno",
"//kv",
"//parser/model",
"//session",
Expand Down
38 changes: 38 additions & 0 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/ddl/testutil"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -220,6 +221,43 @@ func TestAddIndexFailed(t *testing.T) {
tk.MustExec("admin check table t")
}

// TestAddIndexCanceledInDistReorg checks that when an ADD INDEX job is
// cancelled during distributed reorg, its backfill subtasks are removed from
// the subtask table and recorded in the history table, and the table stays
// consistent.
func TestAddIndexCanceledInDistReorg(t *testing.T) {
	if !variable.DDLEnableDistributeReorg.Load() {
		// The MockCanceledErr fail-point does not exist on the non-dist-reorg
		// path. Skip explicitly so the run is reported as skipped rather than
		// as a pass that verified nothing.
		t.Skip("distributed reorg is disabled; the MockCanceledErr fail-point is unavailable")
	}
	s := createFailDBSuite(t)
	// `1*return` makes the fail-point fire a single time, which cancels the reorg.
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCanceledErr", `1*return`))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockCanceledErr"))
	}()
	tk := testkit.NewTestKit(t, s.store)
	tk.MustExec("create database if not exists test_add_index_cancel")
	defer tk.MustExec("drop database test_add_index_cancel")
	tk.MustExec("use test_add_index_cancel")

	tk.MustExec("create table t(a bigint PRIMARY KEY, b int)")
	for i := 0; i < 1000; i++ {
		tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i))
	}

	// Get the table ID, which is needed to compute the split range.
	dom := domain.GetDomain(tk.Session())
	is := dom.InfoSchema()
	tbl, err := is.TableByName(model.NewCIStr("test_add_index_cancel"), model.NewCIStr("t"))
	require.NoError(t, err)
	tblID := tbl.Meta().ID

	// Split the table's record range.
	// NOTE(review): the 100 here presumably matches the 100 history rows
	// expected below (one subtask per region) — confirm.
	tableStart := tablecodec.GenTableRecordPrefix(tblID)
	s.cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100)

	// The injected cancellation must surface as ErrCancelledDDLJob, leave no
	// live subtasks behind, and move them all to the history table.
	tk.MustGetErrCode("alter table t add index idx_b(b)", errno.ErrCancelledDDLJob)
	tk.MustQuery(fmt.Sprintf("select count(1) from mysql.%s", ddl.BackgroundSubtaskTable)).Check(testkit.Rows("0"))
	tk.MustQuery(fmt.Sprintf("select count(1) from mysql.%s", ddl.BackgroundSubtaskHistoryTable)).Check(testkit.Rows("100"))
	tk.MustExec("admin check table t")
}

// TestFailSchemaSyncer tests that when the schema syncer is done,
// DML execution is prohibited until the syncer is restarted by loadSchemaInLoop.
func TestFailSchemaSyncer(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
require.NoError(t, err)
require.Equal(t, allCnt, 2)
err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0])
err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0].PrefixKeyString())
require.NoError(t, err)
allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
require.NoError(t, err)
Expand Down Expand Up @@ -690,7 +690,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
require.NoError(t, err)
require.Equal(t, allCnt, 6)
err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0])
err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0].PrefixKeyString())
require.NoError(t, err)
allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
require.NoError(t, err)
Expand Down

0 comments on commit d9dc46c

Please sign in to comment.