diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go
index 4fbed0422726d..3b14c41e32cc5 100644
--- a/pkg/ddl/backfilling.go
+++ b/pkg/ddl/backfilling.go
@@ -178,6 +178,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
     }
 
     exprCtx := sessCtx.GetExprCtx()
+    batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
     return &backfillCtx{
         id:         id,
         ddlCtx:     rInfo.d,
@@ -188,7 +189,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
         loc:        exprCtx.GetEvalCtx().Location(),
         schemaName: schemaName,
         table:      tbl,
-        batchCnt:   int(variable.GetDDLReorgBatchSize()),
+        batchCnt:   batchCnt,
         jobContext: jobCtx,
         metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
             metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
@@ -415,7 +416,8 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
         })
 
         // Change the batch size dynamically.
-        w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
+        newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
+        w.GetCtx().batchCnt = newBatchCnt
         result := w.handleBackfillTask(d, task, bf)
         w.sendResult(result)
 
@@ -675,8 +677,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
     //nolint: forcetypeassert
     discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
+    importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
     bcCtx, err := ingest.LitBackCtxMgr.Register(
-        ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
+        ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc)
     if err != nil {
         return errors.Trace(err)
     }
@@ -705,16 +708,15 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
         bcCtx.AttachCheckpointManager(cpMgr)
     }
 
-    reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID)
+    reorgCtx := dc.getReorgCtx(job.ID)
     rowCntListener := &localRowCntListener{
         prevPhysicalRowCnt: reorgCtx.getRowCount(),
-        reorgCtx:           dc.getReorgCtx(reorgInfo.Job.ID),
+        reorgCtx:           reorgCtx,
         counter: metrics.BackfillTotalCounter.WithLabelValues(
             metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
     }
 
     avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
-    concurrency := int(variable.GetDDLReorgWorkerCounter())
 
     engines, err := bcCtx.Register(indexIDs, uniques, t)
     if err != nil {
@@ -724,7 +726,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
             zap.Int64s("index IDs", indexIDs))
         return errors.Trace(err)
     }
-
     pipe, err := NewAddIndexIngestPipeline(
         opCtx,
         dc.store,
@@ -738,7 +739,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
         reorgInfo.EndKey,
         job.ReorgMeta,
         avgRowSize,
-        concurrency,
+        importConc,
         cpMgr,
         rowCntListener,
     )
diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go
index c099c99750410..b635ddd83608c 100644
--- a/pkg/ddl/backfilling_dist_executor.go
+++ b/pkg/ddl/backfilling_dist_executor.go
@@ -28,6 +28,7 @@ import (
     "github.com/pingcap/tidb/pkg/lightning/common"
     "github.com/pingcap/tidb/pkg/parser/model"
     "github.com/pingcap/tidb/pkg/parser/terror"
+    "github.com/pingcap/tidb/pkg/sessionctx/variable"
     "github.com/pingcap/tidb/pkg/table"
     "github.com/pingcap/tidb/pkg/util/dbterror"
     "github.com/tikv/client-go/v2/tikv"
@@ -147,7 +148,14 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
     ddlObj := s.d
     discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
 
-    return ingest.LitBackCtxMgr.Register(s.BaseTaskExecutor.Ctx(), job.ID, hasUnique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
+    return ingest.LitBackCtxMgr.Register(
+        s.BaseTaskExecutor.Ctx(),
+        job.ID, hasUnique,
+        ddlObj.etcdCli,
+        discovery,
+        job.ReorgMeta.ResourceGroupName,
+        job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
+    )
 }
 
 func hasUniqueIndex(job *model.Job) (bool, error) {
diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go
index 5ae5554ef290b..ca14b24807ed5 100644
--- a/pkg/ddl/backfilling_operators.go
+++ b/pkg/ddl/backfilling_operators.go
@@ -171,15 +171,11 @@ func NewAddIndexIngestPipeline(
     if err != nil {
         return nil, err
     }
-    poolSize := copReadChunkPoolSize()
-    srcChkPool := make(chan *chunk.Chunk, poolSize)
-    for i := 0; i < poolSize; i++ {
-        srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
-    }
+    srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
     readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
 
     srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
-    scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr)
+    scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize)
     ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
     sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)
 
