Skip to content

Commit

Permalink
Merge pull request pingcap#6 from tangenta/multi-schema-change-dev
Browse files Browse the repository at this point in the history
support multi-schema change for add columns
  • Loading branch information
tangenta authored Mar 8, 2022
2 parents edb900e + 4f02b3f commit bc51c13
Show file tree
Hide file tree
Showing 10 changed files with 381 additions and 48 deletions.
35 changes: 3 additions & 32 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,6 @@ import (
"go.uber.org/zap"
)

// adjustColumnInfoInAddColumn moves the newly added column, which sits at the
// end of tblInfo.Columns, to position offset and renumbers everything behind it.
//
// The new column is appended at the tail while its DDL state is not yet public;
// once it becomes public it has to be placed at its requested position.
// Columns in [offset, last) shift one slot to the right, every affected
// ColumnInfo.Offset is rewritten, and index columns that pointed at a shifted
// column are updated to the new offset.
func adjustColumnInfoInAddColumn(tblInfo *model.TableInfo, offset int) {
	cols := tblInfo.Columns
	lastIdx := len(cols) - 1

	// Rebuild the slice as: [0, offset) + added column + [offset, last).
	reordered := make([]*model.ColumnInfo, 0, len(cols))
	reordered = append(reordered, cols[:offset]...)
	reordered = append(reordered, cols[lastIdx])
	reordered = append(reordered, cols[offset:lastIdx]...)

	// Renumber the shifted columns, remembering old offset -> new offset so
	// that index columns can be fixed up afterwards.
	remap := make(map[int]int, len(reordered)-offset-1)
	for pos := offset + 1; pos < len(reordered); pos++ {
		col := reordered[pos]
		remap[col.Offset] = pos
		col.Offset = pos
	}
	reordered[offset].Offset = offset

	// Update index column offset info.
	// TODO: There may be some corner cases for index column offsets, we may check this later.
	for _, idx := range tblInfo.Indices {
		for _, idxCol := range idx.Columns {
			if moved, ok := remap[idxCol.Offset]; ok {
				idxCol.Offset = moved
			}
		}
	}

	tblInfo.Columns = reordered
}

// adjustColumnInfoInDropColumn is used to set the correct position of column info when dropping column.
// 1. The offset of the column should be set to the last position of the columns.
// 2. The dropped column is moved to the end of tblInfo.Columns, because it is not public any more.
Expand Down Expand Up @@ -236,10 +206,11 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}
// Update the job state when all affairs done.
job.SchemaState = model.StateWriteReorganization
job.MarkNonRevertible()
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offset.
adjustColumnInfoInAddColumn(tblInfo, offset)
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
if err != nil {
Expand Down Expand Up @@ -402,7 +373,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
adjustColumnInfoInAddColumn(tblInfo, offsets[i])
tblInfo.MoveColumnInfo(len(tblInfo.Columns)-1, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
Expand Down
6 changes: 6 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,12 @@ func recordLastDDLInfo(ctx sessionctx.Context, job *model.Job) {
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, merge all the jobs into one pending job.
mci.MergeSubJob(job)
return nil
}
// Get a global job ID and put the DDL job in the queue.
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
task := &limitJobTask{job, make(chan error)}
Expand Down
41 changes: 26 additions & 15 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2832,22 +2832,8 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
}

if len(validSpecs) > 1 {
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns:
err = d.AddColumns(sctx, ident, validSpecs)
case ast.AlterTableDropColumn:
err = d.DropColumns(sctx, ident, validSpecs)
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
err = d.DropIndexes(sctx, ident, validSpecs)
default:
return errRunMultiSchemaChanges
}
if err != nil {
return errors.Trace(err)
}
return nil
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
}

for _, spec := range validSpecs {
var handledCharsetOrCollate bool
switch spec.Tp {
Expand Down Expand Up @@ -3034,6 +3020,12 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
return errors.Trace(err)
}
}
if sctx.GetSessionVars().StmtCtx.MultiSchemaInfo != nil {
err = d.MultiSchemaChange(sctx, ident)
if err != nil {
return errors.Trace(err)
}
}

return nil
}
Expand Down Expand Up @@ -6990,3 +6982,22 @@ func checkTooBigFieldLengthAndTryAutoConvert(tp *types.FieldType, colName string
}
return nil
}

