*: implement scheduler for adding index (#42753)
close #42754
wjhuang2016 authored Apr 11, 2023
1 parent 8eb580e commit 7c0097d
Showing 11 changed files with 293 additions and 57 deletions.
2 changes: 2 additions & 0 deletions ddl/BUILD.bazel
@@ -43,6 +43,7 @@ go_library(
         "resource_group.go",
         "rollingback.go",
         "sanity_check.go",
+        "scheduler.go",
         "schema.go",
         "sequence.go",
         "split_region.go",
@@ -68,6 +69,7 @@ go_library(
         "//distsql",
         "//disttask/framework/dispatcher",
         "//disttask/framework/proto",
+        "//disttask/framework/scheduler",
         "//domain/infosync",
         "//expression",
         "//infoschema",
68 changes: 37 additions & 31 deletions ddl/backfilling.go
@@ -519,21 +519,23 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 }
 
 type resultConsumer struct {
-    dc        *ddlCtx
-    wg        *sync.WaitGroup
-    err       error
-    hasError  *atomic.Bool
-    reorgInfo *reorgInfo // reorgInfo is used to update the reorg handle.
-    sessPool  *sess.Pool // sessPool is used to get the session to update the reorg handle.
+    dc         *ddlCtx
+    wg         *sync.WaitGroup
+    err        error
+    hasError   *atomic.Bool
+    reorgInfo  *reorgInfo // reorgInfo is used to update the reorg handle.
+    sessPool   *sess.Pool // sessPool is used to get the session to update the reorg handle.
+    distribute bool
 }
 
-func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sess.Pool) *resultConsumer {
+func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sess.Pool, distribute bool) *resultConsumer {
     return &resultConsumer{
-        dc:        dc,
-        wg:        &sync.WaitGroup{},
-        hasError:  &atomic.Bool{},
-        reorgInfo: reorgInfo,
-        sessPool:  sessPool,
+        dc:         dc,
+        wg:         &sync.WaitGroup{},
+        hasError:   &atomic.Bool{},
+        reorgInfo:  reorgInfo,
+        sessPool:   sessPool,
+        distribute: distribute,
     }
 }

@@ -597,30 +599,34 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum
         return result.err
     }
     *totalAddedCount += int64(result.addedCount)
-    reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
-    reorgCtx.setRowCount(*totalAddedCount)
+    if !consumer.distribute {
+        reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
+        reorgCtx.setRowCount(*totalAddedCount)
+    }
     keeper.updateNextKey(result.taskID, result.nextKey)
     if taskSeq%(scheduler.currentWorkerSize()*4) == 0 {
-        err := consumer.dc.isReorgRunnable(reorgInfo.ID, false)
-        if err != nil {
-            logutil.BgLogger().Warn("[ddl] backfill worker is not runnable", zap.Error(err))
-            scheduler.drainTasks() // Make it quit early.
-            return err
-        }
-        failpoint.Inject("MockGetIndexRecordErr", func() {
-            // Make sure this job didn't failed because by the "Write conflict" error.
-            if dbterror.ErrNotOwner.Equal(err) {
-                time.Sleep(50 * time.Millisecond)
-            }
-        })
-        err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool)
-        if err != nil {
-            logutil.BgLogger().Warn("[ddl] update reorg meta failed",
-                zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
-        }
+        if !consumer.distribute {
+            err := consumer.dc.isReorgRunnable(reorgInfo.ID, consumer.distribute)
+            if err != nil {
+                logutil.BgLogger().Warn("[ddl] backfill worker is not runnable", zap.Error(err))
+                scheduler.drainTasks() // Make it quit early.
+                return err
+            }
+            failpoint.Inject("MockGetIndexRecordErr", func() {
+                // Make sure this job didn't failed because by the "Write conflict" error.
+                if dbterror.ErrNotOwner.Equal(err) {
+                    time.Sleep(50 * time.Millisecond)
+                }
+            })
+            err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool)
+            if err != nil {
+                logutil.BgLogger().Warn("[ddl] update reorg meta failed",
+                    zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
+            }
+        }
         // We try to adjust the worker size regularly to reduce
         // the overhead of loading the DDL related global variables.
-        err = scheduler.adjustWorkerSize()
+        err := scheduler.adjustWorkerSize()
         if err != nil {
             logutil.BgLogger().Warn("[ddl] cannot adjust backfill worker size",
                 zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
@@ -788,7 +794,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
     }
     defer scheduler.close(true)
 
-    consumer := newResultConsumer(dc, reorgInfo, sessPool)
+    consumer := newResultConsumer(dc, reorgInfo, sessPool, false)
     consumer.run(scheduler, startKey, &totalAddedCount)
 
     err = scheduler.setupWorkers()
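The backfilling.go changes thread a distribute flag into the result consumer so that, when the backfill runs as a distributed task on a non-owner node, the owner-local bookkeeping (reorg row count, owner check, reorg-meta checkpoint) is skipped. Below is a minimal, self-contained sketch of that gating pattern; the types and names are simplified stand-ins, not TiDB's actual ones.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// resultConsumer is a simplified stand-in for ddl.resultConsumer: the
// distribute flag marks results produced by a distributed task, for which
// owner-local bookkeeping must be skipped.
type resultConsumer struct {
	rowCount   atomic.Int64
	distribute bool
}

// ownerRunnable stands in for the owner check (isReorgRunnable): only the
// DDL owner may keep driving a local backfill.
func ownerRunnable(isOwner bool) error {
	if !isOwner {
		return errors.New("not the DDL owner")
	}
	return nil
}

// handleResult mirrors the shape of handleOneResult in the diff above:
// shared work always happens, owner-only work is gated on !distribute.
func (c *resultConsumer) handleResult(added int64, isOwner bool) error {
	if !c.distribute {
		// Local (owner-driven) backfill: track progress and enforce ownership.
		c.rowCount.Add(added)
		if err := ownerRunnable(isOwner); err != nil {
			return err // quit early, like the drainTasks() path above
		}
	}
	// Work common to both modes (updating the next key, resizing workers,
	// ...) would continue here.
	return nil
}

func main() {
	local := &resultConsumer{distribute: false}
	dist := &resultConsumer{distribute: true}

	fmt.Println(local.handleResult(100, true))  // <nil>
	fmt.Println(local.handleResult(100, false)) // not the DDL owner
	fmt.Println(dist.handleResult(100, false))  // <nil>: owner check skipped
	fmt.Println("local rows:", local.rowCount.Load(), "dist rows:", dist.rowCount.Load())
}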
33 changes: 18 additions & 15 deletions ddl/backfilling_scheduler.go
@@ -79,7 +79,7 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.P
     tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
     jobCtx *JobContext) (backfillScheduler, error) {
     if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-        return newIngestBackfillScheduler(ctx, info, tbl), nil
+        return newIngestBackfillScheduler(ctx, info, tbl, false), nil
     }
     return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
 }
@@ -267,16 +267,18 @@ type ingestBackfillScheduler struct {
     writerMaxID int
     poolErr     chan error
     backendCtx  *ingest.BackendContext
+    distribute  bool
 }
 
-func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable) *ingestBackfillScheduler {
+func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable, distribute bool) *ingestBackfillScheduler {
     return &ingestBackfillScheduler{
-        ctx:       ctx,
-        reorgInfo: info,
-        tbl:       tbl,
-        taskCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
-        resultCh:  make(chan *backfillResult, backfillTaskChanSize),
-        poolErr:   make(chan error),
+        ctx:        ctx,
+        reorgInfo:  info,
+        tbl:        tbl,
+        taskCh:     make(chan *reorgBackfillTask, backfillTaskChanSize),
+        resultCh:   make(chan *backfillResult, backfillTaskChanSize),
+        poolErr:    make(chan error),
+        distribute: distribute,
     }
 }

@@ -374,8 +376,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult]
             zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
         return nil
     }
-    worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID,
-        reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx)
+    worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID, reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx, b.distribute)
     if err != nil {
         // Return an error only if it is the first worker.
         if b.writerMaxID == 0 {
@@ -433,11 +434,13 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
         w.resultCh <- result
         return
     }
-    err := w.d.isReorgRunnable(w.jobID, false)
-    if err != nil {
-        result.err = err
-        w.resultCh <- result
-        return
+    if !w.distribute {
+        err := w.d.isReorgRunnable(w.jobID, false)
+        if err != nil {
+            result.err = err
+            w.resultCh <- result
+            return
+        }
     }
     count, nextKey, err := w.WriteLocal(&rs)
     if err != nil {
6 changes: 6 additions & 0 deletions ddl/ddl.go
@@ -39,6 +39,7 @@ import (
     sess "github.com/pingcap/tidb/ddl/internal/session"
     "github.com/pingcap/tidb/ddl/syncer"
     "github.com/pingcap/tidb/ddl/util"
+    "github.com/pingcap/tidb/disttask/framework/scheduler"
     "github.com/pingcap/tidb/domain/infosync"
     "github.com/pingcap/tidb/infoschema"
     "github.com/pingcap/tidb/kv"
@@ -681,6 +682,11 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
         ddlJobCh: make(chan struct{}, 100),
     }
 
+    scheduler.RegisterSchedulerConstructor("backfill",
+        func(taskMeta []byte, step int64) (scheduler.Scheduler, error) {
+            return NewBackfillSchedulerHandle(taskMeta, d)
+        })
+
     // Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
     variable.EnableDDL = d.EnableDDL
     variable.DisableDDL = d.DisableDDL
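The ddl.go change registers a constructor for the "backfill" task type with the dist-task framework's scheduler registry, so a task executor can later build a backfill scheduler from the serialized task meta by task type. Below is a rough, self-contained sketch of that registry pattern with generic stand-in types; it is not the actual disttask/framework/scheduler API.

package main

import "fmt"

// Scheduler is a stand-in for the dist-task framework's scheduler interface.
type Scheduler interface {
	Run(step int64) error
}

// Constructor builds a Scheduler from the serialized task meta for one step.
type Constructor func(taskMeta []byte, step int64) (Scheduler, error)

// registry maps a task type ("backfill", ...) to its constructor, mirroring
// the RegisterSchedulerConstructor call in the diff above.
var registry = map[string]Constructor{}

func RegisterSchedulerConstructor(taskType string, ctor Constructor) {
	registry[taskType] = ctor
}

// backfillScheduler is a toy implementation standing in for the handle that
// NewBackfillSchedulerHandle would return.
type backfillScheduler struct {
	meta []byte
}

func (s *backfillScheduler) Run(step int64) error {
	fmt.Printf("running backfill step %d on meta %q\n", step, s.meta)
	return nil
}

func main() {
	// Registration happens once at startup (in TiDB it is done inside newDDL).
	RegisterSchedulerConstructor("backfill", func(taskMeta []byte, step int64) (Scheduler, error) {
		return &backfillScheduler{meta: taskMeta}, nil
	})

	// Later, the executor looks the constructor up by task type and runs it.
	sched, err := registry["backfill"]([]byte(`{"job_id":1}`), 1)
	if err != nil {
		panic(err)
	}
	_ = sched.Run(1)
}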
10 changes: 5 additions & 5 deletions ddl/index.go
@@ -1650,13 +1650,12 @@ type addIndexIngestWorker struct {
     writerCtx        *ingest.WriterContext
     copReqSenderPool *copReqSenderPool
 
-    resultCh chan *backfillResult
-    jobID    int64
+    resultCh   chan *backfillResult
+    jobID      int64
+    distribute bool
 }
 
-func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo,
-    resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int,
-    copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context) (*addIndexIngestWorker, error) {
+func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo, resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int, copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context, distribute bool) (*addIndexIngestWorker, error) {
     indexInfo := model.FindIndexInfoByID(t.Meta().Indices, indexID)
     index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
     lwCtx, err := ei.NewWriterCtx(writerID, indexInfo.Unique)
@@ -1675,6 +1674,7 @@ func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.Engine
     copReqSenderPool: copReqSenderPool,
     resultCh:         resultCh,
     jobID:            jobID,
+    distribute:       distribute,
     }, nil
 }

(Diffs for the remaining six changed files were not loaded on this page.)
