From 6ed2cbf9d554ec22f5c79cc6a1bdd8a9397de306 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Fri, 14 May 2021 18:00:44 +0800 Subject: [PATCH 1/2] . Signed-off-by: AilinKid <314806019@qq.com> --- ddl/column.go | 7 +++---- ddl/index.go | 4 ++-- ddl/partition.go | 2 +- ddl/reorg.go | 36 ++++++++++++++++++++++++++++++++---- ddl/reorg_test.go | 9 ++++----- 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 18c23b4d9c45a..a7f159f97d948 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1012,10 +1012,10 @@ func (w *worker) doModifyColumnTypeWithData( // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) - err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (reorgErr error) { defer util.Recover(metrics.LabelDDL, "onModifyColumn", func() { - addIndexErr = errCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name) + reorgErr = errCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name) }, false) return w.updateColumnAndIndexes(tbl, oldCol, changingCol, changingIdxs, reorgInfo) }) @@ -1025,10 +1025,9 @@ func (w *worker) doModifyColumnTypeWithData( return ver, nil } if needRollbackData(err) { - if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + if err1 := reorgInfo.CleanReorgMeta(); err1 != nil { logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", zap.String("job", job.String()), zap.Error(err1)) - return ver, errors.Trace(err) } logutil.BgLogger().Warn("[ddl] run modify column job failed, 
convert job to rollback", zap.String("job", job.String()), zap.Error(err)) // When encounter these error above, we change the job to rolling back job directly. diff --git a/ddl/index.go b/ddl/index.go index f11a595aa8fb3..66f676c05dd9f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -547,7 +547,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, errors.Trace(err) } - err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { defer util.Recover(metrics.LabelDDL, "onCreateIndex", func() { addIndexErr = errCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tblInfo.Name, indexInfo.Name) @@ -562,7 +562,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) || errCantDecodeRecord.Equal(err) { logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err) - if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + if err1 := reorgInfo.CleanReorgMeta(); err1 != nil { logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1)) } } diff --git a/ddl/partition.go b/ddl/partition.go index 4cc71eb1c8d74..6a3e8e418ddc6 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1006,7 +1006,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( // and then run the reorg next time. 
return ver, errors.Trace(err) } - err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) { + err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) { defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition", func() { dropIndexErr = errCancelledDDLJob.GenWithStack("drop partition panic") diff --git a/ddl/reorg.go b/ddl/reorg.go index fbe42573dbbf7..6292ad358e2dd 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -156,7 +156,24 @@ func (rc *reorgCtx) clean() { rc.doneCh = nil } -func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error { +// runReorgJob only performs `READ` actions on the reorg info within the ddl txn. +// If we want to do a `WRITE` action, such as recording a reorg handle, we need an additional kv txn. +// Otherwise, the write conflict below will occur. +// +// ddl txn ------------+-------------------------------+ +// (RunInKVTxn: start) | | (ddl done successfully) +// | (Write) | (RunInKVTxn committed fail: write conflict) +// V V +// reorg handle +// ^ +// | (Update handle, eg: change element) +// | (RunInKVTxn: committed instantly) +// reorg txn--------------------------+ +// +// For this case: +// Treat the ddl txn as a daemon thread: it isn't responsible for writing the reorg handle down, it only cares about reading +// the reorg result, because a read won't conflict with any write action of the reorg txn. +func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error { // lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly. if lease > 0 { delayForAsyncCommit() @@ -220,7 +237,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model. 
case model.ActionModifyColumn: metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(100) } - if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + if err1 := reorgInfo.CleanReorgMeta(); err1 != nil { logutil.BgLogger().Warn("[ddl] run reorg job done, removeDDLReorgHandle failed", zap.Error(err1)) return errors.Trace(err1) } @@ -245,7 +262,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model. // Update a reorgInfo's handle. // Since daemon-worker is triggered by timer to store the info half-way. // you should keep these infos is read-only (like job) / atomic (like doneKey & element) / concurrent safe. - err := t.UpdateDDLReorgStartHandle(job, currentElement, doneKey) + err := reorgInfo.UpdateReorgMeta(doneKey) logutil.BgLogger().Info("[ddl] run reorg job wait timeout", zap.Duration("waitTime", waitTimeout), @@ -566,7 +583,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem }) info.first = true - // get the current version for reorganization if we don't have + // get the current version for reorganization if we don't have one. 
ver, err := getValidCurrentVersion(d.store) if err != nil { return nil, errors.Trace(err) @@ -707,3 +724,14 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key) error { } return nil } + +func (r *reorgInfo) CleanReorgMeta() error { + err := kv.RunInNewTxn(context.Background(), r.d.store, true, func(ctx context.Context, txn kv.Transaction) error { + t := meta.NewMeta(txn) + return errors.Trace(t.RemoveDDLReorgHandle(r.Job, r.elements)) + }) + if err != nil { + return errors.Trace(err) + } + return nil +} diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 18dd9a975fceb..231e14a52ce9d 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -93,20 +93,19 @@ func (s *testDDLSuite) TestReorg(c *C) { c.Assert(err, IsNil) txn, err = ctx.Txn(true) c.Assert(err, IsNil) - m := meta.NewMeta(txn) e := &meta.Element{ID: 333, TypeKey: meta.IndexElementKey} rInfo := &reorgInfo{ Job: job, currElement: e, } mockTbl := tables.MockTableFromMeta(&model.TableInfo{IsCommonHandle: s.IsCommonHandle, CommonHandleVersion: 1}) - err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, f) + err = d.generalWorker().runReorgJob(rInfo, mockTbl.Meta(), d.lease, f) c.Assert(err, NotNil) // The longest to wait for 5 seconds to make sure the function of f is returned. 
for i := 0; i < 1000; i++ { time.Sleep(5 * time.Millisecond) - err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, f) + err = d.generalWorker().runReorgJob(rInfo, mockTbl.Meta(), d.lease, f) if err == nil { c.Assert(job.RowCount, Equals, rowCount) c.Assert(d.generalWorker().reorgCtx.rowCount, Equals, int64(0)) @@ -117,7 +116,7 @@ func (s *testDDLSuite) TestReorg(c *C) { err = ctx.NewTxn(context.Background()) c.Assert(err, IsNil) - m = meta.NewMeta(txn) + m := meta.NewMeta(txn) info, err1 := getReorgInfo(d.ddlCtx, m, job, mockTbl, nil) c.Assert(err1, IsNil) c.Assert(info.StartKey, DeepEquals, kv.Key(handle.Encoded())) @@ -172,7 +171,7 @@ func (s *testDDLSuite) TestReorg(c *C) { err = d.Stop() c.Assert(err, IsNil) - err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, func() error { + err = d.generalWorker().runReorgJob(rInfo, mockTbl.Meta(), d.lease, func() error { time.Sleep(4 * testLease) return nil }) From cac9b9a5c0bd24a8617bea35014c482e2c0e802e Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Fri, 14 May 2021 18:38:08 +0800 Subject: [PATCH 2/2] fix test Signed-off-by: AilinKid <314806019@qq.com> --- ddl/column_type_change_test.go | 17 +++++++++++++++++ ddl/reorg_test.go | 6 ++++++ 2 files changed, 23 insertions(+) diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 992631b4bd97b..1a675e47cca16 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -1797,3 +1797,20 @@ func (s *testColumnTypeChangeSuite) TestChangeIntToBitWillPanicInBackfillIndexes ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) tk.MustQuery("select * from t").Check(testkit.Rows("\x13 1 1.00", "\x11 2 2.00")) } + +// Close issue #24427 +func (s *testColumnTypeChangeSuite) TestFixDDLTxnWillConflictWithReorgTxn(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + // Enable column change variable. 
+ tk.Se.GetSessionVars().EnableChangeColumnType = true + + tk.MustExec("create table t(a int)") + tk.MustExec("alter table t add index(a)") + tk.MustExec("set @@sql_mode=\"\"") + tk.MustExec("insert into t values(128),(129)") + tk.MustExec("set @@tidb_enable_change_column_type=1") + tk.MustExec("alter table t modify column a tinyint") + + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 constant 128 overflows tinyint", "Warning 1690 constant 128 overflows tinyint")) +} diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 231e14a52ce9d..03b98e3c9c76c 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -94,9 +94,15 @@ func (s *testDDLSuite) TestReorg(c *C) { txn, err = ctx.Txn(true) c.Assert(err, IsNil) e := &meta.Element{ID: 333, TypeKey: meta.IndexElementKey} + dCtx := &ddlCtx{ + uuid: d.uuid, + store: d.store, + } rInfo := &reorgInfo{ Job: job, currElement: e, + // reorgJob depend on ddlCtx's store to do the extra txn. + d: dCtx, } mockTbl := tables.MockTableFromMeta(&model.TableInfo{IsCommonHandle: s.IsCommonHandle, CommonHandleVersion: 1}) err = d.generalWorker().runReorgJob(rInfo, mockTbl.Meta(), d.lease, f)