@@ -226,11 +222,7 @@ func NewWriteIndexToExternalStoragePipeline(
     if err != nil {
         return nil, err
     }
-    poolSize := copReadChunkPoolSize()
-    srcChkPool := make(chan *chunk.Chunk, poolSize)
-    for i := 0; i < poolSize; i++ {
-        srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
-    }
+    srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
     readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
 
     backend, err := storage.ParseBackend(extStoreURI, nil)
@@ -248,7 +240,7 @@
     })
 
     srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
-    scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil)
+    scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
     writeOp := NewWriteExternalStoreOperator(
         ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta)
     sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, nil, rowCntListener)
@@ -270,6 +262,16 @@
     ), nil
 }
 
+func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk {
+    poolSize := ingest.CopReadChunkPoolSize(hintConc)
+    batchSize := ingest.CopReadBatchSize(hintBatchSize)
+    srcChkPool := make(chan *chunk.Chunk, poolSize)
+    for i := 0; i < poolSize; i++ {
+        srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
+    }
+    return srcChkPool
+}
+
 // TableScanTask contains the start key and the end key of a region.
 type TableScanTask struct {
     ID    int
@@ -457,6 +459,7 @@ func NewTableScanOperator(
     srcChkPool chan *chunk.Chunk,
     concurrency int,
     cpMgr *ingest.CheckpointManager,
+    hintBatchSize int,
 ) *TableScanOperator {
     pool := workerpool.NewWorkerPool(
         "TableScanOperator",
@@ -464,12 +467,13 @@
         concurrency,
         func() workerpool.Worker[TableScanTask, IndexRecordChunk] {
             return &tableScanWorker{
-                ctx:        ctx,
-                copCtx:     copCtx,
-                sessPool:   sessPool,
-                se:         nil,
-                srcChkPool: srcChkPool,
-                cpMgr:      cpMgr,
+                ctx:           ctx,
+                copCtx:        copCtx,
+                sessPool:      sessPool,
+                se:            nil,
+                srcChkPool:    srcChkPool,
+                cpMgr:         cpMgr,
+                hintBatchSize: hintBatchSize,
             }
         })
     return &TableScanOperator{
@@ -484,7 +488,8 @@ type tableScanWorker struct {
     se         *session.Session
     srcChkPool chan *chunk.Chunk
 
-    cpMgr *ingest.CheckpointManager
+    cpMgr         *ingest.CheckpointManager
+    hintBatchSize int
 }
 
 func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecordChunk)) {
@@ -554,7 +559,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
 
 func (w *tableScanWorker) getChunk() *chunk.Chunk {
     chk := <-w.srcChkPool
-    newCap := copReadBatchSize()
+    newCap := ingest.CopReadBatchSize(w.hintBatchSize)
     if chk.Capacity() != newCap {
         chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
     }
diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go
index dc0201729787c..6d8954e2fad94 100644
--- a/pkg/ddl/backfilling_scheduler.go
+++ b/pkg/ddl/backfilling_scheduler.go
@@ -85,6 +85,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
     if err != nil {
         return nil, err
     }
+    workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
     return &txnBackfillScheduler{
         ctx:       ctx,
         reorgInfo: info,
@@ -93,7 +94,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
         tbl:          tbl,
         decodeColMap: decColMap,
         jobCtx:       jobCtx,
-        workers:      make([]*backfillWorker, 0, variable.GetDDLReorgWorkerCounter()),
+        workers:      make([]*backfillWorker, 0, workerCnt),
         taskCh:       make(chan *reorgBackfillTask, backfillTaskChanSize),
         resultCh:     make(chan *backfillResult, backfillTaskChanSize),
     }, nil
@@ -230,8 +231,8 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context)
     }
 }
 
-func (*txnBackfillScheduler) expectedWorkerSize() (size int) {
-    workerCnt := int(variable.GetDDLReorgWorkerCounter())
+func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
+    workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
     return min(workerCnt, maxBackfillWorkerSize)
 }
 
diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go
index f095dae1aa54b..97272226ecd91 100644
--- a/pkg/ddl/cancel_test.go
+++ b/pkg/ddl/cancel_test.go
@@ -242,8 +242,8 @@ func TestCancel(t *testing.T) {
 
     // Change some configurations.
     ddl.ReorgWaitTimeout = 10 * time.Millisecond
-    tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 8")
-    tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1")
+    tk.MustExec("set @@tidb_ddl_reorg_batch_size = 8")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1")
     tk = testkit.NewTestKit(t, store)
     tk.MustExec("use test")
     require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockBackfillSlow", "return"))
diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go
index 225ba84582730..1e25aa1258f29 100644
--- a/pkg/ddl/executor.go
+++ b/pkg/ddl/executor.go
@@ -4755,6 +4755,12 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
     reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
     reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
     reorgMeta.TargetScope = variable.ServiceScope.Load()
+    if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
+        reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
+    }
+    if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
+        reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
+    }
 
     if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
         return nil, dbterror.ErrUnsupportedDistTask
