-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: fix the ddl txn commit may be conflict with reorg txn #24668
Changes from all commits
6ed2cbf
1982a4b
cac9b9a
9ad21e9
51f74fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -546,7 +546,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo | |
return ver, errors.Trace(err) | ||
} | ||
|
||
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { | ||
err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { | ||
defer util.Recover(metrics.LabelDDL, "onCreateIndex", | ||
func() { | ||
addIndexErr = errCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tblInfo.Name, indexInfo.Name) | ||
|
@@ -561,7 +561,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo | |
if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) || errCantDecodeRecord.Equal(err) { | ||
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) | ||
ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err) | ||
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { | ||
if err1 := reorgInfo.CleanReorgMeta(); err1 != nil { | ||
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -155,7 +155,24 @@ func (rc *reorgCtx) clean() { | |
rc.doneCh = nil | ||
} | ||
|
||
func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error { | ||
// runReorgJob only performs the `READ` action on reorg info within the ddl txn. | ||
// If we want to perform a `WRITE` action, such as recording a reorg handle, we need an additional kv txn. | ||
// Otherwise, the write conflict below will occur. | ||
// | ||
// ddl txn ------------+-------------------------------+ | ||
// (RunInKVTxn: start) | | (ddl done successfully) | ||
// | (Write) | (RunInKVTxn committed fail: write conflict) | ||
// V V | ||
// reorg handle | ||
// ^ | ||
// | (Update handle, eg: change element) | ||
// | (RunInKVTxn: committed instantly) | ||
// reorg txn--------------------------+ | ||
// | ||
// For this case: | ||
// Treat the ddl txn as a daemon thread: it isn't responsible for writing the reorg handle down, and only cares about reading | ||
// the reorg result, because a read won't conflict with any write action of the reorg txn. | ||
func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error { | ||
// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly. | ||
if lease > 0 { | ||
delayForAsyncCommit() | ||
|
@@ -219,7 +236,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model. | |
case model.ActionModifyColumn: | ||
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(100) | ||
} | ||
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { | ||
if err1 := reorgInfo.CleanReorgMeta(); err1 != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This txn and the DDL txn are not the same txn, so the data and schema may get out of sync. I think this plan can be further optimized. |
||
logutil.BgLogger().Warn("[ddl] run reorg job done, removeDDLReorgHandle failed", zap.Error(err1)) | ||
return errors.Trace(err1) | ||
} | ||
|
@@ -244,7 +261,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model. | |
// Update a reorgInfo's handle. | ||
// Since the daemon-worker is triggered by a timer to store the info half-way, | ||
// you should keep this info read-only (like job) / atomic (like doneKey & element) / concurrency-safe. | ||
err := t.UpdateDDLReorgStartHandle(job, currentElement, doneKey) | ||
err := reorgInfo.UpdateReorgMeta(doneKey) | ||
|
||
logutil.BgLogger().Info("[ddl] run reorg job wait timeout", | ||
zap.Duration("waitTime", waitTimeout), | ||
|
@@ -565,7 +582,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem | |
}) | ||
|
||
info.first = true | ||
// get the current version for reorganization if we don't have | ||
// get the current version for reorganization if we don't have one. | ||
ver, err := getValidCurrentVersion(d.store) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
|
@@ -706,3 +723,14 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key) error { | |
} | ||
return nil | ||
} | ||
|
||
func (r *reorgInfo) CleanReorgMeta() error { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems that this function is similar to |
||
err := kv.RunInNewTxn(context.Background(), r.d.store, true, func(ctx context.Context, txn kv.Transaction) error { | ||
t := meta.NewMeta(txn) | ||
return errors.Trace(t.RemoveDDLReorgHandle(r.Job, r.elements)) | ||
}) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to update this log?