Skip to content

Commit

Permalink
ddl: fix cancel rename index ddl error (#8610)
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll authored Jan 8, 2019
1 parent ddda6b4 commit da31a46
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 16 deletions.
58 changes: 58 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,64 @@ func (s *testDBSuite) TestCancelDropIndex(c *C) {
s.mustExec(c, "alter table t drop index idx_c2")
}

// TestCancelRenameIndex tests cancel ddl job which type is rename index.
func (s *testDBSuite) TestCancelRenameIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "create database if not exists test_rename_index")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")
for i := 0; i < 100; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}
s.mustExec(c, "alter table t add index idx_c2(c2)")
var checkErr error
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionRenameIndex && job.State == model.JobStateNone {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn(context.Background())
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}
originalHook := s.dom.DDL().GetHook()
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
rs, err := s.tk.Exec("alter table t rename index idx_c2 to idx_c3")
if rs != nil {
rs.Close()
}
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
t := s.testGetTable(c, "t")
for _, idx := range t.Indices() {
c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c3"), IsFalse)
}
s.mustExec(c, "alter table t rename index idx_c2 to idx_c3")
}

// TestCancelDropTable tests cancel ddl job which type is drop table.
func (s *testDBSuite) TestCancelDropTableAndSchema(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
Expand Down
42 changes: 26 additions & 16 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,26 +200,11 @@ func validateRenameIndex(from, to model.CIStr, tbl *model.TableInfo) (ignore boo
}

func onRenameIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var from, to model.CIStr
if err := job.DecodeArgs(&from, &to); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, from, to, err := checkRenameIndex(t, job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// Double check. See function `RenameIndex` in ddl_api.go
duplicate, err := validateRenameIndex(from, to, tblInfo)
if duplicate {
return ver, nil
}
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
idx := schemautil.FindIndexByName(from.L, tblInfo.Indices)
idx.Name = to
if ver, err = updateVersionAndTableInfo(t, job, tblInfo, true); err != nil {
Expand Down Expand Up @@ -436,6 +421,31 @@ func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Inde
return tblInfo, indexInfo, nil
}

func checkRenameIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, model.CIStr, model.CIStr, error) {
var from, to model.CIStr
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return nil, from, to, errors.Trace(err)
}

if err := job.DecodeArgs(&from, &to); err != nil {
job.State = model.JobStateCancelled
return nil, from, to, errors.Trace(err)
}

// Double check. See function `RenameIndex` in ddl_api.go
duplicate, err := validateRenameIndex(from, to, tblInfo)
if duplicate {
return nil, from, to, nil
}
if err != nil {
job.State = model.JobStateCancelled
return nil, from, to, errors.Trace(err)
}
return tblInfo, from, to, errors.Trace(err)
}

const (
// DefaultTaskHandleCnt is default batch size of adding indices.
DefaultTaskHandleCnt = 128
Expand Down
17 changes: 17 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ func rollingbackDropSchema(t *meta.Meta, job *model.Job) error {
return nil
}

func rollingbackRenameIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, from, _, err := checkRenameIndex(t, job)
if err != nil {
return ver, errors.Trace(err)
}
// Here rename index is done in a transaction, if the job is not completed, it can be canceled.
idx := schemautil.FindIndexByName(from.L, tblInfo.Indices)
if idx.State == model.StatePublic {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
}
job.State = model.JobStateRunning
return ver, errors.Trace(err)
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand All @@ -204,6 +219,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
err = rollingbackDropTableOrView(t, job)
case model.ActionDropSchema:
err = rollingbackDropSchema(t, job)
case model.ActionRenameIndex:
ver, err = rollingbackRenameIndex(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down

0 comments on commit da31a46

Please sign in to comment.