@@ -4770,6 +4776,17 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
             LastReorgMetaFastReorgDisabled = true
         })
     }
+
+    logutil.DDLLogger().Info("initialize reorg meta",
+        zap.String("jobSchema", job.SchemaName),
+        zap.String("jobTable", job.TableName),
+        zap.Stringer("jobType", job.Type),
+        zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
+        zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
+        zap.String("targetScope", reorgMeta.TargetScope),
+        zap.Int("concurrency", reorgMeta.Concurrency),
+        zap.Int("batchSize", reorgMeta.BatchSize),
+    )
     return reorgMeta, nil
 }
diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go
index 9e63c99c07be9..4aa98bb40fb0a 100644
--- a/pkg/ddl/export_test.go
+++ b/pkg/ddl/export_test.go
@@ -47,7 +47,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey,
     }
     opCtx := ddl.NewLocalOperatorCtx(context.Background(), 1)
     src := testutil.NewOperatorTestSource(ddl.TableScanTask{1, startKey, endKey})
-    scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil)
+    scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0)
     sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]()
 
     operator.Compose[ddl.TableScanTask](src, scanOp)
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index b3a8af5bed61d..235e8add14f9b 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -1954,7 +1954,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
     if indexInfo.Unique {
         ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
         if bc == nil {
-            bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName)
+            bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1)
             if err != nil {
                 return err
             }