// MultiSchemaChange submits the sub-jobs collected in the statement context's
// MultiSchemaInfo as a single ActionMultiSchemaChange DDL job on table ti.
//
// The MultiSchemaInfo is moved into the job and cleared from the statement
// context before the job is handed to doDDLJob. The result of doDDLJob is
// passed through callHookOnChanged and returned.
func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident) error {
	schema, tbl, err := d.getSchemaAndTableByIdent(ctx, ti)
	if err != nil {
		return errors.Trace(err)
	}

	multiJob := &model.Job{
		SchemaID:        schema.ID,
		TableID:         tbl.Meta().ID,
		SchemaName:      schema.Name.L,
		Type:            model.ActionMultiSchemaChange,
		BinlogInfo:      &model.HistoryInfo{},
		MultiSchemaInfo: ctx.GetSessionVars().StmtCtx.MultiSchemaInfo,
	}
	// Clear the pending info first so that doDDLJob sees no MultiSchemaInfo
	// on the statement context for this submission.
	ctx.GetSessionVars().StmtCtx.MultiSchemaInfo = nil

	return d.callHookOnChanged(d.doDDLJob(ctx, multiJob))
}
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterCacheTable(t, job)
case model.ActionAlterNoCacheTable:
ver, err = onAlterNoCacheTable(t, job)
case model.ActionMultiSchemaChange:
ver, err = onMultiSchemaChange(w, d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
145 changes: 145 additions & 0 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"sync"

"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
)

// onMultiSchemaChange runs one round of a multi-schema-change job. Each sub-job
// is executed through a temporary proxy job (cloneFromSubJob) passed to
// w.runDDLJob, and the outcome is written back with mergeBackToSubJob.
//
// While job.MultiSchemaInfo.Revertible is true:
//   - if the job is rolling back or cancelling, the last unfinished sub-job
//     (in reverse order) is stepped and the function returns;
//   - otherwise the first sub-job that is still revertible is stepped and the
//     function returns;
//   - when no sub-job is revertible any more, the job is marked non-revertible
//     and every sub-job is stepped once in the same round.
//
// Once the job is non-revertible, each round steps the first unfinished
// sub-job; when none is left the job state becomes JobStateDone.
//
// It returns the schema version and error reported by the last executed
// sub-job step.
func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
	if job.MultiSchemaInfo.Revertible {
		// Handle the rolling back job.
		if job.IsRollingback() || job.IsCancelling() {
			// Rollback/cancel the sub-jobs in reverse order.
			for i := len(job.MultiSchemaInfo.SubJobs) - 1; i >= 0; i-- {
				sub := job.MultiSchemaInfo.SubJobs[i]
				if isFinished(sub) {
					continue
				}
				proxyJob := cloneFromSubJob(job, sub)
				ver, err = w.runDDLJob(d, t, proxyJob)
				mergeBackToSubJob(proxyJob, sub)
				if i == 0 {
					// The last rollback/cancelling sub-job is done.
					// NOTE(review): this fires after a single step of sub-job 0,
					// without checking isFinished(sub) — confirm one step is
					// always enough to finish its rollback.
					if job.IsRollingback() {
						job.State = model.JobStateRollbackDone
					}
					if job.IsCancelling() {
						job.State = model.JobStateCancelled
					}
				}
				return ver, err
			}
		}

		// The sub-jobs are normally running.
		// Run the first executable sub-job.
		for i, sub := range job.MultiSchemaInfo.SubJobs {
			if !sub.Revertible {
				// Skip the sub jobs which related schema states
				// are in the last revertible point.
				continue
			}
			proxyJob := cloneFromSubJob(job, sub)
			ver, err = w.runDDLJob(d, t, proxyJob)
			mergeBackToSubJob(proxyJob, sub)
			handleRevertibleException(job, sub.State, i)
			return ver, err
		}

		// All the sub-jobs are non-revertible.
		job.MultiSchemaInfo.Revertible = false
		// Step the sub-jobs to the non-revertible states all at once.
		for _, sub := range job.MultiSchemaInfo.SubJobs {
			proxyJob := cloneFromSubJob(job, sub)
			ver, err = w.runDDLJob(d, t, proxyJob)
			mergeBackToSubJob(proxyJob, sub)
			if err != nil {
				// Stop at the first failure. Continuing would let a later
				// sub-job overwrite err and hide this error from the caller.
				return ver, err
			}
		}
		return ver, err
	}

	// Run the rest non-revertible sub-jobs one by one.
	for _, sub := range job.MultiSchemaInfo.SubJobs {
		if isFinished(sub) {
			continue
		}
		proxyJob := cloneFromSubJob(job, sub)
		ver, err = w.runDDLJob(d, t, proxyJob)
		mergeBackToSubJob(proxyJob, sub)
		return ver, err
	}

	// Every sub-job has finished, so the whole job is done.
	job.State = model.JobStateDone
	return ver, err
}

