diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel
index e463908d31266..ef5dcedee95d3 100644
--- a/ddl/BUILD.bazel
+++ b/ddl/BUILD.bazel
@@ -43,6 +43,7 @@ go_library(
         "resource_group.go",
         "rollingback.go",
         "sanity_check.go",
+        "scheduler.go",
         "schema.go",
         "sequence.go",
         "split_region.go",
@@ -68,6 +69,7 @@ go_library(
         "//distsql",
         "//disttask/framework/dispatcher",
        "//disttask/framework/proto",
+        "//disttask/framework/scheduler",
         "//domain/infosync",
         "//expression",
         "//infoschema",
diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 56de409d4c969..a5860967f8287 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -519,21 +519,23 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 }
 
 type resultConsumer struct {
-	dc        *ddlCtx
-	wg        *sync.WaitGroup
-	err       error
-	hasError  *atomic.Bool
-	reorgInfo *reorgInfo // reorgInfo is used to update the reorg handle.
-	sessPool  *sess.Pool // sessPool is used to get the session to update the reorg handle.
+	dc         *ddlCtx
+	wg         *sync.WaitGroup
+	err        error
+	hasError   *atomic.Bool
+	reorgInfo  *reorgInfo // reorgInfo is used to update the reorg handle.
+	sessPool   *sess.Pool // sessPool is used to get the session to update the reorg handle.
+	distribute bool
 }
 
-func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sess.Pool) *resultConsumer {
+func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sess.Pool, distribute bool) *resultConsumer {
 	return &resultConsumer{
-		dc:        dc,
-		wg:        &sync.WaitGroup{},
-		hasError:  &atomic.Bool{},
-		reorgInfo: reorgInfo,
-		sessPool:  sessPool,
+		dc:         dc,
+		wg:         &sync.WaitGroup{},
+		hasError:   &atomic.Bool{},
+		reorgInfo:  reorgInfo,
+		sessPool:   sessPool,
+		distribute: distribute,
 	}
 }
 
@@ -597,30 +599,34 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum
 		return result.err
 	}
 	*totalAddedCount += int64(result.addedCount)
-	reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
-	reorgCtx.setRowCount(*totalAddedCount)
+	if !consumer.distribute {
+		reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
+		reorgCtx.setRowCount(*totalAddedCount)
+	}
 	keeper.updateNextKey(result.taskID, result.nextKey)
 	if taskSeq%(scheduler.currentWorkerSize()*4) == 0 {
-		err := consumer.dc.isReorgRunnable(reorgInfo.ID, false)
-		if err != nil {
-			logutil.BgLogger().Warn("[ddl] backfill worker is not runnable", zap.Error(err))
-			scheduler.drainTasks() // Make it quit early.
-			return err
-		}
-		failpoint.Inject("MockGetIndexRecordErr", func() {
-			// Make sure this job didn't failed because by the "Write conflict" error.
-			if dbterror.ErrNotOwner.Equal(err) {
-				time.Sleep(50 * time.Millisecond)
+		if !consumer.distribute {
+			err := consumer.dc.isReorgRunnable(reorgInfo.ID, consumer.distribute)
+			if err != nil {
+				logutil.BgLogger().Warn("[ddl] backfill worker is not runnable", zap.Error(err))
+				scheduler.drainTasks() // Make it quit early.
+				return err
+			}
+			failpoint.Inject("MockGetIndexRecordErr", func() {
+				// Make sure this job didn't fail because of the "Write conflict" error.
+				if dbterror.ErrNotOwner.Equal(err) {
+					time.Sleep(50 * time.Millisecond)
+				}
+			})
+			err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool)
+			if err != nil {
+				logutil.BgLogger().Warn("[ddl] update reorg meta failed",
+					zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
 			}
-		})
-		err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool)
-		if err != nil {
-			logutil.BgLogger().Warn("[ddl] update reorg meta failed",
-				zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
 		}
 		// We try to adjust the worker size regularly to reduce
 		// the overhead of loading the DDL related global variables.
-		err = scheduler.adjustWorkerSize()
+		err := scheduler.adjustWorkerSize()
 		if err != nil {
 			logutil.BgLogger().Warn("[ddl] cannot adjust backfill worker size",
 				zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
@@ -788,7 +794,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
 	}
 	defer scheduler.close(true)
 
-	consumer := newResultConsumer(dc, reorgInfo, sessPool)
+	consumer := newResultConsumer(dc, reorgInfo, sessPool, false)
 	consumer.run(scheduler, startKey, &totalAddedCount)
 
 	err = scheduler.setupWorkers()
diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go
index a4aa6e54fd62b..30ea3ed8f64f7 100644
--- a/ddl/backfilling_scheduler.go
+++ b/ddl/backfilling_scheduler.go
@@ -79,7 +79,7 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.P
 	tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context, jobCtx *JobContext) (backfillScheduler, error) {
 	if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-		return newIngestBackfillScheduler(ctx, info, tbl), nil
+		return newIngestBackfillScheduler(ctx, info, tbl, false), nil
 	}
 	return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
 }
@@ -267,16 +267,18 @@ type ingestBackfillScheduler struct {
 	writerMaxID int
 	poolErr     chan error
 	backendCtx  *ingest.BackendContext
+	distribute  bool
 }
 
-func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable) *ingestBackfillScheduler {
+func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable, distribute bool) *ingestBackfillScheduler {
 	return &ingestBackfillScheduler{
-		ctx:       ctx,
-		reorgInfo: info,
-		tbl:       tbl,
-		taskCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
-		resultCh:  make(chan *backfillResult, backfillTaskChanSize),
-		poolErr:   make(chan error),
+		ctx:        ctx,
+		reorgInfo:  info,
+		tbl:        tbl,
+		taskCh:     make(chan *reorgBackfillTask, backfillTaskChanSize),
+		resultCh:   make(chan *backfillResult, backfillTaskChanSize),
+		poolErr:    make(chan error),
+		distribute: distribute,
 	}
 }
 
@@ -374,8 +376,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult]
 			zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
 		return nil
 	}
-	worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID,
-		reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx)
+	worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID, reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx, b.distribute)
 	if err != nil {
 		// Return an error only if it is the first worker.
 		if b.writerMaxID == 0 {
@@ -433,11 +434,13 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
 		w.resultCh <- result
 		return
 	}
-	err := w.d.isReorgRunnable(w.jobID, false)
-	if err != nil {
-		result.err = err
-		w.resultCh <- result
-		return
+	if !w.distribute {
+		err := w.d.isReorgRunnable(w.jobID, false)
+		if err != nil {
+			result.err = err
+			w.resultCh <- result
+			return
+		}
 	}
 	count, nextKey, err := w.WriteLocal(&rs)
 	if err != nil {
diff --git a/ddl/ddl.go b/ddl/ddl.go
index f6d21adcb6c5a..d2cd98a8b91f7 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -39,6 +39,7 @@ import (
 	sess "github.com/pingcap/tidb/ddl/internal/session"
 	"github.com/pingcap/tidb/ddl/syncer"
 	"github.com/pingcap/tidb/ddl/util"
+	"github.com/pingcap/tidb/disttask/framework/scheduler"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
@@ -681,6 +682,11 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 		ddlJobCh:   make(chan struct{}, 100),
 	}
 
+	scheduler.RegisterSchedulerConstructor("backfill",
+		func(taskMeta []byte, step int64) (scheduler.Scheduler, error) {
+			return NewBackfillSchedulerHandle(taskMeta, d)
+		})
+
 	// Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
 	variable.EnableDDL = d.EnableDDL
 	variable.DisableDDL = d.DisableDDL
diff --git a/ddl/index.go b/ddl/index.go
index 0acfd4fd79f6f..056e80e887bb9 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1650,13 +1650,12 @@ type addIndexIngestWorker struct {
 	writerCtx        *ingest.WriterContext
 	copReqSenderPool *copReqSenderPool
 
-	resultCh chan *backfillResult
-	jobID    int64
+	resultCh   chan *backfillResult
+	jobID      int64
+	distribute bool
 }
 
-func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo,
-	resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int,
-	copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context) (*addIndexIngestWorker, error) {
+func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo, resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int, copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context, distribute bool) (*addIndexIngestWorker, error) {
 	indexInfo := model.FindIndexInfoByID(t.Meta().Indices, indexID)
 	index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
 	lwCtx, err := ei.NewWriterCtx(writerID, indexInfo.Unique)
@@ -1675,6 +1674,7 @@ func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.Engine
 		copReqSenderPool: copReqSenderPool,
 		resultCh:         resultCh,
 		jobID:            jobID,
+		distribute:       distribute,
 	}, nil
 }
 
diff --git a/ddl/scheduler.go b/ddl/scheduler.go
new file mode 100644
index 0000000000000..8e5dcc4e89a9b
--- /dev/null
+++ b/ddl/scheduler.go
@@ -0,0 +1,214 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"context"
+	"encoding/hex"
+	"encoding/json"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/ddl/ingest"
+	"github.com/pingcap/tidb/disttask/framework/proto"
+	"github.com/pingcap/tidb/disttask/framework/scheduler"
+	"github.com/pingcap/tidb/meta"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+type backfillSchedulerHandle struct {
+	d           *ddl
+	db          *model.DBInfo
+	index       *model.IndexInfo
+	job         *model.Job
+	bc          *ingest.BackendContext
+	ptbl        table.PhysicalTable
+	jc          *JobContext
+	eleTypeKey  []byte
+	totalRowCnt int64
+}
+
+// BackfillGlobalMeta is the global task meta for backfilling index.
+type BackfillGlobalMeta struct {
+	Job        model.Job `json:"job"`
+	EleID      int64     `json:"ele_id"`
+	EleTypeKey []byte    `json:"ele_type_key"`
+}
+
+// BackfillSubTaskMeta is the sub-task meta for backfilling index.
+type BackfillSubTaskMeta struct {
+	PhysicalTableID int64 `json:"physical_table_id"`
+}
+
+// BackfillMinimalTask is the minimal-task for backfilling index.
+type BackfillMinimalTask struct {
+}
+
+// IsMinimalTask implements the MinimalTask interface.
+func (b *BackfillMinimalTask) IsMinimalTask() {
+}
+
+// NewBackfillSchedulerHandle creates a new backfill scheduler.
+func NewBackfillSchedulerHandle(taskMeta []byte, d *ddl) (scheduler.Scheduler, error) {
+	bh := &backfillSchedulerHandle{d: d}
+
+	bgm := &BackfillGlobalMeta{}
+	err := json.Unmarshal(taskMeta, bgm)
+	if err != nil {
+		return nil, err
+	}
+
+	bh.eleTypeKey = bgm.EleTypeKey
+	jobMeta := &bgm.Job
+	bh.job = jobMeta
+
+	db, tbl, err := d.getTableByTxn(d.store, jobMeta.SchemaID, jobMeta.TableID)
+	if err != nil {
+		return nil, err
+	}
+	bh.db = db
+
+	physicalTable := tbl.(table.PhysicalTable)
+	bh.ptbl = physicalTable
+
+	d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
+	d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
+	jobCtx := d.jobContext(jobMeta.ID)
+	bh.jc = jobCtx
+
+	// Build reader.
+	indexInfo := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleID)
+	if indexInfo == nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
+			zap.Int64("table ID", tbl.Meta().ID), zap.Int64("index ID", bgm.EleID))
+		return nil, errors.New("cannot find index info")
+	}
+	bh.index = indexInfo
+
+	return bh, nil
+}
+
+// InitSubtaskExecEnv implements the Scheduler interface.
+func (b *backfillSchedulerHandle) InitSubtaskExecEnv(context.Context) error {
+	logutil.BgLogger().Info("[ddl] lightning init subtask exec env")
+	d := b.d
+
+	bc, err := ingest.LitBackCtxMgr.Register(d.ctx, b.index.Unique, b.job.ID, b.job.ReorgMeta.SQLMode)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] lightning register error", zap.Error(err))
+		return err
+	}
+	b.bc = bc
+	return nil
+}
+
+// SplitSubtask implements the Scheduler interface.
+func (b *backfillSchedulerHandle) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
+	logutil.BgLogger().Info("[ddl] lightning split subtask")
+
+	d := b.d
+	sm := &BackfillSubTaskMeta{}
+	err := json.Unmarshal(subtask, sm)
+	if err != nil {
+		logutil.BgLogger().Error("[ddl] unmarshal error", zap.Error(err))
+		return nil, err
+	}
+
+	pid := sm.PhysicalTableID
+	parTbl := b.ptbl.(table.PartitionedTable)
+
+	startKey, endKey, err := getTableRange(b.jc, d.ddlCtx, parTbl.GetPartition(pid), b.job.SnapshotVer, b.job.Priority)
+	if err != nil {
+		logutil.BgLogger().Error("[ddl] get table range error", zap.Error(err))
+		return nil, err
+	}
+
+	mockReorgInfo := &reorgInfo{Job: b.job, d: d.ddlCtx}
+	elements := make([]*meta.Element, 0)
+	elements = append(elements, &meta.Element{ID: b.index.ID, TypeKey: meta.IndexElementKey})
+	mockReorgInfo.elements = elements
+	mockReorgInfo.currElement = mockReorgInfo.elements[0]
+
+	ingestScheduler := newIngestBackfillScheduler(d.ctx, mockReorgInfo, parTbl.GetPartition(pid), true)
+	defer ingestScheduler.close(true)
+
+	consumer := newResultConsumer(d.ddlCtx, mockReorgInfo, nil, true)
+	consumer.run(ingestScheduler, startKey, &b.totalRowCnt)
+
+	err = ingestScheduler.setupWorkers()
+	if err != nil {
+		logutil.BgLogger().Error("[ddl] setup workers error", zap.Error(err))
+		return nil, err
+	}
+
+	for {
+		kvRanges, err := splitTableRanges(b.ptbl, d.store, startKey, endKey, backfillTaskChanSize)
+		if err != nil {
+			return nil, err
+		}
+		if len(kvRanges) == 0 {
+			break
+		}
+
+		logutil.BgLogger().Info("[ddl] start backfill workers to reorg record",
+			zap.Int("workerCnt", ingestScheduler.currentWorkerSize()),
+			zap.Int("regionCnt", len(kvRanges)),
+			zap.String("startKey", hex.EncodeToString(startKey)),
+			zap.String("endKey", hex.EncodeToString(endKey)))
+
+		sendTasks(ingestScheduler, consumer, parTbl.GetPartition(pid), kvRanges, mockReorgInfo)
+		if consumer.shouldAbort() {
+			break
+		}
+		rangeEndKey := kvRanges[len(kvRanges)-1].EndKey
+		startKey = rangeEndKey.Next()
+		if startKey.Cmp(endKey) >= 0 {
+			break
+		}
+	}
+	ingestScheduler.close(false)
+	// TODO: unsafe import.
+	return nil, consumer.getResult()
+}
+
+// CleanupSubtaskExecEnv implements the Scheduler interface.
+func (b *backfillSchedulerHandle) CleanupSubtaskExecEnv(context.Context) error {
+	logutil.BgLogger().Info("[ddl] lightning cleanup subtask exec env")
+
+	err := b.bc.FinishImport(b.index.ID, b.index.Unique, b.ptbl)
+	if err != nil {
+		return err
+	}
+
+	b.bc.EngMgr.UnregisterAll(b.job.ID)
+	return nil
+}
+
+// Rollback implements the Scheduler interface.
+func (b *backfillSchedulerHandle) Rollback(context.Context) error {
+	return nil
+}
+
+// BackFillSubtaskExecutor is the executor for backfill subtask.
+type BackFillSubtaskExecutor struct {
+	Task proto.MinimalTask
+}
+
+// Run implements the Executor interface.
+func (b *BackFillSubtaskExecutor) Run(ctx context.Context) error {
+	return nil
+}
diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go
index ec51ed8cece3a..ba326f3a6169b 100644
--- a/disttask/framework/framework_test.go
+++ b/disttask/framework/framework_test.go
@@ -58,12 +58,12 @@ func (t *testScheduler) CleanupSubtaskExecEnv(_ context.Context) error { return
 
 func (t *testScheduler) Rollback(_ context.Context) error { return nil }
 
-func (t *testScheduler) SplitSubtask(_ []byte) []proto.MinimalTask {
+func (t *testScheduler) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
 	return []proto.MinimalTask{
 		testMiniTask{},
 		testMiniTask{},
 		testMiniTask{},
-	}
+	}, nil
 }
 
 type testSubtaskExecutor struct {
diff --git a/disttask/framework/scheduler/interface.go b/disttask/framework/scheduler/interface.go
index 81db9823b787e..46aceb8bd9711 100644
--- a/disttask/framework/scheduler/interface.go
+++ b/disttask/framework/scheduler/interface.go
@@ -48,7 +48,7 @@ type InternalScheduler interface {
 // User should implement this interface to define their own scheduler.
 type Scheduler interface {
 	InitSubtaskExecEnv(context.Context) error
-	SplitSubtask(subtask []byte) []proto.MinimalTask
+	SplitSubtask(subtask []byte) ([]proto.MinimalTask, error)
 	CleanupSubtaskExecEnv(context.Context) error
 	Rollback(context.Context) error
 }
diff --git a/disttask/framework/scheduler/interface_mock.go b/disttask/framework/scheduler/interface_mock.go
index 07e54687c3ab7..42d0e93ad09ee 100644
--- a/disttask/framework/scheduler/interface_mock.go
+++ b/disttask/framework/scheduler/interface_mock.go
@@ -124,9 +124,9 @@ func (m *MockScheduler) InitSubtaskExecEnv(ctx context.Context) error {
 }
 
 // SplitSubtask implements Scheduler.SplitSubtask.
-func (m *MockScheduler) SplitSubtask(subtask []byte) []proto.MinimalTask {
+func (m *MockScheduler) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
 	args := m.Called(subtask)
-	return args.Get(0).([]proto.MinimalTask)
+	return args.Get(0).([]proto.MinimalTask), nil
 }
 
 // CleanupSubtaskExecEnv implements Scheduler.CleanupSubtaskExecEnv.
diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go
index b479beaed42e1..5dc0dd029a452 100644
--- a/disttask/framework/scheduler/scheduler.go
+++ b/disttask/framework/scheduler/scheduler.go
@@ -139,7 +139,11 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
 			break
 		}
 
-		minimalTasks := scheduler.SplitSubtask(subtask.Meta)
+		minimalTasks, err := scheduler.SplitSubtask(subtask.Meta)
+		if err != nil {
+			s.onError(err)
+			break
+		}
 		logutil.Logger(s.logCtx).Info("split subTask", zap.Any("cnt", len(minimalTasks)), zap.Any("subtask_id", subtask.ID))
 		for _, minimalTask := range minimalTasks {
 			minimalTaskWg.Add(1)
diff --git a/domain/domain.go b/domain/domain.go
index e7cb29d00de88..7e0e28699ff5c 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1113,6 +1113,7 @@ func (do *Domain) Init(
 		ddl.WithHook(callback),
 		ddl.WithLease(ddlLease),
 	)
+
 	failpoint.Inject("MockReplaceDDL", func(val failpoint.Value) {
 		if val.(bool) {
 			do.ddl = d
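
The hunks above change the disttask framework contract so that Scheduler.SplitSubtask returns ([]proto.MinimalTask, error) and split failures are surfaced through InternalSchedulerImpl.onError instead of being dropped, while newDDL registers a "backfill" scheduler constructor that builds a backfillSchedulerHandle from the serialized BackfillGlobalMeta. The sketch below is illustrative only and is not part of the patch: it shows the shape of a user-defined scheduler written against the revised interface. The package name, the "example" task type, and the subtask meta field are invented for the illustration.

// Illustrative sketch only; not part of the patch. Names marked below are invented.
package example

import (
	"context"
	"encoding/json"

	"github.com/pingcap/tidb/disttask/framework/proto"
	"github.com/pingcap/tidb/disttask/framework/scheduler"
)

// exampleMinimalTask satisfies proto.MinimalTask, like BackfillMinimalTask above.
type exampleMinimalTask struct{}

// IsMinimalTask implements the MinimalTask interface.
func (exampleMinimalTask) IsMinimalTask() {}

// exampleScheduler is a user-defined scheduler against the revised interface.
type exampleScheduler struct{}

// InitSubtaskExecEnv implements the Scheduler interface.
func (*exampleScheduler) InitSubtaskExecEnv(context.Context) error { return nil }

// SplitSubtask now returns an error as well; the framework reports it via
// onError and stops processing the subtask, as InternalSchedulerImpl.Run shows.
func (*exampleScheduler) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
	var meta struct {
		Count int `json:"count"` // hypothetical field, for illustration only
	}
	if err := json.Unmarshal(subtask, &meta); err != nil {
		return nil, err
	}
	tasks := make([]proto.MinimalTask, 0, meta.Count)
	for i := 0; i < meta.Count; i++ {
		tasks = append(tasks, exampleMinimalTask{})
	}
	return tasks, nil
}

// CleanupSubtaskExecEnv implements the Scheduler interface.
func (*exampleScheduler) CleanupSubtaskExecEnv(context.Context) error { return nil }

// Rollback implements the Scheduler interface.
func (*exampleScheduler) Rollback(context.Context) error { return nil }

func init() {
	// Constructors are registered per task type, mirroring the "backfill"
	// registration added to newDDL in ddl/ddl.go.
	scheduler.RegisterSchedulerConstructor("example",
		func(taskMeta []byte, step int64) (scheduler.Scheduler, error) {
			return &exampleScheduler{}, nil
		})
}

The registration keyed by task type mirrors what newDDL does for "backfill"; the framework presumably resolves the constructor by that key when it runs a task of the corresponding type.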