@@ -2029,7 +2029,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
         })
     } else {
         job := reorgInfo.Job
-        workerCntLimit := int(variable.GetDDLReorgWorkerCounter())
+        workerCntLimit := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
         cpuCount, err := handle.GetCPUCountOfNode(ctx)
         if err != nil {
             return err
diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go
index 6a9c443b7e032..785742d68b3ec 100644
--- a/pkg/ddl/index_cop.go
+++ b/pkg/ddl/index_cop.go
@@ -28,7 +28,6 @@ import (
     exprctx "github.com/pingcap/tidb/pkg/expression/context"
     "github.com/pingcap/tidb/pkg/kv"
     "github.com/pingcap/tidb/pkg/parser/model"
-    "github.com/pingcap/tidb/pkg/sessionctx/variable"
     "github.com/pingcap/tidb/pkg/table"
     "github.com/pingcap/tidb/pkg/table/tables"
     "github.com/pingcap/tidb/pkg/tablecodec"
@@ -41,20 +40,6 @@ import (
     kvutil "github.com/tikv/client-go/v2/util"
 )
 
-// copReadBatchSize is the batch size of coprocessor read.
-// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
-// sending too many cop requests for the same handle range.
-func copReadBatchSize() int {
-    return 10 * int(variable.GetDDLReorgBatchSize())
-}
-
-// copReadChunkPoolSize is the size of chunk pool, which
-// represents the max concurrent ongoing coprocessor requests.
-// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
-func copReadChunkPoolSize() int {
-    return 10 * int(variable.GetDDLReorgWorkerCounter())
-}
-
 func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
     err := se.Begin(context.Background())
     if err != nil {
diff --git a/pkg/ddl/index_modify_test.go b/pkg/ddl/index_modify_test.go
index 2e72a2b2b2ecc..5a368fa589ca7 100644
--- a/pkg/ddl/index_modify_test.go
+++ b/pkg/ddl/index_modify_test.go
@@ -1047,7 +1047,7 @@ func TestAddIndexUniqueFailOnDuplicate(t *testing.T) {
     tk.MustExec("create table t (a bigint primary key clustered, b int);")
     // The subtask execution order is not guaranteed in distributed reorg. We need to disable it first.
     tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
-    tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
     for i := 1; i <= 12; i++ {
         tk.MustExec("insert into t values (?, ?)", i, i)
     }
diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go
index 068047e5a8710..3719e801ad253 100644
--- a/pkg/ddl/ingest/backend_mgr.go
+++ b/pkg/ddl/ingest/backend_mgr.go
@@ -49,6 +49,7 @@ type BackendCtxMgr interface {
         etcdClient *clientv3.Client,
         pdSvcDiscovery pd.ServiceDiscovery,
         resourceGroupName string,
+        importConc int,
     ) (BackendCtx, error)
     Unregister(jobID int64)
     // EncodeJobSortPath encodes the job ID to the local disk sort path.
@@ -114,6 +115,7 @@ func (m *litBackendCtxMgr) Register(
     etcdClient *clientv3.Client,
     pdSvcDiscovery pd.ServiceDiscovery,
     resourceGroupName string,
+    concurrency int,
 ) (BackendCtx, error) {
     bc, exist := m.Load(jobID)
     if exist {
@@ -131,7 +133,7 @@ func (m *litBackendCtxMgr) Register(
         logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
         return nil, err
     }
-    cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName)
+    cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
     if err != nil {
         logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
         return nil, err
diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go
index c6b59a531c5cd..0f31e64e5e9a3 100644
--- a/pkg/ddl/ingest/config.go
+++ b/pkg/ddl/ingest/config.go
@@ -48,16 +48,16 @@ func genConfig(
     memRoot MemRoot,
     unique bool,
     resourceGroup string,
+    concurrency int,
 ) (*litConfig, error) {
     tidbCfg := tidb.GetGlobalConfig()
     cfg := lightning.NewConfig()
     cfg.TikvImporter.Backend = lightning.BackendLocal
     // Each backend will build a single dir in lightning dir.
     cfg.TikvImporter.SortedKVDir = jobSortPath
+    cfg.TikvImporter.RangeConcurrency = concurrency
     if ImporterRangeConcurrencyForTest != nil {
         cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
-    } else {
-        cfg.TikvImporter.RangeConcurrency = int(variable.GetDDLReorgWorkerCounter())
     }
     err := cfg.AdjustForDDL()
     if err != nil {
@@ -91,6 +91,26 @@ func genConfig(
     return c, nil
 }
 
+// CopReadBatchSize is the batch size of coprocessor read.
+// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
+// sending too many cop requests for the same handle range.
+func CopReadBatchSize(hintSize int) int {
+    if hintSize > 0 {
+        return hintSize
+    }
+    return 10 * int(variable.GetDDLReorgBatchSize())
+}
+
+// CopReadChunkPoolSize is the size of chunk pool, which
+// represents the max concurrent ongoing coprocessor requests.
+// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
+func CopReadChunkPoolSize(hintConc int) int {
+    if hintConc > 0 {
+        return 10 * hintConc
+    }
+    return 10 * int(variable.GetDDLReorgWorkerCounter())
+}
+
 // NewDDLTLS creates a common.TLS from the tidb config for DDL.
 func NewDDLTLS() (*common.TLS, error) {
     tidbCfg := tidb.GetGlobalConfig()
diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go
index 049aff41e40eb..e47e955b9450c 100644
--- a/pkg/ddl/ingest/integration_test.go
+++ b/pkg/ddl/ingest/integration_test.go
@@ -87,7 +87,7 @@ func TestIngestError(t *testing.T) {
     tk.MustExec("set global tidb_enable_dist_task = 0")
     defer ingesttestutil.InjectMockBackendMgr(t, store)()
 
-    tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
     tk.MustExec("create table t (a int primary key, b int);")
     for i := 0; i < 4; i++ {
         tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go
index 4d9261ecfc672..8d5fe8744dccf 100644
--- a/pkg/ddl/ingest/mock.go
+++ b/pkg/ddl/ingest/mock.go
@@ -56,7 +56,8 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {
 }
 
 // Register implements BackendCtxMgr.Register interface.
-func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string) (BackendCtx, error) {
+func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
+    pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int) (BackendCtx, error) {
     logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
     if mockCtx, ok := m.runningJobs[jobID]; ok {
         return mockCtx, nil
diff --git a/pkg/ddl/tests/adminpause/global.go b/pkg/ddl/tests/adminpause/global.go
index c75a2ee4d1afc..2d6bd9aa79c0d 100644
--- a/pkg/ddl/tests/adminpause/global.go
+++ b/pkg/ddl/tests/adminpause/global.go
@@ -35,8 +35,8 @@ func prepareDomain(t *testing.T) (*domain.Domain, *testkit.TestKit, *testkit.Tes
     adminCommandKit := testkit.NewTestKit(t, store)
 
     ddlctrl.ReorgWaitTimeout = 10 * time.Millisecond
-    stmtKit.MustExec("set @@global.tidb_ddl_reorg_batch_size = 2")
-    stmtKit.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1")
+    stmtKit.MustExec("set @@tidb_ddl_reorg_batch_size = 2")
+    stmtKit.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1")
 
     stmtKit = testkit.NewTestKit(t, store)
     stmtKit.MustExec("use test")
diff --git a/pkg/executor/test/admintest/admin_test.go b/pkg/executor/test/admintest/admin_test.go
index d2d79799e89f1..e7fb86d69dd75 100644
--- a/pkg/executor/test/admintest/admin_test.go
+++ b/pkg/executor/test/admintest/admin_test.go
@@ -2073,7 +2073,7 @@ func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) {
     }
 
     batchSize := 32
-    tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_batch_size = %d", batchSize))
+    tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_batch_size = %d", batchSize))
 
     var enableFastCheck = []bool{false, true}
     for _, enabled := range enableFastCheck {
diff --git a/pkg/executor/test/ddl/ddl_test.go b/pkg/executor/test/ddl/ddl_test.go
index 62d9764a723a4..449cf435a5e83 100644
--- a/pkg/executor/test/ddl/ddl_test.go
+++ b/pkg/executor/test/ddl/ddl_test.go
@@ -813,6 +813,11 @@ func TestSetDDLReorgWorkerCnt(t *testing.T) {
     tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 257")
     tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_worker_cnt value: '257'"))
     tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt").Check(testkit.Rows("256"))
+
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 10;")
+    tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt;").Check(testkit.Rows("10"))
+    tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt;").Check(testkit.Rows("256"))
+    require.Equal(t, int32(256), variable.GetDDLReorgWorkerCounter())
 }
 
 func TestSetDDLReorgBatchSize(t *testing.T) {
@@ -850,6 +855,10 @@
     tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000")
     res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
     res.Check(testkit.Rows("1000"))
+
+    tk.MustExec("set @@tidb_ddl_reorg_batch_size = 256;")
+    tk.MustQuery("select @@tidb_ddl_reorg_batch_size").Check(testkit.Rows("256"))
+    tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size").Check(testkit.Rows("1000"))
 }
 
 func TestSetDDLErrorCountLimit(t *testing.T) {
diff --git a/pkg/parser/model/reorg.go b/pkg/parser/model/reorg.go
index 9355d796271eb..bcdfa9b358efc 100644
--- a/pkg/parser/model/reorg.go
+++ b/pkg/parser/model/reorg.go
@@ -34,6 +34,26 @@ type DDLReorgMeta struct {
     ResourceGroupName string `json:"resource_group_name"`
     Version           int64  `json:"version"`
     TargetScope       string `json:"target_scope"`
+    // These two variables are set when corresponding session variables are set explicitly. When they are set,
+    // user cannot change it by setting the global one. Otherwise, they can be adjusted dynamically through global var.
+    Concurrency int `json:"concurrency"`
+    BatchSize   int `json:"batch_size"`
+}
+
+// GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta or returns the default value.
+func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int {
+    if dm == nil || dm.Concurrency == 0 {
+        return defaultVal
+    }
+    return dm.Concurrency
+}
+
+// GetBatchSizeOrDefault gets the batch size from DDLReorgMeta or returns the default value.
+func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int {
+    if dm == nil || dm.BatchSize == 0 {
+        return defaultVal
+    }
+    return dm.BatchSize
 }
 
 const (
diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go
index 9e14e1b344e68..031dcac02003a 100644
--- a/pkg/sessionctx/variable/sysvar.go
+++ b/pkg/sessionctx/variable/sysvar.go
@@ -758,11 +758,11 @@ var defaultSysVars = []*SysVar{
         SetDDLFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBDDLFlashbackConcurrency)))
         return nil
     }},
-    {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
+    {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
         SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
         return nil
     }},
-    {Scope: ScopeGlobal, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
+    {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
         SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
         return nil
     }},
diff --git a/pkg/sessionctx/variable/varsutil_test.go b/pkg/sessionctx/variable/varsutil_test.go
index 3ba450378d65b..0592907bd9752 100644
--- a/pkg/sessionctx/variable/varsutil_test.go
+++ b/pkg/sessionctx/variable/varsutil_test.go
@@ -251,9 +251,6 @@ func TestVarsutil(t *testing.T) {
     require.NoError(t, err)
     require.Equal(t, false, v.OptimizerEnableNewOnlyFullGroupByCheck)
 
-    err = v.SetSystemVar(TiDBDDLReorgWorkerCount, "4") // wrong scope global only
-    require.True(t, terror.ErrorEqual(err, errGlobalVariable))
-
     err = v.SetSystemVar(TiDBRetryLimit, "3")
     require.NoError(t, err)
     val, err = v.GetSessionOrGlobalSystemVar(context.Background(), TiDBRetryLimit)
diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go
index adccc41486da0..170c2c2746420 100644
--- a/tests/realtikvtest/addindextest1/disttask_test.go
+++ b/tests/realtikvtest/addindextest1/disttask_test.go
@@ -62,6 +62,7 @@ func TestAddIndexDistBasic(t *testing.T) {
 
     bak := variable.GetDDLReorgWorkerCounter()
     tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 111")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 111")
     require.Equal(t, int32(111), variable.GetDDLReorgWorkerCounter())
     tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 20;")
     tk.MustExec("insert into t values (), (), (), (), (), ()")
@@ -80,6 +81,7 @@ func TestAddIndexDistBasic(t *testing.T) {
 
     require.Equal(t, 1, task.Concurrency)
 
     tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_worker_cnt = %d", bak))
+    tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt = %d", bak))
     require.Equal(t, bak, variable.GetDDLReorgWorkerCounter())
 
     tk.MustExec("create table t1(a bigint auto_random primary key);")
diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go
index d70f21a0853d4..7a3e29a4d3206 100644
--- a/tests/realtikvtest/addindextest3/functional_test.go
+++ b/tests/realtikvtest/addindextest3/functional_test.go
@@ -83,7 +83,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) {
 func TestBackendCtxConcurrentUnregister(t *testing.T) {
     store := realtikvtest.CreateMockStoreAndSetup(t)
     discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
-    bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test")
+    bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1)
     require.NoError(t, err)
     idxIDs := []int64{1, 2, 3, 4, 5, 6, 7}
     uniques := make([]bool, 0, len(idxIDs))
diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go
index 87ca0b5e2d0ea..0b7b6c805df5a 100644
--- a/tests/realtikvtest/addindextest3/ingest_test.go
+++ b/tests/realtikvtest/addindextest3/ingest_test.go
@@ -191,7 +191,7 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
     tk.MustExec("create database addindexlit;")
     tk.MustExec("use addindexlit;")
     tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
-    tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
     tk.MustExec("create table t (a int primary key);")
     var sb strings.Builder
     sb.WriteString("insert into t values ")
@@ -219,7 +219,7 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
             running = false
         case wg := <-ddl.TestCheckWorkerNumCh:
             offset = (offset + 1) % 3
-            tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", cnt[offset]))
+            tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", cnt[offset]))
             atomic.StoreInt32(&ddl.TestCheckWorkerNumber, int32(cnt[offset]/2+2))
             wg.Done()
         }
@@ -231,7 +231,6 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
     require.Len(t, rows, 1)
     jobTp := rows[0][3].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
-    tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;")
 }
 
 func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
@@ -244,7 +243,7 @@
 
     ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{}
     ingest.ImporterRangeConcurrencyForTest.Store(2)
-    tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 20;")
     tk.MustExec("create table t (a int primary key);")
 
     var sb strings.Builder
@@ -262,7 +261,6 @@
     require.Len(t, rows, 1)
     jobTp := rows[0][3].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;") ingest.ImporterRangeConcurrencyForTest = nil } @@ -399,8 +397,8 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { tk.MustExec("create database addindexlit;") tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) - tk.MustExec("set global tidb_ddl_reorg_worker_cnt=1;") - tk.MustExec("set global tidb_enable_dist_task = 0;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;") + tk.MustExec("set @@global.tidb_enable_dist_task = 0;") tk.MustExec("create table t(id int primary key, b int, k int);") tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1")) @@ -410,8 +408,6 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { ingest.ForceSyncFlagForTest = true tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'") ingest.ForceSyncFlagForTest = false - - tk.MustExec("set global tidb_ddl_reorg_worker_cnt=4;") } func TestAddIndexBackfillLostUpdate(t *testing.T) { diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 74bb4a003a7b6..c8a805b693948 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -95,7 +95,7 @@ func TestBackfillOperators(t *testing.T) { ctx := context.Background() opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) src := testutil.NewOperatorTestSource(opTasks...) - scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil, 0) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp)