From 5e2f23a9e764690d3eba0197a4e10d342988415b Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 9 Aug 2024 18:10:45 +0800 Subject: [PATCH 01/10] ddl: support session level tidb_ddl_reorg_worker_cnt and batch_size --- pkg/ddl/backfilling.go | 21 ++++++--- pkg/ddl/backfilling_dist_executor.go | 9 +++- pkg/ddl/backfilling_operators.go | 45 ++++++++++--------- pkg/ddl/backfilling_scheduler.go | 13 ++++-- pkg/ddl/executor.go | 17 +++++++ pkg/ddl/export_test.go | 3 +- pkg/ddl/index.go | 4 +- pkg/ddl/index_cop.go | 15 ------- pkg/ddl/ingest/backend_mgr.go | 4 +- pkg/ddl/ingest/config.go | 29 +++++++++++- pkg/ddl/ingest/mock.go | 3 +- pkg/executor/test/ddl/ddl_test.go | 9 ++++ pkg/parser/model/reorg.go | 2 + pkg/sessionctx/variable/sysvar.go | 4 +- .../addindextest3/functional_test.go | 2 +- .../addindextest3/operator_test.go | 2 +- 16 files changed, 126 insertions(+), 56 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 4fbed0422726d..42940f2d08195 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -178,6 +178,10 @@ func newBackfillCtx(id int, rInfo *reorgInfo, } exprCtx := sessCtx.GetExprCtx() + batchCnt := rInfo.ReorgMeta.BatchSize + if batchCnt == 0 { + batchCnt = int(variable.GetDDLReorgBatchSize()) + } return &backfillCtx{ id: id, ddlCtx: rInfo.d, @@ -188,7 +192,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo, loc: exprCtx.GetEvalCtx().Location(), schemaName: schemaName, table: tbl, - batchCnt: int(variable.GetDDLReorgBatchSize()), + batchCnt: batchCnt, jobContext: jobCtx, metricCounter: metrics.BackfillTotalCounter.WithLabelValues( metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())), @@ -415,7 +419,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { }) // Change the batch size dynamically. 
- w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize()) + newBatchCnt := job.ReorgMeta.BatchSize + if newBatchCnt == 0 { + newBatchCnt = int(variable.GetDDLReorgBatchSize()) + } + w.GetCtx().batchCnt = newBatchCnt result := w.handleBackfillTask(d, task, bf) w.sendResult(result) @@ -675,8 +683,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() + importConc := ingest.IngestConcurrency(job.ReorgMeta.Concurrency) bcCtx, err := ingest.LitBackCtxMgr.Register( - ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName) + ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc) if err != nil { return errors.Trace(err) } @@ -705,16 +714,15 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( bcCtx.AttachCheckpointManager(cpMgr) } - reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID) + reorgCtx := dc.getReorgCtx(job.ID) rowCntListener := &localRowCntListener{ prevPhysicalRowCnt: reorgCtx.getRowCount(), - reorgCtx: dc.getReorgCtx(reorgInfo.Job.ID), + reorgCtx: reorgCtx, counter: metrics.BackfillTotalCounter.WithLabelValues( metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)), } avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t) - concurrency := int(variable.GetDDLReorgWorkerCounter()) engines, err := bcCtx.Register(indexIDs, uniques, t) if err != nil { @@ -725,6 +733,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return errors.Trace(err) } + concurrency := ingest.IngestConcurrency(job.ReorgMeta.Concurrency) pipe, err := NewAddIndexIngestPipeline( opCtx, dc.store, diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index c099c99750410..3a06211ec2d52 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -147,7 +147,14 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { ddlObj := s.d discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - return ingest.LitBackCtxMgr.Register(s.BaseTaskExecutor.Ctx(), job.ID, hasUnique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName) + return ingest.LitBackCtxMgr.Register( + s.BaseTaskExecutor.Ctx(), + job.ID, hasUnique, + ddlObj.etcdCli, + discovery, + job.ReorgMeta.ResourceGroupName, + ingest.IngestConcurrency(job.ReorgMeta.Concurrency), + ) } func hasUniqueIndex(job *model.Job) (bool, error) { diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 5ae5554ef290b..ca14b24807ed5 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -171,15 +171,11 @@ func NewAddIndexIngestPipeline( if err != nil { return nil, err } - poolSize := copReadChunkPoolSize() - srcChkPool := make(chan *chunk.Chunk, poolSize) - for i := 0; i < poolSize; i++ { - srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize()) - } + srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize) readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize) ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, 
srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener) @@ -226,11 +222,7 @@ func NewWriteIndexToExternalStoragePipeline( if err != nil { return nil, err } - poolSize := copReadChunkPoolSize() - srcChkPool := make(chan *chunk.Chunk, poolSize) - for i := 0; i < poolSize; i++ { - srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize()) - } + srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize) readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) backend, err := storage.ParseBackend(extStoreURI, nil) @@ -248,7 +240,7 @@ func NewWriteIndexToExternalStoragePipeline( }) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize) writeOp := NewWriteExternalStoreOperator( ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta) sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, nil, rowCntListener) @@ -270,6 +262,16 @@ func NewWriteIndexToExternalStoragePipeline( ), nil } +func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk { + poolSize := ingest.CopReadChunkPoolSize(hintConc) + batchSize := ingest.CopReadBatchSize(hintBatchSize) + srcChkPool := make(chan *chunk.Chunk, poolSize) + for i := 0; i < poolSize; i++ { + srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize) + } + return srcChkPool +} + // TableScanTask contains the start key and the end key of a region. 
type TableScanTask struct { ID int @@ -457,6 +459,7 @@ func NewTableScanOperator( srcChkPool chan *chunk.Chunk, concurrency int, cpMgr *ingest.CheckpointManager, + hintBatchSize int, ) *TableScanOperator { pool := workerpool.NewWorkerPool( "TableScanOperator", @@ -464,12 +467,13 @@ func NewTableScanOperator( concurrency, func() workerpool.Worker[TableScanTask, IndexRecordChunk] { return &tableScanWorker{ - ctx: ctx, - copCtx: copCtx, - sessPool: sessPool, - se: nil, - srcChkPool: srcChkPool, - cpMgr: cpMgr, + ctx: ctx, + copCtx: copCtx, + sessPool: sessPool, + se: nil, + srcChkPool: srcChkPool, + cpMgr: cpMgr, + hintBatchSize: hintBatchSize, } }) return &TableScanOperator{ @@ -484,7 +488,8 @@ type tableScanWorker struct { se *session.Session srcChkPool chan *chunk.Chunk - cpMgr *ingest.CheckpointManager + cpMgr *ingest.CheckpointManager + hintBatchSize int } func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecordChunk)) { @@ -554,7 +559,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor func (w *tableScanWorker) getChunk() *chunk.Chunk { chk := <-w.srcChkPool - newCap := copReadBatchSize() + newCap := ingest.CopReadBatchSize(w.hintBatchSize) if chk.Capacity() != newCap { chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap) } diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index dc0201729787c..c0069b377087e 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -85,6 +85,10 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses if err != nil { return nil, err } + workerCnt := info.ReorgMeta.Concurrency + if workerCnt == 0 { + workerCnt = int(variable.GetDDLReorgWorkerCounter()) + } return &txnBackfillScheduler{ ctx: ctx, reorgInfo: info, @@ -93,7 +97,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses tbl: tbl, decodeColMap: decColMap, jobCtx: jobCtx, - workers: make([]*backfillWorker, 0, variable.GetDDLReorgWorkerCounter()), + workers: make([]*backfillWorker, 0, workerCnt), taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize), resultCh: make(chan *backfillResult, backfillTaskChanSize), }, nil @@ -230,8 +234,11 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) } } -func (*txnBackfillScheduler) expectedWorkerSize() (size int) { - workerCnt := int(variable.GetDDLReorgWorkerCounter()) +func (b *txnBackfillScheduler) expectedWorkerSize() (size int) { + workerCnt := b.reorgInfo.ReorgMeta.Concurrency + if workerCnt == 0 { + workerCnt = int(variable.GetDDLReorgWorkerCounter()) + } return min(workerCnt, maxBackfillWorkerSize) } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 225ba84582730..1e25aa1258f29 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4755,6 +4755,12 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model. 
reorgMeta.IsDistReorg = variable.EnableDistTask.Load() reorgMeta.IsFastReorg = variable.EnableFastReorg.Load() reorgMeta.TargetScope = variable.ServiceScope.Load() + if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok { + reorgMeta.Concurrency = variable.TidbOptInt(sv, 0) + } + if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok { + reorgMeta.BatchSize = variable.TidbOptInt(sv, 0) + } if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg { return nil, dbterror.ErrUnsupportedDistTask @@ -4770,6 +4776,17 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model. LastReorgMetaFastReorgDisabled = true }) } + + logutil.DDLLogger().Info("initialize reorg meta", + zap.String("jobSchema", job.SchemaName), + zap.String("jobTable", job.TableName), + zap.Stringer("jobType", job.Type), + zap.Bool("enableDistTask", reorgMeta.IsDistReorg), + zap.Bool("enableFastReorg", reorgMeta.IsFastReorg), + zap.String("targetScope", reorgMeta.TargetScope), + zap.Int("concurrency", reorgMeta.Concurrency), + zap.Int("batchSize", reorgMeta.BatchSize), + ) return reorgMeta, nil } diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 9e63c99c07be9..05641bd0edf27 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/ngaut/pools" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/session" @@ -47,7 +48,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, } opCtx := ddl.NewLocalOperatorCtx(context.Background(), 1) src := testutil.NewOperatorTestSource(ddl.TableScanTask{1, startKey, endKey}) - scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp) diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index b3a8af5bed61d..f54e2682977cb 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1954,7 +1954,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo if indexInfo.Unique { ctx := tidblogutil.WithCategory(ctx, "ddl-ingest") if bc == nil { - bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName) + bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1) if err != nil { return err } @@ -2029,7 +2029,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { }) } else { job := reorgInfo.Job - workerCntLimit := int(variable.GetDDLReorgWorkerCounter()) + workerCntLimit := ingest.IngestConcurrency(job.ReorgMeta.Concurrency) cpuCount, err := handle.GetCPUCountOfNode(ctx) if err != nil { return err diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 6a9c443b7e032..785742d68b3ec 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -28,7 +28,6 @@ import ( exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -41,20 +40,6 @@ import ( kvutil 
"github.com/tikv/client-go/v2/util" ) -// copReadBatchSize is the batch size of coprocessor read. -// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid -// sending too many cop requests for the same handle range. -func copReadBatchSize() int { - return 10 * int(variable.GetDDLReorgBatchSize()) -} - -// copReadChunkPoolSize is the size of chunk pool, which -// represents the max concurrent ongoing coprocessor requests. -// It multiplies the tidb_ddl_reorg_worker_cnt by 10. -func copReadChunkPoolSize() int { - return 10 * int(variable.GetDDLReorgWorkerCounter()) -} - func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error { err := se.Begin(context.Background()) if err != nil { diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 068047e5a8710..3719e801ad253 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -49,6 +49,7 @@ type BackendCtxMgr interface { etcdClient *clientv3.Client, pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, + importConc int, ) (BackendCtx, error) Unregister(jobID int64) // EncodeJobSortPath encodes the job ID to the local disk sort path. @@ -114,6 +115,7 @@ func (m *litBackendCtxMgr) Register( etcdClient *clientv3.Client, pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, + concurrency int, ) (BackendCtx, error) { bc, exist := m.Load(jobID) if exist { @@ -131,7 +133,7 @@ func (m *litBackendCtxMgr) Register( logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err)) return nil, err } - cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName) + cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency) if err != nil { logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index c6b59a531c5cd..354b4dbc79223 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -48,16 +48,16 @@ func genConfig( memRoot MemRoot, unique bool, resourceGroup string, + concurrency int, ) (*litConfig, error) { tidbCfg := tidb.GetGlobalConfig() cfg := lightning.NewConfig() cfg.TikvImporter.Backend = lightning.BackendLocal // Each backend will build a single dir in lightning dir. cfg.TikvImporter.SortedKVDir = jobSortPath + cfg.TikvImporter.RangeConcurrency = concurrency if ImporterRangeConcurrencyForTest != nil { cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load()) - } else { - cfg.TikvImporter.RangeConcurrency = int(variable.GetDDLReorgWorkerCounter()) } err := cfg.AdjustForDDL() if err != nil { @@ -91,6 +91,31 @@ func genConfig( return c, nil } +// IngestConcurrency is the concurrency used during ingest. +func IngestConcurrency(hintConc int) int { + if hintConc > 0 { + return hintConc + } + return int(variable.GetDDLReorgWorkerCounter()) +} + +// CopReadBatchSize is the batch size of coprocessor read. +// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid +// sending too many cop requests for the same handle range. +func CopReadBatchSize(hintSize int) int { + if hintSize > 0 { + return hintSize + } + return 10 * int(variable.GetDDLReorgBatchSize()) +} + +// CopReadChunkPoolSize is the size of chunk pool, which +// represents the max concurrent ongoing coprocessor requests. +// It multiplies the tidb_ddl_reorg_worker_cnt by 10. 
+func CopReadChunkPoolSize(hintConc int) int { + return IngestConcurrency(hintConc) * 10 +} + // NewDDLTLS creates a common.TLS from the tidb config for DDL. func NewDDLTLS() (*common.TLS, error) { tidbCfg := tidb.GetGlobalConfig() diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 4d9261ecfc672..8d5fe8744dccf 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -56,7 +56,8 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { } // Register implements BackendCtxMgr.Register interface. -func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string) (BackendCtx, error) { +func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, + pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int) (BackendCtx, error) { logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/pkg/executor/test/ddl/ddl_test.go b/pkg/executor/test/ddl/ddl_test.go index 62d9764a723a4..449cf435a5e83 100644 --- a/pkg/executor/test/ddl/ddl_test.go +++ b/pkg/executor/test/ddl/ddl_test.go @@ -813,6 +813,11 @@ func TestSetDDLReorgWorkerCnt(t *testing.T) { tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 257") tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_worker_cnt value: '257'")) tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt").Check(testkit.Rows("256")) + + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 10;") + tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt;").Check(testkit.Rows("10")) + tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt;").Check(testkit.Rows("256")) + require.Equal(t, int32(256), variable.GetDDLReorgWorkerCounter()) } func TestSetDDLReorgBatchSize(t *testing.T) { @@ -850,6 +855,10 @@ func TestSetDDLReorgBatchSize(t *testing.T) { tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000") res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") res.Check(testkit.Rows("1000")) + + tk.MustExec("set @@tidb_ddl_reorg_batch_size = 256;") + tk.MustQuery("select @@tidb_ddl_reorg_batch_size").Check(testkit.Rows("256")) + tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size").Check(testkit.Rows("1000")) } func TestSetDDLErrorCountLimit(t *testing.T) { diff --git a/pkg/parser/model/reorg.go b/pkg/parser/model/reorg.go index 9355d796271eb..153b3589e9d44 100644 --- a/pkg/parser/model/reorg.go +++ b/pkg/parser/model/reorg.go @@ -30,6 +30,8 @@ type DDLReorgMeta struct { ReorgTp ReorgType `json:"reorg_tp"` IsFastReorg bool `json:"is_fast_reorg"` IsDistReorg bool `json:"is_dist_reorg"` + Concurrency int `json:"concurrency"` + BatchSize int `json:"batch_size"` UseCloudStorage bool `json:"use_cloud_storage"` ResourceGroupName string `json:"resource_group_name"` Version int64 `json:"version"` diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 9e14e1b344e68..031dcac02003a 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -758,11 +758,11 @@ var defaultSysVars = []*SysVar{ SetDDLFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBDDLFlashbackConcurrency))) return nil }}, - {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: 
MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) return nil }}, - {Scope: ScopeGlobal, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) return nil }}, diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go index d70f21a0853d4..7a3e29a4d3206 100644 --- a/tests/realtikvtest/addindextest3/functional_test.go +++ b/tests/realtikvtest/addindextest3/functional_test.go @@ -83,7 +83,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) { func TestBackendCtxConcurrentUnregister(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test") + bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1) require.NoError(t, err) idxIDs := []int64{1, 2, 3, 4, 5, 6, 7} uniques := make([]bool, 0, len(idxIDs)) diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 74bb4a003a7b6..c8a805b693948 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -95,7 +95,7 @@ func TestBackfillOperators(t *testing.T) { ctx := context.Background() opCtx := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) src := testutil.NewOperatorTestSource(opTasks...) 
- scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil, 0) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp) From fc77a0b82a523fdef889cbe7a723fdbedc4149b6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 9 Aug 2024 18:26:12 +0800 Subject: [PATCH 02/10] fix build --- pkg/ddl/BUILD.bazel | 1 + pkg/ddl/backfilling.go | 4 ++-- pkg/ddl/backfilling_dist_executor.go | 2 +- pkg/ddl/index.go | 2 +- pkg/ddl/ingest/config.go | 6 +++--- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 914731ca6d462..5e585c0e0c60c 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -337,6 +337,7 @@ go_test( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_parser//model:go_default_library", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 42940f2d08195..85cb9252d1875 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -683,7 +683,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - importConc := ingest.IngestConcurrency(job.ReorgMeta.Concurrency) + importConc := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency) bcCtx, err := ingest.LitBackCtxMgr.Register( ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc) if err != nil { @@ -733,7 +733,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return errors.Trace(err) } - concurrency := ingest.IngestConcurrency(job.ReorgMeta.Concurrency) + concurrency := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency) pipe, err := NewAddIndexIngestPipeline( opCtx, dc.store, diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 3a06211ec2d52..a5cbf4217d55d 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -153,7 +153,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, - ingest.IngestConcurrency(job.ReorgMeta.Concurrency), + ingest.ResolveConcurrency(job.ReorgMeta.Concurrency), ) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index f54e2682977cb..532afb5b54a35 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2029,7 +2029,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { }) } else { job := reorgInfo.Job - workerCntLimit := ingest.IngestConcurrency(job.ReorgMeta.Concurrency) + workerCntLimit := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency) cpuCount, err := handle.GetCPUCountOfNode(ctx) if err != nil { return err diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index 354b4dbc79223..f0b812afdbf61 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -91,8 +91,8 @@ func genConfig( return c, nil } -// IngestConcurrency is the concurrency used during ingest. -func IngestConcurrency(hintConc int) int { +// ResolveConcurrency gets the concurrency used for ingest mode of adding index. 
+func ResolveConcurrency(hintConc int) int { if hintConc > 0 { return hintConc } @@ -113,7 +113,7 @@ func CopReadBatchSize(hintSize int) int { // represents the max concurrent ongoing coprocessor requests. // It multiplies the tidb_ddl_reorg_worker_cnt by 10. func CopReadChunkPoolSize(hintConc int) int { - return IngestConcurrency(hintConc) * 10 + return ResolveConcurrency(hintConc) * 10 } // NewDDLTLS creates a common.TLS from the tidb config for DDL. From ee33c524343094eae1a81ca1eecc90e66a6fbeca Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 9 Aug 2024 21:42:13 +0800 Subject: [PATCH 03/10] update bazel --- pkg/ddl/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 5e585c0e0c60c..914731ca6d462 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -337,7 +337,6 @@ go_test( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_parser//model:go_default_library", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", From 64dd55f1a5e58cbc28d453fed2fccb77340c8b3e Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 9 Aug 2024 23:59:36 +0800 Subject: [PATCH 04/10] fix build --- pkg/ddl/export_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 05641bd0edf27..4aa98bb40fb0a 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -19,7 +19,6 @@ import ( "time" "github.com/ngaut/pools" - "github.com/pingcap/parser/model" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/session" From b8a32cb986fce41b6dc236c81b51ea223a9da99b Mon Sep 17 00:00:00 2001 From: tangenta Date: Sat, 10 Aug 2024 10:03:46 +0800 Subject: [PATCH 05/10] fix TestVarsutil --- pkg/sessionctx/variable/varsutil_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/sessionctx/variable/varsutil_test.go b/pkg/sessionctx/variable/varsutil_test.go index 3ba450378d65b..0592907bd9752 100644 --- a/pkg/sessionctx/variable/varsutil_test.go +++ b/pkg/sessionctx/variable/varsutil_test.go @@ -251,9 +251,6 @@ func TestVarsutil(t *testing.T) { require.NoError(t, err) require.Equal(t, false, v.OptimizerEnableNewOnlyFullGroupByCheck) - err = v.SetSystemVar(TiDBDDLReorgWorkerCount, "4") // wrong scope global only - require.True(t, terror.ErrorEqual(err, errGlobalVariable)) - err = v.SetSystemVar(TiDBRetryLimit, "3") require.NoError(t, err) val, err = v.GetSessionOrGlobalSystemVar(context.Background(), TiDBRetryLimit) From 7a3b67fe587b9de2f6269a269a7a0e04b11497b7 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 12 Aug 2024 11:29:24 +0800 Subject: [PATCH 06/10] skip init for these two variables --- pkg/sessionctx/variable/sysvar.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 031dcac02003a..20f036099bc0b 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -758,11 +758,11 @@ var defaultSysVars = []*SysVar{ SetDDLFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBDDLFlashbackConcurrency))) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) 
error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgWorkerCount, skipInit: true, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgBatchSize, skipInit: true, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) return nil }}, From 9e9594038a4e7999433994286f8a39854f758f49 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 12 Aug 2024 12:20:08 +0800 Subject: [PATCH 07/10] address comments --- pkg/ddl/backfilling.go | 16 ++++------------ pkg/ddl/backfilling_dist_executor.go | 3 ++- pkg/ddl/backfilling_scheduler.go | 10 ++-------- pkg/ddl/index.go | 2 +- pkg/ddl/ingest/config.go | 13 ++++--------- pkg/parser/model/reorg.go | 22 ++++++++++++++++++++-- 6 files changed, 33 insertions(+), 33 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 85cb9252d1875..3b14c41e32cc5 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -178,10 +178,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo, } exprCtx := sessCtx.GetExprCtx() - batchCnt := rInfo.ReorgMeta.BatchSize - if batchCnt == 0 { - batchCnt = int(variable.GetDDLReorgBatchSize()) - } + batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) return &backfillCtx{ id: id, ddlCtx: rInfo.d, @@ -419,10 +416,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { }) // Change the batch size dynamically. 
- newBatchCnt := job.ReorgMeta.BatchSize - if newBatchCnt == 0 { - newBatchCnt = int(variable.GetDDLReorgBatchSize()) - } + newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) w.GetCtx().batchCnt = newBatchCnt result := w.handleBackfillTask(d, task, bf) w.sendResult(result) @@ -683,7 +677,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - importConc := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency) + importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) bcCtx, err := ingest.LitBackCtxMgr.Register( ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc) if err != nil { @@ -732,8 +726,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( zap.Int64s("index IDs", indexIDs)) return errors.Trace(err) } - - concurrency := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency) pipe, err := NewAddIndexIngestPipeline( opCtx, dc.store, @@ -747,7 +739,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( reorgInfo.EndKey, job.ReorgMeta, avgRowSize, - concurrency, + importConc, cpMgr, rowCntListener, ) diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index a5cbf4217d55d..b635ddd83608c 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/tikv/client-go/v2/tikv" @@ -153,7 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, - ingest.ResolveConcurrency(job.ReorgMeta.Concurrency), + job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), ) } diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index c0069b377087e..6d8954e2fad94 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -85,10 +85,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses if err != nil { return nil, err } - workerCnt := info.ReorgMeta.Concurrency - if workerCnt == 0 { - workerCnt = int(variable.GetDDLReorgWorkerCounter()) - } + workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) return &txnBackfillScheduler{ ctx: ctx, reorgInfo: info, @@ -235,10 +232,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) } func (b *txnBackfillScheduler) expectedWorkerSize() (size int) { - workerCnt := b.reorgInfo.ReorgMeta.Concurrency - if workerCnt == 0 { - workerCnt = int(variable.GetDDLReorgWorkerCounter()) - } + workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) return min(workerCnt, maxBackfillWorkerSize) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 532afb5b54a35..235e8add14f9b 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2029,7 +2029,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { }) } else { job := reorgInfo.Job - workerCntLimit := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency) + workerCntLimit := 
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) cpuCount, err := handle.GetCPUCountOfNode(ctx) if err != nil { return err diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index f0b812afdbf61..0f31e64e5e9a3 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -91,14 +91,6 @@ func genConfig( return c, nil } -// ResolveConcurrency gets the concurrency used for ingest mode of adding index. -func ResolveConcurrency(hintConc int) int { - if hintConc > 0 { - return hintConc - } - return int(variable.GetDDLReorgWorkerCounter()) -} - // CopReadBatchSize is the batch size of coprocessor read. // It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid // sending too many cop requests for the same handle range. @@ -113,7 +105,10 @@ func CopReadBatchSize(hintSize int) int { // represents the max concurrent ongoing coprocessor requests. // It multiplies the tidb_ddl_reorg_worker_cnt by 10. func CopReadChunkPoolSize(hintConc int) int { - return ResolveConcurrency(hintConc) * 10 + if hintConc > 0 { + return 10 * hintConc + } + return 10 * int(variable.GetDDLReorgWorkerCounter()) } // NewDDLTLS creates a common.TLS from the tidb config for DDL. diff --git a/pkg/parser/model/reorg.go b/pkg/parser/model/reorg.go index 153b3589e9d44..bcdfa9b358efc 100644 --- a/pkg/parser/model/reorg.go +++ b/pkg/parser/model/reorg.go @@ -30,12 +30,30 @@ type DDLReorgMeta struct { ReorgTp ReorgType `json:"reorg_tp"` IsFastReorg bool `json:"is_fast_reorg"` IsDistReorg bool `json:"is_dist_reorg"` - Concurrency int `json:"concurrency"` - BatchSize int `json:"batch_size"` UseCloudStorage bool `json:"use_cloud_storage"` ResourceGroupName string `json:"resource_group_name"` Version int64 `json:"version"` TargetScope string `json:"target_scope"` + // These two variables are set when corresponding session variables are set explicitly. When they are set, + // user cannot change it by setting the global one. Otherwise, they can be adjusted dynamically through global var. + Concurrency int `json:"concurrency"` + BatchSize int `json:"batch_size"` +} + +// GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta or returns the default value. +func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { + if dm == nil || dm.Concurrency == 0 { + return defaultVal + } + return dm.Concurrency +} + +// GetBatchSizeOrDefault gets the batch size from DDLReorgMeta or returns the default value. +func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { + if dm == nil || dm.BatchSize == 0 { + return defaultVal + } + return dm.BatchSize } const ( From aaba86939852af44d105b8354a85bd605ae5c3e8 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 12 Aug 2024 15:55:26 +0800 Subject: [PATCH 08/10] remove skipInit --- pkg/ddl/cancel_test.go | 4 ++-- pkg/ddl/index_modify_test.go | 2 +- pkg/ddl/ingest/integration_test.go | 2 +- pkg/ddl/tests/adminpause/global.go | 4 ++-- pkg/executor/test/admintest/admin_test.go | 2 +- pkg/sessionctx/variable/sysvar.go | 4 ++-- tests/realtikvtest/addindextest1/disttask_test.go | 2 ++ tests/realtikvtest/addindextest3/ingest_test.go | 14 +++++--------- 8 files changed, 16 insertions(+), 18 deletions(-) diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go index f095dae1aa54b..97272226ecd91 100644 --- a/pkg/ddl/cancel_test.go +++ b/pkg/ddl/cancel_test.go @@ -242,8 +242,8 @@ func TestCancel(t *testing.T) { // Change some configurations. 
ddl.ReorgWaitTimeout = 10 * time.Millisecond - tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 8") - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1") + tk.MustExec("set @@tidb_ddl_reorg_batch_size = 8") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1") tk = testkit.NewTestKit(t, store) tk.MustExec("use test") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockBackfillSlow", "return")) diff --git a/pkg/ddl/index_modify_test.go b/pkg/ddl/index_modify_test.go index 2e72a2b2b2ecc..5a368fa589ca7 100644 --- a/pkg/ddl/index_modify_test.go +++ b/pkg/ddl/index_modify_test.go @@ -1047,7 +1047,7 @@ func TestAddIndexUniqueFailOnDuplicate(t *testing.T) { tk.MustExec("create table t (a bigint primary key clustered, b int);") // The subtask execution order is not guaranteed in distributed reorg. We need to disable it first. tk.MustExec("set @@global.tidb_enable_dist_task = 0;") - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") for i := 1; i <= 12; i++ { tk.MustExec("insert into t values (?, ?)", i, i) } diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go index 049aff41e40eb..e47e955b9450c 100644 --- a/pkg/ddl/ingest/integration_test.go +++ b/pkg/ddl/ingest/integration_test.go @@ -87,7 +87,7 @@ func TestIngestError(t *testing.T) { tk.MustExec("set global tidb_enable_dist_task = 0") defer ingesttestutil.InjectMockBackendMgr(t, store)() - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") tk.MustExec("create table t (a int primary key, b int);") for i := 0; i < 4; i++ { tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) diff --git a/pkg/ddl/tests/adminpause/global.go b/pkg/ddl/tests/adminpause/global.go index c75a2ee4d1afc..2d6bd9aa79c0d 100644 --- a/pkg/ddl/tests/adminpause/global.go +++ b/pkg/ddl/tests/adminpause/global.go @@ -35,8 +35,8 @@ func prepareDomain(t *testing.T) (*domain.Domain, *testkit.TestKit, *testkit.Tes adminCommandKit := testkit.NewTestKit(t, store) ddlctrl.ReorgWaitTimeout = 10 * time.Millisecond - stmtKit.MustExec("set @@global.tidb_ddl_reorg_batch_size = 2") - stmtKit.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1") + stmtKit.MustExec("set @@tidb_ddl_reorg_batch_size = 2") + stmtKit.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1") stmtKit = testkit.NewTestKit(t, store) stmtKit.MustExec("use test") diff --git a/pkg/executor/test/admintest/admin_test.go b/pkg/executor/test/admintest/admin_test.go index d2d79799e89f1..e7fb86d69dd75 100644 --- a/pkg/executor/test/admintest/admin_test.go +++ b/pkg/executor/test/admintest/admin_test.go @@ -2073,7 +2073,7 @@ func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) { } batchSize := 32 - tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_batch_size = %d", batchSize)) + tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_batch_size = %d", batchSize)) var enableFastCheck = []bool{false, true} for _, enabled := range enableFastCheck { diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 20f036099bc0b..031dcac02003a 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -758,11 +758,11 @@ var defaultSysVars = []*SysVar{ SetDDLFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBDDLFlashbackConcurrency))) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgWorkerCount, skipInit: true, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), 
Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgBatchSize, skipInit: true, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) return nil }}, diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index adccc41486da0..170c2c2746420 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -62,6 +62,7 @@ func TestAddIndexDistBasic(t *testing.T) { bak := variable.GetDDLReorgWorkerCounter() tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 111") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 111") require.Equal(t, int32(111), variable.GetDDLReorgWorkerCounter()) tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 20;") tk.MustExec("insert into t values (), (), (), (), (), ()") @@ -80,6 +81,7 @@ func TestAddIndexDistBasic(t *testing.T) { require.Equal(t, 1, task.Concurrency) tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_worker_cnt = %d", bak)) + tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt = %d", bak)) require.Equal(t, bak, variable.GetDDLReorgWorkerCounter()) tk.MustExec("create table t1(a bigint auto_random primary key);") diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index 87ca0b5e2d0ea..d7e548cc7133c 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -191,7 +191,7 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { tk.MustExec("create database addindexlit;") tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") tk.MustExec("create table t (a int primary key);") var sb strings.Builder sb.WriteString("insert into t values ") @@ -219,7 +219,7 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { running = false case wg := <-ddl.TestCheckWorkerNumCh: offset = (offset + 1) % 3 - tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", cnt[offset])) + tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", cnt[offset])) atomic.StoreInt32(&ddl.TestCheckWorkerNumber, int32(cnt[offset]/2+2)) wg.Done() } @@ -231,7 +231,6 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { require.Len(t, rows, 1) jobTp := rows[0][3].(string) require.True(t, strings.Contains(jobTp, 
"ingest"), jobTp) - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;") } func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { @@ -244,7 +243,7 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{} ingest.ImporterRangeConcurrencyForTest.Store(2) - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 20;") tk.MustExec("create table t (a int primary key);") var sb strings.Builder @@ -262,7 +261,6 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { require.Len(t, rows, 1) jobTp := rows[0][3].(string) require.True(t, strings.Contains(jobTp, "ingest"), jobTp) - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;") ingest.ImporterRangeConcurrencyForTest = nil } @@ -399,8 +397,8 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { tk.MustExec("create database addindexlit;") tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) - tk.MustExec("set global tidb_ddl_reorg_worker_cnt=1;") - tk.MustExec("set global tidb_enable_dist_task = 0;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;") + tk.MustExec("set @@tidb_enable_dist_task = 0;") tk.MustExec("create table t(id int primary key, b int, k int);") tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1")) @@ -410,8 +408,6 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { ingest.ForceSyncFlagForTest = true tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'") ingest.ForceSyncFlagForTest = false - - tk.MustExec("set global tidb_ddl_reorg_worker_cnt=4;") } func TestAddIndexBackfillLostUpdate(t *testing.T) { From 068ad158d50ca88d86282ccb514be4ed3563f9cd Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 12 Aug 2024 16:33:11 +0800 Subject: [PATCH 09/10] fix TestAddIndexRemoteDuplicateCheck --- tests/realtikvtest/addindextest3/ingest_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index d7e548cc7133c..d3e59b351f4dc 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -398,7 +398,7 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;") - tk.MustExec("set @@tidb_enable_dist_task = 0;") + tk.MustExec("set @@global tidb_enable_dist_task = 0;") tk.MustExec("create table t(id int primary key, b int, k int);") tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1")) From d02c6ac4e6909cd9b71fc3e88c9f16a22dc7d4d7 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 12 Aug 2024 16:59:28 +0800 Subject: [PATCH 10/10] fix TestAddIndexRemoteDuplicateCheck --- tests/realtikvtest/addindextest3/ingest_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index d3e59b351f4dc..0b7b6c805df5a 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -398,7 +398,7 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;") - tk.MustExec("set 
@@global tidb_enable_dist_task = 0;") + tk.MustExec("set @@global.tidb_enable_dist_task = 0;") tk.MustExec("create table t(id int primary key, b int, k int);") tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))