// isFinished reports whether the sub-job has reached a terminal state:
// done, rollback done, or cancelled.
func isFinished(job *model.SubJob) bool {
	switch job.State {
	case model.JobStateDone, model.JobStateRollbackDone, model.JobStateCancelled:
		return true
	default:
		return false
	}
}

// cloneFromSubJob builds a temporary proxy job that lets a sub-job be executed
// like a regular DDL job.
//
// The action type, arguments, schema state and snapshot version come from sub;
// the identifying and bookkeeping fields (schema/table, state, timestamps,
// query, binlog info, ...) come from the parent job. The ID, error and row
// counters are reset, and the proxy gets its own MultiSchemaInfo that carries
// only the sub-job's Revertible flag.
func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job {
	return &model.Job{
		// Taken from the sub-job.
		Type:        sub.Type,
		Args:        sub.Args,
		RawArgs:     sub.RawArgs,
		SchemaState: sub.SchemaState,
		SnapshotVer: sub.SnapshotVer,
		MultiSchemaInfo: &model.MultiSchemaInfo{
			Revertible: sub.Revertible,
		},

		// Inherited from the parent job.
		SchemaID:     job.SchemaID,
		TableID:      job.TableID,
		SchemaName:   job.SchemaName,
		State:        job.State,
		RealStartTS:  job.RealStartTS,
		StartTS:      job.StartTS,
		DependencyID: job.DependencyID,
		Query:        job.Query,
		BinlogInfo:   job.BinlogInfo,
		Version:      job.Version,
		ReorgMeta:    job.ReorgMeta,
		Priority:     job.Priority,
		SeqNum:       job.SeqNum,

		// Reset for the proxy.
		ID:         0,
		Error:      nil,
		ErrorCount: 0,
		RowCount:   0,
		Mu:         sync.Mutex{},
		CtxVars:    nil,
	}
}

// mergeBackToSubJob copies the outcome of a proxy job run back into the
// sub-job it was cloned from: job state, schema state, snapshot version,
// arguments and the revertible flag. RawArgs is not copied back.
func mergeBackToSubJob(job *model.Job, sub *model.SubJob) {
	// Progress made by this step.
	sub.State = job.State
	sub.SchemaState = job.SchemaState
	sub.SnapshotVer = job.SnapshotVer

	// The step may have changed the arguments or the revertible flag.
	sub.Args = job.Args
	sub.Revertible = job.MultiSchemaInfo.Revertible
}

// handleRevertibleException propagates a failure of the sub-job at position idx
// to the parent job and to its sibling sub-jobs.
//
// res is the state of that sub-job after its latest step. If res is neither
// JobStateRollingback nor JobStateCancelling, the step was a normal one and
// nothing is changed. Otherwise the parent job takes over res, every sub-job
// before idx is set to JobStateRollingback, and every sub-job after idx is
// set to JobStateCancelled. The sub-job at idx keeps its own state.
func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
	if res != model.JobStateRollingback && res != model.JobStateCancelling {
		// Normal step: sibling sub-jobs must keep their states. Without this
		// guard every successful step would stamp them as rolling back or
		// cancelled.
		return
	}
	job.State = res

	// Flush the rollback state and cancelled state to sub-jobs.
	for i, sub := range job.MultiSchemaInfo.SubJobs {
		switch {
		case i < idx:
			sub.State = model.JobStateRollingback
		case i > idx:
			sub.State = model.JobStateCancelled
		}
	}
}
34 changes: 34 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"testing"

"github.com/pingcap/tidb/testkit"
)

// TestMultiSchemaChangeAddColumns checks that a single ALTER TABLE statement
// adding two columns with defaults exposes both columns, filled with their
// default values, on an existing row.
func TestMultiSchemaChangeAddColumns(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()

	tk := testkit.NewTestKit(t, store)
	// Setup statements followed by the multi-schema change itself, executed in order.
	for _, stmt := range []string{
		"use test",
		"set @@global.tidb_enable_change_multi_schema = 1",
		"create table t (a int);",
		"insert into t values (1);",
		"alter table t add column b int default 2, add column c int default 3;",
	} {
		tk.MustExec(stmt)
	}

	tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
}
Loading

0 comments on commit bc51c13

Please sign in to comment.