diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index 34bfa91e5722f..ec1924d402cd1 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -63,6 +63,7 @@ go_library(
         "//pkg/util/ranger",
         "@com_github_cockroachdb_pebble//:pebble",
         "@com_github_cockroachdb_pebble//sstable",
+        "@com_github_cockroachdb_pebble//vfs",
         "@com_github_coreos_go_semver//semver",
         "@com_github_docker_go_units//:go-units",
         "@com_github_google_btree//:btree",
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 9d901f4455b74..93fc7c672ed36 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/vfs"
 	"github.com/coreos/go-semver/semver"
 	"github.com/docker/go-units"
 	"github.com/google/uuid"
@@ -496,6 +497,25 @@ type Backend struct {
 var _ DiskUsage = (*Backend)(nil)
 var _ backend.Backend = (*Backend)(nil)
 
+// only used in tests
+type slowCreateFS struct {
+	vfs.FS
+}
+
+// WaitRMFolderChForTest is a channel for testing.
+var WaitRMFolderChForTest = make(chan struct{})
+
+func (s slowCreateFS) Create(name string) (vfs.File, error) {
+	if strings.Contains(name, "temporary") {
+		select {
+		case <-WaitRMFolderChForTest:
+		case <-time.After(1 * time.Second):
+			log.FromContext(context.Background()).Info("no one removes folder")
+		}
+	}
+	return s.FS.Create(name)
+}
+
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
 	dbPath := filepath.Join(storeDir, duplicateDBName)
 	// TODO: Optimize the opts for better write.
diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index 1b6ece2b63881..e9e146c466dce 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -752,6 +752,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		if err != nil {
 			logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
 		}
+		d.runningJobs.clear()
 	})
 
 	d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
@@ -790,13 +791,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 	defer d.sessPool.Put(ctx)
 
 	ingest.InitGlobalLightningEnv()
-	d.ownerManager.SetRetireOwnerHook(func() {
-		// Since this instance is not DDL owner anymore, we clean up the processing job info.
-		if ingest.LitBackCtxMgr != nil {
-			ingest.LitBackCtxMgr.MarkJobFinish()
-		}
-		d.runningJobs = newRunningJobs()
-	})
 
 	return nil
 }
diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go
index ffcb79af8a422..d2ce39d1725af 100644
--- a/pkg/ddl/ddl_running_jobs.go
+++ b/pkg/ddl/ddl_running_jobs.go
@@ -22,8 +22,11 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
 )
 
 type runningJobs struct {
@@ -36,6 +39,11 @@ type runningJobs struct {
 	// It is not necessarily being processed by a worker.
 	unfinishedIDs    map[int64]struct{}
 	unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
+
+	// processingIngestJobID records the ID of the ingest job that is being processed by a worker.
+	// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
+	processingIngestJobID int64
+	lastLoggingTime       time.Time
 }
 
 func newRunningJobs() *runningJobs {
@@ -46,11 +54,21 @@ func newRunningJobs() *runningJobs {
 	}
 }
 
+func (j *runningJobs) clear() {
+	j.Lock()
+	defer j.Unlock()
+	j.unfinishedIDs = make(map[int64]struct{})
+	j.unfinishedSchema = make(map[string]map[string]struct{})
+}
+
 func (j *runningJobs) add(job *model.Job) {
 	j.Lock()
 	defer j.Unlock()
 	j.processingIDs[job.ID] = struct{}{}
 	j.updateInternalRunningJobIDs()
+	if isIngestJob(job) {
+		j.processingIngestJobID = job.ID
+	}
 
 	if _, ok := j.unfinishedIDs[job.ID]; ok {
 		// Already exists, no need to add it again.
@@ -70,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
 	defer j.Unlock()
 	delete(j.processingIDs, job.ID)
 	j.updateInternalRunningJobIDs()
+	if isIngestJob(job) && job.ID == j.processingIngestJobID {
+		j.processingIngestJobID = 0
+	}
 
 	if job.IsFinished() || job.IsSynced() {
 		delete(j.unfinishedIDs, job.ID)
@@ -110,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 		// Already processing by a worker. Skip running it again.
 		return false
 	}
+	if isIngestJob(job) && j.processingIngestJobID != 0 {
+		// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
+		if time.Since(j.lastLoggingTime) > 1*time.Minute {
+			logutil.BgLogger().Info("ingest backfill worker is already in use by another DDL job",
+				zap.String("category", "ddl-ingest"),
+				zap.Int64("processing job ID", j.processingIngestJobID))
+			j.lastLoggingTime = time.Now()
+		}
+		return false
+	}
 	for _, info := range job.GetInvolvingSchemaInfo() {
 		if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
 			return false
@@ -131,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 	}
 	return true
 }
+
+func isIngestJob(job *model.Job) bool {
+	return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
+		job.ReorgMeta != nil &&
+		job.ReorgMeta.IsFastReorg
+}
diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go
index 9f6d872942cc9..3935c2e1b649c 100644
--- a/pkg/ddl/ddl_worker.go
+++ b/pkg/ddl/ddl_worker.go
@@ -27,7 +27,6 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
-	"github.com/pingcap/tidb/pkg/ddl/ingest"
 	sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
 	"github.com/pingcap/tidb/pkg/ddl/util"
 	"github.com/pingcap/tidb/pkg/kv"
@@ -576,7 +575,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	startTime := time.Now()
 	defer func() {
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
-		markJobFinish(job)
 	}()
 
 	if jobNeedGC(job) {
@@ -622,15 +620,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	return errors.Trace(err)
 }
 
-func markJobFinish(job *model.Job) {
-	if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
-		job.ReorgMeta != nil &&
-		job.ReorgMeta.IsFastReorg &&
-		ingest.LitBackCtxMgr != nil {
-		ingest.LitBackCtxMgr.MarkJobFinish()
-	}
-}
-
 func (w *worker) writeDDLSeqNum(job *model.Job) {
 	w.ddlSeqNumMu.Lock()
 	w.ddlSeqNumMu.seqNum++
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index 05ff2289d3a13..82e78ced0865c 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/copr" @@ -843,6 +844,9 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error { logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err)) return nil } + failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() { + close(local.WaitRMFolderChForTest) + }) } } return nil diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 794598d889807..28c749cfd6ad6 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "strconv" - "sync" "time" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" @@ -28,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" kvutil "github.com/tikv/client-go/v2/util" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -37,18 +37,12 @@ type BackendCtxMgr interface { Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) Unregister(jobID int64) Load(jobID int64) (BackendCtx, bool) - - MarkJobProcessing(jobID int64) (ok bool) - MarkJobFinish() } type litBackendCtxMgr struct { generic.SyncMap[int64, *litBackendCtx] - memRoot MemRoot - diskRoot DiskRoot - processingJobID int64 - lastLoggingTime time.Time - mu sync.Mutex + memRoot MemRoot + diskRoot DiskRoot } func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr { @@ -69,30 +63,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr { return mgr } -// MarkJobProcessing marks ingest backfill is processing. -func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool { - m.mu.Lock() - defer m.mu.Unlock() - if m.processingJobID == 0 || m.processingJobID == jobID { - m.processingJobID = jobID - return true - } - if time.Since(m.lastLoggingTime) > 1*time.Minute { - logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job", - zap.String("category", "ddl-ingest"), - zap.Int64("processing job ID", m.processingJobID)) - m.lastLoggingTime = time.Now() - } - return false -} - -// MarkJobFinish marks ingest backfill is finished. -func (m *litBackendCtxMgr) MarkJobFinish() { - m.mu.Lock() - m.processingJobID = 0 - m.mu.Unlock() -} - // CheckAvailable checks if the ingest backfill is available. func (m *litBackendCtxMgr) CheckAvailable() (bool, error) { if err := m.diskRoot.PreCheckUsage(); err != nil { @@ -102,6 +72,9 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) { return true, nil } +// ResignOwnerForTest is only used for test. +var ResignOwnerForTest = atomic.NewBool(false) + // Register creates a new backend and registers it to the backend context. func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) { bc, exist := m.Load(jobID) diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 52ae4bd17ddf4..41a73e1853b99 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -45,15 +45,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken } } -// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface. -func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool { - return true -} - -// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface. 
-func (*MockBackendCtxMgr) MarkJobFinish() {
-}
-
 // CheckAvailable implements BackendCtxMgr.Available interface.
 func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
 	return len(m.runningJobs) == 0, nil
diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go
index 903a609af4927..60d38c911b728 100644
--- a/pkg/ddl/job_table.go
+++ b/pkg/ddl/job_table.go
@@ -224,17 +224,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
 		if !d.runningJobs.checkRunnable(job) {
 			return false, nil
 		}
-		if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
-			job.State == model.JobStateQueueing &&
-			job.ReorgMeta != nil &&
-			job.ReorgMeta.IsFastReorg &&
-			ingest.LitBackCtxMgr != nil {
-			succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
-			if !succeed {
-				// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
-				return false, nil
-			}
-		}
 		// Check if there is any block ddl running, like drop schema and flashback cluster.
 		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
 			"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
@@ -273,6 +262,15 @@ func (d *ddl) startDispatchLoop() {
 			time.Sleep(dispatchLoopWaitingDuration)
 			continue
 		}
+		failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
+			if ingest.ResignOwnerForTest.Load() {
+				err2 := d.ownerManager.ResignOwner(context.Background())
+				if err2 != nil {
+					logutil.BgLogger().Info("resign meet error", zap.Error(err2))
+				}
+				ingest.ResignOwnerForTest.Store(false)
+			}
+		})
 		select {
 		case <-d.ddlJobCh:
 		case <-ticker.C:
diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go
index d5f7ddb46779f..fa3d2734bb2af 100644
--- a/pkg/owner/manager.go
+++ b/pkg/owner/manager.go
@@ -65,8 +65,6 @@ type Manager interface {
 	// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
 	SetBeOwnerHook(hook func())
-
-	// SetRetireOwnerHook will be called after retiring the owner.
-	SetRetireOwnerHook(hook func())
 }
 
 const (
@@ -118,8 +116,7 @@ type ownerManager struct {
 	wg             sync.WaitGroup
 	campaignCancel context.CancelFunc
 
-	beOwnerHook     func()
-	retireOwnerHook func()
+	beOwnerHook func()
 }
 
 // NewOwnerManager creates a new Manager.
@@ -164,10 +161,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
 	m.beOwnerHook = hook
 }
 
-func (m *ownerManager) SetRetireOwnerHook(hook func()) {
-	m.retireOwnerHook = hook
-}
-
 // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
 var ManagerSessionTTL = 60
 
@@ -230,9 +223,6 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
 
 // RetireOwner make the manager to be a not owner.
 func (m *ownerManager) RetireOwner() {
-	if m.retireOwnerHook != nil {
-		m.retireOwnerHook()
-	}
 	atomic.StorePointer(&m.elec, nil)
 }
 
diff --git a/pkg/owner/mock.go b/pkg/owner/mock.go
index 2b625d225aee1..babc054ed17b4 100644
--- a/pkg/owner/mock.go
+++ b/pkg/owner/mock.go
@@ -174,11 +174,6 @@ func (m *mockManager) SetBeOwnerHook(hook func()) {
 	m.beOwnerHook = hook
 }
 
-// SetRetireOwnerHook implements Manager.SetRetireOwnerHook interface.
-func (m *mockManager) SetRetireOwnerHook(hook func()) {
-	m.retireHook = hook
-}
-
 // CampaignCancel implements Manager.CampaignCancel interface
 func (m *mockManager) CampaignCancel() {
 	m.campaignDone <- struct{}{}
 }
diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go
index c8c5db70a3c3e..596697df37824 100644
--- a/tests/realtikvtest/addindextest4/ingest_test.go
+++ b/tests/realtikvtest/addindextest4/ingest_test.go
@@ -538,3 +538,45 @@ func TestAddUniqueIndexDuplicatedError(t *testing.T) {
 	tk.MustExec("INSERT INTO `b1cce552` (`f5d9aecb`, `d9337060`, `4c74082f`, `9215adc3`, `85ad5a07`, `8c60260f`, `8069da7b`, `91e218e1`) VALUES ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 846, 'N6QD1=@ped@owVoJx', '9soPM2d6H', 'Tv%'), ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 9052, '_HWaf#gD!bw', '9soPM2d6H', 'Tv%');")
 	tk.MustGetErrCode("ALTER TABLE `b1cce552` ADD unique INDEX `65290727` (`4c74082f`, `d9337060`, `8069da7b`);", errno.ErrDupEntry)
 }
+
+func TestFirstLitSlowStart(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+
+	tk.MustExec("create table t(a int, b int);")
+	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
+	tk.MustExec("create table t2(a int, b int);")
+	tk.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")
+
+	tk1 := testkit.NewTestKit(t, store)
+	tk1.MustExec("use addindexlit;")
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend", "1*return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend"))
+	})
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck", "return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck"))
+	})
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/slowCreateFS", "return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/slowCreateFS"))
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		tk.MustExec("alter table t add unique index idx(a);")
+	}()
+	go func() {
+		defer wg.Done()
+		tk1.MustExec("alter table t2 add unique index idx(a);")
+	}()
+	wg.Wait()
+}
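Taken together, the changes above move the "only one ingest job at a time" rule out of litBackendCtxMgr (MarkJobProcessing/MarkJobFinish) and into runningJobs, where checkRunnable rejects a second fast-reorg job while one is in flight and clear() resets the state when ownership changes. Below is a minimal standalone sketch (not part of the patch; the type and function names are illustrative only) of that gating pattern, reduced to plain Go:

package main

import (
	"fmt"
	"sync"
	"time"
)

// ingestGate mirrors the fields that the patch adds to runningJobs:
// one slot for the ingest job in flight, plus a rate limit for refusal logs.
type ingestGate struct {
	sync.Mutex
	processingIngestJobID int64
	lastLoggingTime       time.Time
}

// tryAcquire reports whether jobID may use the ingest (fast reorg) path.
// Only one ingest job is allowed at a time; refusals are logged at most
// once per minute, like the check in checkRunnable.
func (g *ingestGate) tryAcquire(jobID int64) bool {
	g.Lock()
	defer g.Unlock()
	if g.processingIngestJobID == 0 || g.processingIngestJobID == jobID {
		g.processingIngestJobID = jobID
		return true
	}
	if time.Since(g.lastLoggingTime) > time.Minute {
		fmt.Printf("ingest backfill worker is already in use by job %d\n", g.processingIngestJobID)
		g.lastLoggingTime = time.Now()
	}
	return false
}

// release frees the slot when the owning job finishes, as remove() does.
func (g *ingestGate) release(jobID int64) {
	g.Lock()
	defer g.Unlock()
	if g.processingIngestJobID == jobID {
		g.processingIngestJobID = 0
	}
}

func main() {
	var g ingestGate
	fmt.Println(g.tryAcquire(1)) // true: job 1 takes the ingest slot
	fmt.Println(g.tryAcquire(2)) // false: job 2 must wait for the slot
	g.release(1)
	fmt.Println(g.tryAcquire(2)) // true: the slot is free again
}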