diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index ef414c5f9dd89..fd51e8b8532ed 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -831,6 +831,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		if err != nil {
 			logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
 		}
+		d.runningJobs.clear()
 	})
 
 	d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
@@ -869,13 +870,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 	defer d.sessPool.Put(ctx)
 
 	ingest.InitGlobalLightningEnv()
-	d.ownerManager.SetRetireOwnerHook(func() {
-		// Since this instance is not DDL owner anymore, we clean up the processing job info.
-		if ingest.LitBackCtxMgr != nil {
-			ingest.LitBackCtxMgr.MarkJobFinish()
-		}
-		d.runningJobs.clear()
-	})
 
 	return nil
 }
diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go
index 4a27dd00e1a19..d2ce39d1725af 100644
--- a/pkg/ddl/ddl_running_jobs.go
+++ b/pkg/ddl/ddl_running_jobs.go
@@ -22,8 +22,11 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
 )
 
 type runningJobs struct {
@@ -36,6 +39,11 @@ type runningJobs struct {
 	// It is not necessarily being processed by a worker.
 	unfinishedIDs    map[int64]struct{}
 	unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
+
+	// processingIngestJobID records the ID of the ingest job that is being processed by a worker.
+	// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
+	processingIngestJobID int64
+	lastLoggingTime       time.Time
 }
 
 func newRunningJobs() *runningJobs {
@@ -47,6 +55,8 @@ func newRunningJobs() *runningJobs {
 }
 
 func (j *runningJobs) clear() {
+	j.Lock()
+	defer j.Unlock()
 	j.unfinishedIDs = make(map[int64]struct{})
 	j.unfinishedSchema = make(map[string]map[string]struct{})
 }
@@ -56,6 +66,9 @@ func (j *runningJobs) add(job *model.Job) {
 	defer j.Unlock()
 	j.processingIDs[job.ID] = struct{}{}
 	j.updateInternalRunningJobIDs()
+	if isIngestJob(job) {
+		j.processingIngestJobID = job.ID
+	}
 
 	if _, ok := j.unfinishedIDs[job.ID]; ok {
 		// Already exists, no need to add it again.
@@ -75,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
 	defer j.Unlock()
 	delete(j.processingIDs, job.ID)
 	j.updateInternalRunningJobIDs()
+	if isIngestJob(job) && job.ID == j.processingIngestJobID {
+		j.processingIngestJobID = 0
+	}
 
 	if job.IsFinished() || job.IsSynced() {
 		delete(j.unfinishedIDs, job.ID)
@@ -115,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 		// Already processing by a worker. Skip running it again.
 		return false
 	}
+	if isIngestJob(job) && j.processingIngestJobID != 0 {
+		// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
+		if time.Since(j.lastLoggingTime) > 1*time.Minute {
+			logutil.BgLogger().Info("ingest backfill worker is already in use by another DDL job",
+				zap.String("category", "ddl-ingest"),
+				zap.Int64("processing job ID", j.processingIngestJobID))
+			j.lastLoggingTime = time.Now()
+		}
+		return false
+	}
 	for _, info := range job.GetInvolvingSchemaInfo() {
 		if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
 			return false
@@ -136,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 	}
 	return true
 }
+
+func isIngestJob(job *model.Job) bool {
+	return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
+		job.ReorgMeta != nil &&
+		job.ReorgMeta.IsFastReorg
+}
diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go
index 347ef20d3142b..1e025eb0fdc7e 100644
--- a/pkg/ddl/ddl_worker.go
+++ b/pkg/ddl/ddl_worker.go
@@ -26,7 +26,6 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
-	"github.com/pingcap/tidb/pkg/ddl/ingest"
 	sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
 	"github.com/pingcap/tidb/pkg/ddl/util"
 	"github.com/pingcap/tidb/pkg/kv"
@@ -693,7 +692,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	startTime := time.Now()
 	defer func() {
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
-		markJobFinish(job)
 	}()
 
 	if JobNeedGC(job) {
@@ -743,15 +741,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	return errors.Trace(err)
 }
 
-func markJobFinish(job *model.Job) {
-	if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
-		job.ReorgMeta != nil &&
-		job.ReorgMeta.IsFastReorg &&
-		ingest.LitBackCtxMgr != nil {
-		ingest.LitBackCtxMgr.MarkJobFinish()
-	}
-}
-
 func (w *worker) writeDDLSeqNum(job *model.Job) {
 	w.ddlSeqNumMu.Lock()
 	w.ddlSeqNumMu.seqNum++
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index 58bf5f5b863d6..063403c2a97e8 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -41,6 +41,7 @@ import (
 	"github.com/pingcap/tidb/pkg/disttask/framework/storage"
 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/pkg/lightning/common"
 	"github.com/pingcap/tidb/pkg/meta"
 	"github.com/pingcap/tidb/pkg/metrics"
@@ -853,6 +854,9 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
 				logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
 				return nil
 			}
+			failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
+				close(local.WaitRMFolderChForTest)
+			})
 		}
 	}
 	return nil
 }
diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go
index 58f83cb3af3ad..b627c76fa465f 100644
--- a/pkg/ddl/ingest/backend_mgr.go
+++ b/pkg/ddl/ingest/backend_mgr.go
@@ -19,9 +19,9 @@ import (
 	"fmt"
 	"math"
 	"strconv"
-	"sync"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/pkg/lightning/config"
 	"github.com/pingcap/tidb/pkg/util/generic"
@@ -29,6 +29,7 @@ import (
 	kvutil "github.com/tikv/client-go/v2/util"
 	pd "github.com/tikv/pd/client"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -48,18 +49,12 @@ type BackendCtxMgr interface {
 	) (BackendCtx, error)
 	Unregister(jobID int64)
 	Load(jobID int64) (BackendCtx, bool)
-
-	MarkJobProcessing(jobID int64) (ok bool)
-	MarkJobFinish()
 }
 
 type litBackendCtxMgr struct {
 	generic.SyncMap[int64, *litBackendCtx]
-	memRoot         MemRoot
-	diskRoot        DiskRoot
-	processingJobID int64
-	lastLoggingTime time.Time
-	mu              sync.Mutex
+	memRoot  MemRoot
+	diskRoot DiskRoot
 }
 
 func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
@@ -80,30 +75,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
 	return mgr
 }
 
-// MarkJobProcessing marks ingest backfill is processing.
-func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	if m.processingJobID == 0 || m.processingJobID == jobID {
-		m.processingJobID = jobID
-		return true
-	}
-	if time.Since(m.lastLoggingTime) > 1*time.Minute {
-		logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
-			zap.String("category", "ddl-ingest"),
-			zap.Int64("processing job ID", m.processingJobID))
-		m.lastLoggingTime = time.Now()
-	}
-	return false
-}
-
-// MarkJobFinish marks ingest backfill is finished.
-func (m *litBackendCtxMgr) MarkJobFinish() {
-	m.mu.Lock()
-	m.processingJobID = 0
-	m.mu.Unlock()
-}
-
 // CheckAvailable checks if the ingest backfill is available.
 func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
 	if err := m.diskRoot.PreCheckUsage(); err != nil {
@@ -113,6 +84,9 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
 	return true, nil
 }
 
+// ResignOwnerForTest is only used for test.
+var ResignOwnerForTest = atomic.NewBool(false)
+
 // Register creates a new backend and registers it to the backend context.
 func (m *litBackendCtxMgr) Register(
 	ctx context.Context,
@@ -137,6 +111,9 @@ func (m *litBackendCtxMgr) Register(
 		logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
 		return nil, err
 	}
+	failpoint.Inject("beforeCreateLocalBackend", func() {
+		ResignOwnerForTest.Store(true)
+	})
 	bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery)
 	if err != nil {
 		logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go
index d41d12571d087..a510fc3949a76 100644
--- a/pkg/ddl/ingest/mock.go
+++ b/pkg/ddl/ingest/mock.go
@@ -43,15 +43,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
 	}
 }
 
-// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface.
-func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool {
-	return true
-}
-
-// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface.
-func (*MockBackendCtxMgr) MarkJobFinish() {
-}
-
 // CheckAvailable implements BackendCtxMgr.Available interface.
 func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
 	return len(m.runningJobs) == 0, nil
diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go
index 52772db5e4348..70033dac550f7 100644
--- a/pkg/ddl/job_table.go
+++ b/pkg/ddl/job_table.go
@@ -228,17 +228,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
 		if !d.runningJobs.checkRunnable(job) {
 			return false, nil
 		}
-		if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
-			job.State == model.JobStateQueueing &&
-			job.ReorgMeta != nil &&
-			job.ReorgMeta.IsFastReorg &&
-			ingest.LitBackCtxMgr != nil {
-			succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
-			if !succeed {
-				// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
-				return false, nil
-			}
-		}
 		// Check if there is any block ddl running, like drop schema and flashback cluster.
 		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
 			"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
@@ -292,6 +281,15 @@ func (d *ddl) startDispatchLoop() {
 			time.Sleep(dispatchLoopWaitingDuration)
 			continue
 		}
+		failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
+			if ingest.ResignOwnerForTest.Load() {
+				err2 := d.ownerManager.ResignOwner(context.Background())
+				if err2 != nil {
+					logutil.BgLogger().Info("resign meet error", zap.Error(err2))
+				}
+				ingest.ResignOwnerForTest.Store(false)
+			}
+		})
 		select {
 		case <-d.ddlJobCh:
 		case <-ticker.C:
diff --git a/pkg/lightning/backend/local/BUILD.bazel b/pkg/lightning/backend/local/BUILD.bazel
index 7ed3c71601a03..13eb66eab9019 100644
--- a/pkg/lightning/backend/local/BUILD.bazel
+++ b/pkg/lightning/backend/local/BUILD.bazel
@@ -59,6 +59,7 @@ go_library(
         "//pkg/util/compress",
         "//pkg/util/engine",
         "//pkg/util/hack",
+        "//pkg/util/logutil",
         "//pkg/util/mathutil",
         "//pkg/util/ranger",
         "@com_github_cockroachdb_pebble//:pebble",
diff --git a/pkg/lightning/backend/local/engine_mgr.go b/pkg/lightning/backend/local/engine_mgr.go
index ca4140fa62435..acf4b3f97125c 100644
--- a/pkg/lightning/backend/local/engine_mgr.go
+++ b/pkg/lightning/backend/local/engine_mgr.go
@@ -19,9 +19,12 @@ import (
 	"math"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
+	"time"
 
 	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/vfs"
 	"github.com/docker/go-units"
 	"github.com/google/uuid"
 	"github.com/pingcap/errors"
@@ -33,6 +36,7 @@ import (
 	"github.com/pingcap/tidb/pkg/lightning/common"
 	"github.com/pingcap/tidb/pkg/lightning/log"
 	"github.com/pingcap/tidb/pkg/lightning/manual"
+	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/tikv/client-go/v2/oracle"
 	tikvclient "github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/atomic"
@@ -581,6 +585,25 @@ func (em *engineManager) getBufferPool() *membuf.Pool {
 	return em.bufferPool
 }
 
+// only used in tests
+type slowCreateFS struct {
+	vfs.FS
+}
+
+// WaitRMFolderChForTest is a channel for testing.
+var WaitRMFolderChForTest = make(chan struct{})
+
+func (s slowCreateFS) Create(name string) (vfs.File, error) {
+	if strings.Contains(name, "temporary") {
+		select {
+		case <-WaitRMFolderChForTest:
+		case <-time.After(1 * time.Second):
+			logutil.BgLogger().Info("no one removes folder")
+		}
+	}
+	return s.FS.Create(name)
+}
+
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
 	dbPath := filepath.Join(storeDir, duplicateDBName)
 	// TODO: Optimize the opts for better write.
@@ -589,6 +612,9 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
 			newRangePropertiesCollector,
 		},
 	}
+	failpoint.Inject("slowCreateFS", func() {
+		opts.FS = slowCreateFS{vfs.Default}
+	})
 	return pebble.Open(dbPath, opts)
 }
diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go
index e9e9a7734679a..11d5636c12136 100644
--- a/pkg/owner/manager.go
+++ b/pkg/owner/manager.go
@@ -65,8 +65,6 @@ type Manager interface {
 
 	// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
 	SetBeOwnerHook(hook func())
-	// SetRetireOwnerHook will be called after retiring the owner.
-	SetRetireOwnerHook(hook func())
 }
 
 const (
@@ -118,8 +116,7 @@ type ownerManager struct {
 	wg             sync.WaitGroup
 	campaignCancel context.CancelFunc
 
-	beOwnerHook     func()
-	retireOwnerHook func()
+	beOwnerHook func()
 }
 
 // NewOwnerManager creates a new Manager.
@@ -164,10 +161,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
 	m.beOwnerHook = hook
 }
 
-func (m *ownerManager) SetRetireOwnerHook(hook func()) {
-	m.retireOwnerHook = hook
-}
-
 // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
 var ManagerSessionTTL = 60
 
@@ -230,9 +223,6 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
 
 // RetireOwner make the manager to be a not owner.
 func (m *ownerManager) RetireOwner() {
-	if m.retireOwnerHook != nil {
-		m.retireOwnerHook()
-	}
 	atomic.StorePointer(&m.elec, nil)
 }
 
diff --git a/pkg/owner/mock.go b/pkg/owner/mock.go
index 2b625d225aee1..babc054ed17b4 100644
--- a/pkg/owner/mock.go
+++ b/pkg/owner/mock.go
@@ -174,11 +174,6 @@ func (m *mockManager) SetBeOwnerHook(hook func()) {
 	m.beOwnerHook = hook
 }
 
-// SetRetireOwnerHook implements Manager.SetRetireOwnerHook interface.
-func (m *mockManager) SetRetireOwnerHook(hook func()) {
-	m.retireHook = hook
-}
-
 // CampaignCancel implements Manager.CampaignCancel interface
 func (m *mockManager) CampaignCancel() {
 	m.campaignDone <- struct{}{}
diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go
index cff1d1f38a98f..b163234da43fe 100644
--- a/tests/realtikvtest/addindextest4/ingest_test.go
+++ b/tests/realtikvtest/addindextest4/ingest_test.go
@@ -538,3 +538,45 @@ func TestAddUniqueIndexDuplicatedError(t *testing.T) {
 	tk.MustExec("INSERT INTO `b1cce552` (`f5d9aecb`, `d9337060`, `4c74082f`, `9215adc3`, `85ad5a07`, `8c60260f`, `8069da7b`, `91e218e1`) VALUES ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 846, 'N6QD1=@ped@owVoJx', '9soPM2d6H', 'Tv%'), ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 9052, '_HWaf#gD!bw', '9soPM2d6H', 'Tv%');")
 	tk.MustGetErrCode("ALTER TABLE `b1cce552` ADD unique INDEX `65290727` (`4c74082f`, `d9337060`, `8069da7b`);", errno.ErrDupEntry)
 }
+
+func TestFirstLitSlowStart(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+
+	tk.MustExec("create table t(a int, b int);")
+	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
+	tk.MustExec("create table t2(a int, b int);")
+	tk.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")
+
+	tk1 := testkit.NewTestKit(t, store)
+	tk1.MustExec("use addindexlit;")
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend", "1*return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend"))
+	})
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck", "return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck"))
+	})
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/lightning/backend/local/slowCreateFS", "return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/lightning/backend/local/slowCreateFS"))
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		tk.MustExec("alter table t add unique index idx(a);")
+	}()
+	go func() {
+		defer wg.Done()
+		tk1.MustExec("alter table t2 add unique index idx(a);")
+	}()
+	wg.Wait()
+}
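
Reviewer note (not part of the patch): the change moves the "only one ingest job at a time" gate out of ingest.litBackendCtxMgr (MarkJobProcessing/MarkJobFinish) and into runningJobs, where checkRunnable now refuses a second fast-reorg ADD INDEX/ADD PRIMARY KEY job while processingIngestJobID is set. Since runningJobs.clear() is now invoked from the hook set in ddl.Start rather than from a retire hook, the SetRetireOwnerHook plumbing in the owner.Manager interface and its mock can be deleted. The standalone Go sketch below models just that gate under those assumptions; jobSet and job are illustrative stand-ins, not TiDB's actual model.Job or runningJobs types.

// Minimal sketch of the single-ingest-job gate introduced in ddl_running_jobs.go.
// The names here are hypothetical; the real code keys off model.Job and ReorgMeta.IsFastReorg.
package main

import (
	"fmt"
	"sync"
)

type job struct {
	id     int64
	ingest bool // stands in for "ADD INDEX / ADD PRIMARY KEY with fast reorg"
}

type jobSet struct {
	sync.Mutex
	processingIngestJobID int64 // 0 means no ingest job currently holds the slot
}

// checkRunnable mirrors the new check: at most one ingest job may run at a time,
// which bounds the CPU/memory used by the local ingest backend.
func (s *jobSet) checkRunnable(j job) bool {
	s.Lock()
	defer s.Unlock()
	if j.ingest && s.processingIngestJobID != 0 {
		return false
	}
	return true
}

// add records that a job was picked up by a worker and, if it is an ingest job,
// takes the ingest slot.
func (s *jobSet) add(j job) {
	s.Lock()
	defer s.Unlock()
	if j.ingest {
		s.processingIngestJobID = j.id
	}
}

// remove releases the ingest slot when the job that held it leaves the worker.
func (s *jobSet) remove(j job) {
	s.Lock()
	defer s.Unlock()
	if j.ingest && s.processingIngestJobID == j.id {
		s.processingIngestJobID = 0
	}
}

func main() {
	s := &jobSet{}
	first := job{id: 1, ingest: true}
	second := job{id: 2, ingest: true}

	s.add(first)
	fmt.Println(s.checkRunnable(second)) // false: job 1 still holds the ingest slot
	s.remove(first)
	fmt.Println(s.checkRunnable(second)) // true: the slot is released with the job
}

TestFirstLitSlowStart appears to exercise the same scenario end to end: the failpoints slow down the first backend creation and force an owner resignation in the dispatch loop, and both concurrent ALTER TABLE ... ADD UNIQUE INDEX statements are still expected to complete.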