From 6f392fb549f1d2d8e0cec22726c5c471705e10d3 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Fri, 16 Dec 2022 15:48:12 +0800 Subject: [PATCH] done Signed-off-by: wjhuang2016 --- ddl/ddl_worker.go | 15 +++++++++++++-- ddl/job_table.go | 5 ++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 71b0b1b14608e..dac2f01216edb 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1381,7 +1381,7 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l } // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL. -func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, waitTime time.Duration, latestSchemaVersion int64) { +func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error { failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) { if val.(bool) { if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion { @@ -1392,7 +1392,18 @@ func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, waitTime time.Duration, l } }) - waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job) + timeStart := time.Now() + // OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB). + err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion) + if err != nil { + logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err)) + return err + } + logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)", + zap.Int64("ver", latestSchemaVersion), + zap.Duration("take time", time.Since(timeStart)), + zap.String("job", job.String())) + return nil } // waitSchemaSynced handles the following situation: diff --git a/ddl/job_table.go b/ddl/job_table.go index bc0e1d1343518..894c4c45b8380 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -245,7 +245,10 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { } else if exist { // Release the worker resource. pool.put(wk) - waitSchemaSyncedForMDL(d.ddlCtx, job, 2*d.lease, version) + err = waitSchemaSyncedForMDL(d.ddlCtx, job, version) + if err != nil { + return + } d.once.Store(false) cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) // Don't have a worker now.