From fcb37c679f3d9a68ec4bc493080c289eb0ca2821 Mon Sep 17 00:00:00 2001 From: kennytm Date: Fri, 15 Sep 2023 02:10:34 +0800 Subject: [PATCH 1/6] lightning: allow configure the SQL-statement-length for logical backend Signed-off-by: kennytm --- br/pkg/lightning/backend/tidb/tidb.go | 20 +++--- br/pkg/lightning/backend/tidb/tidb_test.go | 65 ++++++++++++++++--- br/pkg/lightning/config/config.go | 8 +++ br/pkg/lightning/importer/import.go | 2 +- .../lightning_tidb_duplicate_data/error.toml | 1 + .../lightning_tidb_duplicate_data/ignore.toml | 1 + .../replace.toml | 1 + br/tidb-lightning.toml | 5 ++ 8 files changed, 84 insertions(+), 19 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 935b8aa36184a..d56af37cf836e 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -272,6 +272,9 @@ type tidbBackend struct { // view should be the same. onDuplicate string errorMgr *errormanager.ErrorManager + // maxChunkSize is the target size of each INSERT SQL statement to be sent to downstream. + // sometimes we want to reduce the txn size to avoid affecting the cluster too much. + maxChunkSize int } var _ backend.Backend = (*tidbBackend)(nil) @@ -285,6 +288,7 @@ func NewTiDBBackend( db *sql.DB, conflict config.Conflict, errorMgr *errormanager.ErrorManager, + maxChunkSize int, ) backend.Backend { var onDuplicate string switch conflict.Strategy { @@ -305,10 +309,11 @@ func NewTiDBBackend( onDuplicate = config.ErrorOnDup } return &tidbBackend{ - db: db, - conflictCfg: conflict, - onDuplicate: onDuplicate, - errorMgr: errorMgr, + db: db, + conflictCfg: conflict, + onDuplicate: onDuplicate, + errorMgr: errorMgr, + maxChunkSize: maxChunkSize, } } @@ -581,11 +586,8 @@ func (*tidbBackend) RetryImportDelay() time.Duration { return 0 } -func (*tidbBackend) MaxChunkSize() int { - failpoint.Inject("FailIfImportedSomeRows", func() { - failpoint.Return(1) - }) - return 1048576 +func (be *tidbBackend) MaxChunkSize() int { + return be.maxChunkSize } func (*tidbBackend) ShouldPostProcess() bool { diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index 5d031d31b4448..d5f1bece66742 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -73,7 +73,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite { cfg.Conflict.Strategy = config.ReplaceOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 100 - backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L())) + backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()), config.DefaultSQLStatementLength) return &mysqlSuite{ dbHandle: db, mockDB: mock, @@ -166,7 +166,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { cfg.Conflict.Strategy = config.IgnoreOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger)) + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultSQLStatementLength) engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -193,7 +193,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { // test conflict.strategy == ignore and not 0 conflict.max-record-rows will use ErrorOnDup cfg.Conflict.MaxRecordRows = 10 - ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger)) + ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultSQLStatementLength) engine, err = backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -246,7 +246,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) { cfg.Conflict.Strategy = config.ErrorOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger)) + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultSQLStatementLength) engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -536,8 +536,12 @@ func TestWriteRowsErrorNoRetry(t *testing.T) { cfg.Conflict.Strategy = config.ErrorOnDup cfg.Conflict.Threshold = 0 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict, + ignoreBackend := tidb.NewTiDBBackend( + context.Background(), + s.dbHandle, + cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), + config.DefaultSQLStatementLength, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -602,8 +606,12 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) { cfg.Conflict.MaxRecordRows = 10 cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" cfg.App.MaxError.Type = *atomic.NewInt64(10) - ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict, + ignoreBackend := tidb.NewTiDBBackend( + context.Background(), + s.dbHandle, + cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), + config.DefaultSQLStatementLength, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -657,8 +665,12 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) { cfg.Conflict.MaxRecordRows = 10 cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" cfg.App.MaxError.Type = *atomic.NewInt64(3) - ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict, + ignoreBackend := tidb.NewTiDBBackend( + context.Background(), + s.dbHandle, + cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), + config.DefaultSQLStatementLength, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -699,8 +711,12 @@ func TestWriteRowsRecordOneError(t *testing.T) { cfg.Conflict.Threshold = 0 cfg.Conflict.MaxRecordRows = 0 cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" - ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict, + ignoreBackend := tidb.NewTiDBBackend( + context.Background(), + s.dbHandle, + cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), + config.DefaultSQLStatementLength, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -728,8 +744,12 @@ func TestDuplicateThreshold(t *testing.T) { cfg.Conflict.Strategy = config.IgnoreOnDup cfg.Conflict.Threshold = 5 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict, + ignoreBackend := tidb.NewTiDBBackend( + context.Background(), + s.dbHandle, + cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), + config.DefaultSQLStatementLength, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -851,3 +871,30 @@ func TestEncodeRowForRecord(t *testing.T) { }, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, 3, -1, -1, -1}) require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)") } + +func TestSQLStatementLength(t *testing.T) { + s := createMysqlSuite(t) + defer s.TearDownTest(t) + + s.mockDB. + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3)\\E"). + WillReturnResult(sqlmock.NewResult(3, 3)) + s.mockDB. + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4),(5)\\E"). + WillReturnResult(sqlmock.NewResult(2, 2)) + + ctx := context.Background() + logger := log.L() + + cfg := config.NewConfig() + cfg.Conflict.Strategy = config.ErrorOnDup + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), 9) + encBuilder := tidb.NewEncodingBuilder() + dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + require.NoError(t, err) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) + require.NoError(t, err) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) + require.NoError(t, err) +} diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index d293a41f7916d..f3c38eec229bf 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -84,6 +84,7 @@ const ( defaultIndexConcurrency = 2 DefaultRegionCheckBackoffLimit = 1800 DefaultRegionSplitBatchSize = 4096 + DefaultSQLStatementLength = 1 * units.MiB // defaultMetaSchemaName is the default database name used to store lightning metadata defaultMetaSchemaName = "lightning_metadata" @@ -1060,6 +1061,7 @@ type TikvImporter struct { DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"` + SQLStatementLength ByteSize `toml:"sql-statement-length" json:"sql-statement-length"` // deprecated, use ParallelImport instead. IncrementalImport bool `toml:"incremental-import" json:"incremental-import"` ParallelImport bool `toml:"parallel-import" json:"parallel-import"` @@ -1084,6 +1086,11 @@ func (t *TikvImporter) adjust() error { } switch t.Backend { case BackendTiDB: + if t.SQLStatementLength <= 0 { + return common.ErrInvalidConfig.GenWithStack( + "`tikv-importer.sql-statement-length` got %d, should be larger than 0", + t.SQLStatementLength) + } t.DuplicateResolution = DupeResAlgNone case BackendLocal: if t.RegionSplitBatchSize <= 0 { @@ -1458,6 +1465,7 @@ func NewConfig() *Config { DiskQuota: ByteSize(math.MaxInt64), DuplicateResolution: DupeResAlgNone, PausePDSchedulerScope: PausePDSchedulerScopeTable, + SQLStatementLength: ByteSize(DefaultSQLStatementLength), }, PostRestore: PostRestore{ Checksum: OpLevelRequired, diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 72353daa7be82..aad8aba8684a3 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -343,7 +343,7 @@ func NewImportControllerWithPauser( switch cfg.TikvImporter.Backend { case config.BackendTiDB: encodingBuilder = tidb.NewEncodingBuilder() - backendObj = tidb.NewTiDBBackend(ctx, db, cfg.Conflict, errorMgr) + backendObj = tidb.NewTiDBBackend(ctx, db, cfg.Conflict, errorMgr, int(cfg.TikvImporter.SQLStatementLength)) case config.BackendLocal: var rLimit local.RlimT rLimit, err = local.GetSystemRLimit() diff --git a/br/tests/lightning_tidb_duplicate_data/error.toml b/br/tests/lightning_tidb_duplicate_data/error.toml index bca2d5218a071..d328d326183ee 100644 --- a/br/tests/lightning_tidb_duplicate_data/error.toml +++ b/br/tests/lightning_tidb_duplicate_data/error.toml @@ -1,3 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "error" +sql-statement-length = 1 diff --git a/br/tests/lightning_tidb_duplicate_data/ignore.toml b/br/tests/lightning_tidb_duplicate_data/ignore.toml index a67cd6e47ceb5..9815f591940f8 100644 --- a/br/tests/lightning_tidb_duplicate_data/ignore.toml +++ b/br/tests/lightning_tidb_duplicate_data/ignore.toml @@ -1,3 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "ignore" +sql-statement-length = 1 diff --git a/br/tests/lightning_tidb_duplicate_data/replace.toml b/br/tests/lightning_tidb_duplicate_data/replace.toml index 8b6826568fcd9..a749a4e658516 100644 --- a/br/tests/lightning_tidb_duplicate_data/replace.toml +++ b/br/tests/lightning_tidb_duplicate_data/replace.toml @@ -1,3 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "replace" +sql-statement-length = 1 diff --git a/br/tidb-lightning.toml b/br/tidb-lightning.toml index aaf79f23f4329..2bd341b59fe05 100644 --- a/br/tidb-lightning.toml +++ b/br/tidb-lightning.toml @@ -138,6 +138,11 @@ addr = "127.0.0.1:8287" #local-writer-mem-cache-size = '128MiB' # Limit the write bandwidth to each tikv store. The unit is 'Bytes per second'. 0 means no limit. #store-write-bwlimit = 0 +# The desired length of each INSERT/REPLACE statement sent to the downstream TiDB in each single transaction. +# This is not a hard limit; the actual SQL executed may be longer or shorter depending on the actual content imported. +# The default is 1.0 MiB which is optimized for import speed when Lightning is the only client of the cluster. +# This value may be decreased to reduce the stress on the cluster due to large transaction. +#sql-statement-length = '1MiB' [mydumper] # block size of file reading From cbd0edf0f56668d9cbd276923fb056716176fe80 Mon Sep 17 00:00:00 2001 From: kennytm Date: Tue, 19 Sep 2023 11:40:44 +0800 Subject: [PATCH 2/6] lightning: rename sql-statement-length to tidb-write-throughput-limit Signed-off-by: kennytm --- br/pkg/lightning/backend/tidb/tidb_test.go | 20 ++++----- br/pkg/lightning/config/config.go | 41 ++++++++++--------- br/pkg/lightning/importer/import.go | 2 +- .../lightning_tidb_duplicate_data/error.toml | 2 +- .../lightning_tidb_duplicate_data/ignore.toml | 2 +- .../replace.toml | 2 +- br/tidb-lightning.toml | 5 ++- 7 files changed, 38 insertions(+), 36 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index d5f1bece66742..6494ae42b85ef 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -73,7 +73,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite { cfg.Conflict.Strategy = config.ReplaceOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 100 - backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()), config.DefaultSQLStatementLength) + backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()), config.DefaultTiDBWriteThroughputLimit) return &mysqlSuite{ dbHandle: db, mockDB: mock, @@ -166,7 +166,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { cfg.Conflict.Strategy = config.IgnoreOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultSQLStatementLength) + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultTiDBWriteThroughputLimit) engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -193,7 +193,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { // test conflict.strategy == ignore and not 0 conflict.max-record-rows will use ErrorOnDup cfg.Conflict.MaxRecordRows = 10 - ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultSQLStatementLength) + ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultTiDBWriteThroughputLimit) engine, err = backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -246,7 +246,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) { cfg.Conflict.Strategy = config.ErrorOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultSQLStatementLength) + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultTiDBWriteThroughputLimit) engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -541,7 +541,7 @@ func TestWriteRowsErrorNoRetry(t *testing.T) { s.dbHandle, cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultSQLStatementLength, + config.DefaultTiDBWriteThroughputLimit, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -611,7 +611,7 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) { s.dbHandle, cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultSQLStatementLength, + config.DefaultTiDBWriteThroughputLimit, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -670,7 +670,7 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) { s.dbHandle, cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultSQLStatementLength, + config.DefaultTiDBWriteThroughputLimit, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -716,7 +716,7 @@ func TestWriteRowsRecordOneError(t *testing.T) { s.dbHandle, cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultSQLStatementLength, + config.DefaultTiDBWriteThroughputLimit, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -749,7 +749,7 @@ func TestDuplicateThreshold(t *testing.T) { s.dbHandle, cfg.Conflict, errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultSQLStatementLength, + config.DefaultTiDBWriteThroughputLimit, ) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) @@ -872,7 +872,7 @@ func TestEncodeRowForRecord(t *testing.T) { require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)") } -func TestSQLStatementLength(t *testing.T) { +func TestTiDBWriteThroughputLimit(t *testing.T) { s := createMysqlSuite(t) defer s.TearDownTest(t) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index f3c38eec229bf..38de6fdd5ef22 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -84,7 +84,7 @@ const ( defaultIndexConcurrency = 2 DefaultRegionCheckBackoffLimit = 1800 DefaultRegionSplitBatchSize = 4096 - DefaultSQLStatementLength = 1 * units.MiB + DefaultTiDBWriteThroughputLimit = 1 * units.MiB // defaultMetaSchemaName is the default database name used to store lightning metadata defaultMetaSchemaName = "lightning_metadata" @@ -1061,16 +1061,17 @@ type TikvImporter struct { DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"` - SQLStatementLength ByteSize `toml:"sql-statement-length" json:"sql-statement-length"` // deprecated, use ParallelImport instead. IncrementalImport bool `toml:"incremental-import" json:"incremental-import"` ParallelImport bool `toml:"parallel-import" json:"parallel-import"` KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"` AddIndexBySQL bool `toml:"add-index-by-sql" json:"add-index-by-sql"` - EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` - LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` - StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"` + EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` + LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` + StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"` + TiDBWriteThroughputLimit ByteSize `toml:"tidb-write-throughput-limit" json:"tidb-write-throughput-limit"` + // default is PausePDSchedulerScopeTable to compatible with previous version(>= 6.1) PausePDSchedulerScope PausePDSchedulerScope `toml:"pause-pd-scheduler-scope" json:"pause-pd-scheduler-scope"` } @@ -1086,10 +1087,10 @@ func (t *TikvImporter) adjust() error { } switch t.Backend { case BackendTiDB: - if t.SQLStatementLength <= 0 { + if t.TiDBWriteThroughputLimit <= 0 { return common.ErrInvalidConfig.GenWithStack( - "`tikv-importer.sql-statement-length` got %d, should be larger than 0", - t.SQLStatementLength) + "`tikv-importer.tidb-write-throughput-limit` got %d, should be larger than 0", + t.TiDBWriteThroughputLimit) } t.DuplicateResolution = DupeResAlgNone case BackendLocal: @@ -1454,18 +1455,18 @@ func NewConfig() *Config { DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace), }, TikvImporter: TikvImporter{ - Backend: "", - MaxKVPairs: 4096, - SendKVPairs: 32768, - SendKVSize: KVWriteBatchSize, - RegionSplitSize: 0, - RegionSplitBatchSize: DefaultRegionSplitBatchSize, - RegionSplitConcurrency: runtime.GOMAXPROCS(0), - RegionCheckBackoffLimit: DefaultRegionCheckBackoffLimit, - DiskQuota: ByteSize(math.MaxInt64), - DuplicateResolution: DupeResAlgNone, - PausePDSchedulerScope: PausePDSchedulerScopeTable, - SQLStatementLength: ByteSize(DefaultSQLStatementLength), + Backend: "", + MaxKVPairs: 4096, + SendKVPairs: 32768, + SendKVSize: KVWriteBatchSize, + RegionSplitSize: 0, + RegionSplitBatchSize: DefaultRegionSplitBatchSize, + RegionSplitConcurrency: runtime.GOMAXPROCS(0), + RegionCheckBackoffLimit: DefaultRegionCheckBackoffLimit, + DiskQuota: ByteSize(math.MaxInt64), + DuplicateResolution: DupeResAlgNone, + PausePDSchedulerScope: PausePDSchedulerScopeTable, + TiDBWriteThroughputLimit: ByteSize(DefaultTiDBWriteThroughputLimit), }, PostRestore: PostRestore{ Checksum: OpLevelRequired, diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index aad8aba8684a3..0b2d3bb905e55 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -343,7 +343,7 @@ func NewImportControllerWithPauser( switch cfg.TikvImporter.Backend { case config.BackendTiDB: encodingBuilder = tidb.NewEncodingBuilder() - backendObj = tidb.NewTiDBBackend(ctx, db, cfg.Conflict, errorMgr, int(cfg.TikvImporter.SQLStatementLength)) + backendObj = tidb.NewTiDBBackend(ctx, db, cfg.Conflict, errorMgr, int(cfg.TikvImporter.TiDBWriteThroughputLimit)) case config.BackendLocal: var rLimit local.RlimT rLimit, err = local.GetSystemRLimit() diff --git a/br/tests/lightning_tidb_duplicate_data/error.toml b/br/tests/lightning_tidb_duplicate_data/error.toml index d328d326183ee..bb6d84aeb38e6 100644 --- a/br/tests/lightning_tidb_duplicate_data/error.toml +++ b/br/tests/lightning_tidb_duplicate_data/error.toml @@ -1,4 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "error" -sql-statement-length = 1 +tidb-write-throughput-limit = 1 diff --git a/br/tests/lightning_tidb_duplicate_data/ignore.toml b/br/tests/lightning_tidb_duplicate_data/ignore.toml index 9815f591940f8..31a20e811b871 100644 --- a/br/tests/lightning_tidb_duplicate_data/ignore.toml +++ b/br/tests/lightning_tidb_duplicate_data/ignore.toml @@ -1,4 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "ignore" -sql-statement-length = 1 +tidb-write-throughput-limit = 1 diff --git a/br/tests/lightning_tidb_duplicate_data/replace.toml b/br/tests/lightning_tidb_duplicate_data/replace.toml index a749a4e658516..e0108d48a4e8c 100644 --- a/br/tests/lightning_tidb_duplicate_data/replace.toml +++ b/br/tests/lightning_tidb_duplicate_data/replace.toml @@ -1,4 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "replace" -sql-statement-length = 1 +tidb-write-throughput-limit = 1 diff --git a/br/tidb-lightning.toml b/br/tidb-lightning.toml index 2bd341b59fe05..61ce93a4947d3 100644 --- a/br/tidb-lightning.toml +++ b/br/tidb-lightning.toml @@ -138,11 +138,12 @@ addr = "127.0.0.1:8287" #local-writer-mem-cache-size = '128MiB' # Limit the write bandwidth to each tikv store. The unit is 'Bytes per second'. 0 means no limit. #store-write-bwlimit = 0 -# The desired length of each INSERT/REPLACE statement sent to the downstream TiDB in each single transaction. +# Limit the throughput to downstream TiDB server in logical mode (TiDB backend). +# This is the desired length of the VALUES part of each INSERT/REPLACE statement executed in a single transaction. # This is not a hard limit; the actual SQL executed may be longer or shorter depending on the actual content imported. # The default is 1.0 MiB which is optimized for import speed when Lightning is the only client of the cluster. # This value may be decreased to reduce the stress on the cluster due to large transaction. -#sql-statement-length = '1MiB' +#tidb-write-throughput-limit = '1MiB' [mydumper] # block size of file reading From 357fd475305549802bbde53a64930bf3c9aed1fe Mon Sep 17 00:00:00 2001 From: kennytm Date: Mon, 8 Jan 2024 17:08:29 +0800 Subject: [PATCH 3/6] lightning: fix bazel_prepare diff Signed-off-by: kennytm --- br/pkg/lightning/backend/tidb/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/tidb/BUILD.bazel b/br/pkg/lightning/backend/tidb/BUILD.bazel index 6e715be18eaaa..724d4ff6423c6 100644 --- a/br/pkg/lightning/backend/tidb/BUILD.bazel +++ b/br/pkg/lightning/backend/tidb/BUILD.bazel @@ -37,7 +37,7 @@ go_test( timeout = "short", srcs = ["tidb_test.go"], flaky = True, - shard_count = 14, + shard_count = 15, deps = [ ":tidb", "//br/pkg/lightning/backend", From dd6221f0f1e926f93d9c3160cd419d5eb4989544 Mon Sep 17 00:00:00 2001 From: kennytm Date: Wed, 10 Jan 2024 17:31:00 +0800 Subject: [PATCH 4/6] lightning: added logical-import-batch-{size,rows} config removed SplitIntoChunks() from the Rows interface and also MaxChunkSize() from the Backend interface since the split logic is no longer shared. reduced default logical-import-batch-size from 1M to 96K to match reality. --- br/pkg/lightning/backend/encode/encode.go | 5 -- br/pkg/lightning/backend/kv/sql2kv.go | 31 ------- br/pkg/lightning/backend/kv/sql2kv_test.go | 47 ----------- br/pkg/lightning/backend/tidb/tidb.go | 34 ++++---- br/pkg/lightning/backend/tidb/tidb_test.go | 97 ++++++++++++---------- br/pkg/lightning/config/config.go | 50 ++++++----- br/pkg/lightning/importer/import.go | 2 +- br/tidb-lightning.toml | 12 ++- 8 files changed, 105 insertions(+), 173 deletions(-) diff --git a/br/pkg/lightning/backend/encode/encode.go b/br/pkg/lightning/backend/encode/encode.go index 40d1b4cfb71ad..1f6cca4cb8e93 100644 --- a/br/pkg/lightning/backend/encode/encode.go +++ b/br/pkg/lightning/backend/encode/encode.go @@ -63,11 +63,6 @@ type SessionOptions struct { // Rows represents a collection of encoded rows. type Rows interface { - // SplitIntoChunks splits the rows into multiple consecutive parts, each - // part having total byte size less than `splitSize`. The meaning of "byte - // size" should be consistent with the value used in `Row.ClassifyAndAppend`. - SplitIntoChunks(splitSize int) []Rows - // Clear returns a new collection with empty content. It may share the // capacity with the current instance. The typical usage is `x = x.Clear()`. Clear() Rows diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 1d3ef8b1d2038..ccffb672c6260 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -321,37 +321,6 @@ func (kvs *Pairs) ClassifyAndAppend( *indices = indexKVs } -// SplitIntoChunks splits the key-value pairs into chunks. -func (kvs *Pairs) SplitIntoChunks(splitSize int) []encode.Rows { - if len(kvs.Pairs) == 0 { - return nil - } - - res := make([]encode.Rows, 0, 1) - i := 0 - cumSize := 0 - for j, pair := range kvs.Pairs { - size := len(pair.Key) + len(pair.Val) - if i < j && cumSize+size > splitSize { - res = append(res, &Pairs{Pairs: kvs.Pairs[i:j]}) - i = j - cumSize = 0 - } - cumSize += size - } - - if i == 0 { - res = append(res, kvs) - } else { - res = append(res, &Pairs{ - Pairs: kvs.Pairs[i:], - BytesBuf: kvs.BytesBuf, - MemBuf: kvs.MemBuf, - }) - } - return res -} - // Clear clears the key-value pairs. func (kvs *Pairs) Clear() encode.Rows { if kvs.BytesBuf != nil { diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index 4b806d6e790c3..6393a6ef623ea 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -570,53 +570,6 @@ func TestShardRowId(t *testing.T) { require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.RowIDAllocType).Base(), int64(32)) } -func TestSplitIntoChunks(t *testing.T) { - pairs := []common.KvPair{ - { - Key: []byte{1, 2, 3}, - Val: []byte{4, 5, 6}, - }, - { - Key: []byte{7, 8}, - Val: []byte{9, 0}, - }, - { - Key: []byte{1, 2, 3, 4}, - Val: []byte{5, 6, 7, 8}, - }, - { - Key: []byte{9, 0}, - Val: []byte{1, 2}, - }, - } - - splitBy10 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(10) - require.Equal(t, splitBy10, []encode.Rows{ - lkv.MakeRowsFromKvPairs(pairs[0:2]), - lkv.MakeRowsFromKvPairs(pairs[2:3]), - lkv.MakeRowsFromKvPairs(pairs[3:4]), - }) - - splitBy12 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(12) - require.Equal(t, splitBy12, []encode.Rows{ - lkv.MakeRowsFromKvPairs(pairs[0:2]), - lkv.MakeRowsFromKvPairs(pairs[2:4]), - }) - - splitBy1000 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1000) - require.Equal(t, splitBy1000, []encode.Rows{ - lkv.MakeRowsFromKvPairs(pairs[0:4]), - }) - - splitBy1 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1) - require.Equal(t, splitBy1, []encode.Rows{ - lkv.MakeRowsFromKvPairs(pairs[0:1]), - lkv.MakeRowsFromKvPairs(pairs[1:2]), - lkv.MakeRowsFromKvPairs(pairs[2:3]), - lkv.MakeRowsFromKvPairs(pairs[3:4]), - }) -} - func TestClassifyAndAppend(t *testing.T) { kvs := lkv.MakeRowFromKvPairs([]common.KvPair{ { diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 7320c83b67f44..b525f5f714c42 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -272,9 +272,11 @@ type tidbBackend struct { // view should be the same. onDuplicate string errorMgr *errormanager.ErrorManager - // maxChunkSize is the target size of each INSERT SQL statement to be sent to downstream. - // sometimes we want to reduce the txn size to avoid affecting the cluster too much. - maxChunkSize int + // maxChunkSize and maxChunkRows are the target size and number of rows of each INSERT SQL + // statement to be sent to downstream. Sometimes we want to reduce the txn size to avoid + // affecting the cluster too much. + maxChunkSize uint64 + maxChunkRows int } var _ backend.Backend = (*tidbBackend)(nil) @@ -286,10 +288,10 @@ var _ backend.Backend = (*tidbBackend)(nil) func NewTiDBBackend( ctx context.Context, db *sql.DB, - conflict config.Conflict, + cfg *config.Config, errorMgr *errormanager.ErrorManager, - maxChunkSize int, ) backend.Backend { + conflict := cfg.Conflict var onDuplicate string switch conflict.Strategy { case config.ErrorOnDup: @@ -313,7 +315,8 @@ func NewTiDBBackend( conflictCfg: conflict, onDuplicate: onDuplicate, errorMgr: errorMgr, - maxChunkSize: maxChunkSize, + maxChunkSize: uint64(cfg.TikvImporter.LogicalImportBatchSize), + maxChunkRows: cfg.TikvImporter.LogicalImportBatchRows, } } @@ -334,18 +337,17 @@ func (row tidbRow) ClassifyAndAppend(data *encode.Rows, checksum *verification.K checksum.Add(&cs) } -func (rows tidbRows) SplitIntoChunks(splitSizeInt int) []encode.Rows { +func (rows tidbRows) splitIntoChunks(splitSize uint64, splitRows int) []tidbRows { if len(rows) == 0 { return nil } - res := make([]encode.Rows, 0, 1) + res := make([]tidbRows, 0, 1) i := 0 cumSize := uint64(0) - splitSize := uint64(splitSizeInt) for j, row := range rows { - if i < j && cumSize+row.Size() > splitSize { + if i < j && (cumSize+row.Size() > splitSize || j-i >= splitRows) { res = append(res, rows[i:j]) i = j cumSize = 0 @@ -586,10 +588,6 @@ func (*tidbBackend) RetryImportDelay() time.Duration { return 0 } -func (be *tidbBackend) MaxChunkSize() int { - return be.maxChunkSize -} - func (*tidbBackend) ShouldPostProcess() bool { return true } @@ -613,7 +611,7 @@ func (*tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error func (be *tidbBackend) WriteRows(ctx context.Context, tableName string, columnNames []string, rows encode.Rows) error { var err error rowLoop: - for _, r := range rows.SplitIntoChunks(be.MaxChunkSize()) { + for _, r := range rows.(tidbRows).splitIntoChunks(be.maxChunkSize, be.maxChunkRows) { for i := 0; i < writeRowsMaxRetryTimes; i++ { // Write in the batch mode first. err = be.WriteBatchRowsToDB(ctx, tableName, columnNames, r) @@ -650,8 +648,7 @@ type stmtTask struct { // WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this: // // insert into t1 values (111), (222), (333), (444); -func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error { - rows := r.(tidbRows) +func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error { insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames) if insertStmt == nil { return nil @@ -684,8 +681,7 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column // insert into t1 values (444); // // See more details in br#1366: https://github.com/pingcap/br/issues/1366 -func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error { - rows := r.(tidbRows) +func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error { insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames) if insertStmt == nil { return nil diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index a3b9b5155d219..28c262d85c0f1 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -73,7 +73,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite { cfg.Conflict.Strategy = config.ReplaceOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 100 - backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()), config.DefaultTiDBWriteThroughputLimit) + backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg, errormanager.New(nil, cfg, log.L())) return &mysqlSuite{ dbHandle: db, mockDB: mock, @@ -166,7 +166,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { cfg.Conflict.Strategy = config.IgnoreOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultTiDBWriteThroughputLimit) + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger)) engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -193,7 +193,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { // test conflict.strategy == ignore and not 0 conflict.max-record-rows will use ErrorOnDup cfg.Conflict.MaxRecordRows = 10 - ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultTiDBWriteThroughputLimit) + ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger)) engine, err = backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -246,7 +246,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) { cfg.Conflict.Strategy = config.ErrorOnDup cfg.Conflict.Threshold = math.MaxInt64 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), config.DefaultTiDBWriteThroughputLimit) + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger)) engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) @@ -536,13 +536,7 @@ func TestWriteRowsErrorNoRetry(t *testing.T) { cfg.Conflict.Strategy = config.ErrorOnDup cfg.Conflict.Threshold = 0 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend( - context.Background(), - s.dbHandle, - cfg.Conflict, - errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultTiDBWriteThroughputLimit, - ) + ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L())) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() @@ -606,13 +600,7 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) { cfg.Conflict.MaxRecordRows = 10 cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" cfg.App.MaxError.Type = *atomic.NewInt64(10) - ignoreBackend := tidb.NewTiDBBackend( - context.Background(), - s.dbHandle, - cfg.Conflict, - errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultTiDBWriteThroughputLimit, - ) + ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L())) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() @@ -665,13 +653,7 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) { cfg.Conflict.MaxRecordRows = 10 cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" cfg.App.MaxError.Type = *atomic.NewInt64(3) - ignoreBackend := tidb.NewTiDBBackend( - context.Background(), - s.dbHandle, - cfg.Conflict, - errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultTiDBWriteThroughputLimit, - ) + ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L())) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() @@ -711,13 +693,7 @@ func TestWriteRowsRecordOneError(t *testing.T) { cfg.Conflict.Threshold = 0 cfg.Conflict.MaxRecordRows = 0 cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" - ignoreBackend := tidb.NewTiDBBackend( - context.Background(), - s.dbHandle, - cfg.Conflict, - errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultTiDBWriteThroughputLimit, - ) + ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L())) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() @@ -744,13 +720,7 @@ func TestDuplicateThreshold(t *testing.T) { cfg.Conflict.Strategy = config.IgnoreOnDup cfg.Conflict.Threshold = 5 cfg.Conflict.MaxRecordRows = 0 - ignoreBackend := tidb.NewTiDBBackend( - context.Background(), - s.dbHandle, - cfg.Conflict, - errormanager.New(s.dbHandle, cfg, log.L()), - config.DefaultTiDBWriteThroughputLimit, - ) + ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L())) encBuilder := tidb.NewEncodingBuilder() dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() @@ -872,25 +842,60 @@ func TestEncodeRowForRecord(t *testing.T) { require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)") } -func TestTiDBWriteThroughputLimit(t *testing.T) { +// TestLogicalImportBatch tests that each INSERT statement is limited by both +// logical-import-batch-size and logical-import-batch-rows configurations. Here +// we ensure each INSERT statement has up to 5 rows *and* ~30 bytes of values. +func TestLogicalImportBatch(t *testing.T) { s := createMysqlSuite(t) defer s.TearDownTest(t) s.mockDB. - ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3)\\E"). - WillReturnResult(sqlmock.NewResult(3, 3)) + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(4),(8),(16)\\E"). + WillReturnResult(sqlmock.NewResult(5, 5)) s.mockDB. - ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4),(5)\\E"). - WillReturnResult(sqlmock.NewResult(2, 2)) + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(32),(64),(128),(256),(512)\\E"). + WillReturnResult(sqlmock.NewResult(5, 5)) + s.mockDB. + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1024),(2048),(4096),(8192)\\E"). + WillReturnResult(sqlmock.NewResult(4, 4)) + s.mockDB. + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(16384),(32768),(65536),(131072)\\E"). + WillReturnResult(sqlmock.NewResult(4, 4)) + s.mockDB. + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(262144)\\E"). + WillReturnResult(sqlmock.NewResult(1, 1)) ctx := context.Background() logger := log.L() cfg := config.NewConfig() cfg.Conflict.Strategy = config.ErrorOnDup - ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger), 9) + cfg.TikvImporter.LogicalImportBatchSize = 30 + cfg.TikvImporter.LogicalImportBatchRows = 5 + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger)) encBuilder := tidb.NewEncodingBuilder() - dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) + encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{ + Path: "1.csv", + Table: s.tbl, + Logger: log.L(), + }) + require.NoError(t, err) + + dataRows := encBuilder.MakeEmptyRows() + dataChecksum := verification.MakeKVChecksum(0, 0, 0) + indexRows := encBuilder.MakeEmptyRows() + indexChecksum := verification.MakeKVChecksum(0, 0, 0) + for i := int64(0); i < 19; i++ { // encode rows 1, 2, 4, 8, ..., 262144. + row, err := encoder.Encode( + []types.Datum{types.NewIntDatum(1 << i)}, + i, + []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, + 8*i, + ) + require.NoError(t, err) + row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) + } + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 3b5e188fa9fec..bcb89872eca96 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -83,7 +83,8 @@ const ( defaultIndexConcurrency = 2 DefaultRegionCheckBackoffLimit = 1800 DefaultRegionSplitBatchSize = 4096 - DefaultTiDBWriteThroughputLimit = 1 * units.MiB + defaultLogicalImportBatchSize = 96 * units.KiB + defaultLogicalImportBatchRows = 65536 // defaultMetaSchemaName is the default database name used to store lightning metadata defaultMetaSchemaName = "lightning_metadata" @@ -1076,10 +1077,11 @@ type TikvImporter struct { KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"` AddIndexBySQL bool `toml:"add-index-by-sql" json:"add-index-by-sql"` - EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` - LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` - StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"` - TiDBWriteThroughputLimit ByteSize `toml:"tidb-write-throughput-limit" json:"tidb-write-throughput-limit"` + EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` + LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` + StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"` + LogicalImportBatchSize ByteSize `toml:"logical-import-batch-size" json:"logical-import-batch-size"` + LogicalImportBatchRows int `toml:"logical-import-batch-rows" json:"logical-import-batch-rows"` // default is PausePDSchedulerScopeTable to compatible with previous version(>= 6.1) PausePDSchedulerScope PausePDSchedulerScope `toml:"pause-pd-scheduler-scope" json:"pause-pd-scheduler-scope"` @@ -1097,10 +1099,15 @@ func (t *TikvImporter) adjust() error { } switch t.Backend { case BackendTiDB: - if t.TiDBWriteThroughputLimit <= 0 { + if t.LogicalImportBatchSize <= 0 { return common.ErrInvalidConfig.GenWithStack( - "`tikv-importer.tidb-write-throughput-limit` got %d, should be larger than 0", - t.TiDBWriteThroughputLimit) + "`tikv-importer.logical-import-batch-size` got %d, should be larger than 0", + t.LogicalImportBatchSize) + } + if t.LogicalImportBatchRows <= 0 { + return common.ErrInvalidConfig.GenWithStack( + "`tikv-importer.logical-import-batch-rows` got %d, should be larger than 0", + t.LogicalImportBatchRows) } t.DuplicateResolution = DupeResAlgNone case BackendLocal: @@ -1465,19 +1472,20 @@ func NewConfig() *Config { DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace), }, TikvImporter: TikvImporter{ - Backend: "", - MaxKVPairs: 4096, - SendKVPairs: 32768, - SendKVSize: KVWriteBatchSize, - RegionSplitSize: 0, - RegionSplitBatchSize: DefaultRegionSplitBatchSize, - RegionSplitConcurrency: runtime.GOMAXPROCS(0), - RegionCheckBackoffLimit: DefaultRegionCheckBackoffLimit, - DiskQuota: ByteSize(math.MaxInt64), - DuplicateResolution: DupeResAlgNone, - PausePDSchedulerScope: PausePDSchedulerScopeTable, - BlockSize: 16 * 1024, - TiDBWriteThroughputLimit: ByteSize(DefaultTiDBWriteThroughputLimit), + Backend: "", + MaxKVPairs: 4096, + SendKVPairs: 32768, + SendKVSize: KVWriteBatchSize, + RegionSplitSize: 0, + RegionSplitBatchSize: DefaultRegionSplitBatchSize, + RegionSplitConcurrency: runtime.GOMAXPROCS(0), + RegionCheckBackoffLimit: DefaultRegionCheckBackoffLimit, + DiskQuota: ByteSize(math.MaxInt64), + DuplicateResolution: DupeResAlgNone, + PausePDSchedulerScope: PausePDSchedulerScopeTable, + BlockSize: 16 * 1024, + LogicalImportBatchSize: ByteSize(defaultLogicalImportBatchSize), + LogicalImportBatchRows: defaultLogicalImportBatchRows, }, PostRestore: PostRestore{ Checksum: OpLevelRequired, diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index dbe101e331fe9..1d243926dcf96 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -344,7 +344,7 @@ func NewImportControllerWithPauser( switch cfg.TikvImporter.Backend { case config.BackendTiDB: encodingBuilder = tidb.NewEncodingBuilder() - backendObj = tidb.NewTiDBBackend(ctx, db, cfg.Conflict, errorMgr, int(cfg.TikvImporter.TiDBWriteThroughputLimit)) + backendObj = tidb.NewTiDBBackend(ctx, db, cfg, errorMgr) case config.BackendLocal: var rLimit local.RlimT rLimit, err = local.GetSystemRLimit() diff --git a/br/tidb-lightning.toml b/br/tidb-lightning.toml index cd5d3cbbc7f7d..1f85163c6ae53 100644 --- a/br/tidb-lightning.toml +++ b/br/tidb-lightning.toml @@ -140,12 +140,18 @@ addr = "127.0.0.1:8287" #local-writer-mem-cache-size = '128MiB' # Limit the write bandwidth to each tikv store. The unit is 'Bytes per second'. 0 means no limit. #store-write-bwlimit = 0 -# Limit the throughput to downstream TiDB server in logical mode (TiDB backend). +# Limit the size of each SQL query executed on downstream TiDB server in logical mode (TiDB backend). # This is the desired length of the VALUES part of each INSERT/REPLACE statement executed in a single transaction. # This is not a hard limit; the actual SQL executed may be longer or shorter depending on the actual content imported. -# The default is 1.0 MiB which is optimized for import speed when Lightning is the only client of the cluster. +# The default value is optimized for import speed when Lightning is the only client of the cluster. +# Because of implementation details of Lightning, the value is capped at 96 KiB and larger values are ignored. # This value may be decreased to reduce the stress on the cluster due to large transaction. -#tidb-write-throughput-limit = '1MiB' +#logical-import-batch-size = '96KiB' +# Limit the maximum number of rows inserted per transaction in logical mode (TiDB backend). +# When both `logical-import-batch-size` and `logical-import-batch-rows` are defined, +# the rows will be split in a way to respect both settings. +# This value may be decreased to reduce the stress on the cluster due to large transaction. +#logical-import-batch-rows = 65536 [mydumper] # block size of file reading From 0c5272e636f7dad0883192a9e09cbe35a720911e Mon Sep 17 00:00:00 2001 From: kennytm Date: Fri, 12 Jan 2024 00:33:23 +0800 Subject: [PATCH 5/6] lightning: fix BAZEL --- br/pkg/lightning/backend/kv/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index a15f31d90f2e8..0bad4b0f5d7b9 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -56,7 +56,7 @@ go_test( embed = [":kv"], flaky = True, race = "on", - shard_count = 19, + shard_count = 18, deps = [ "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/common", From 20a8e9f1f19287cb47da40748d1c22cd69a90c24 Mon Sep 17 00:00:00 2001 From: kennytm Date: Fri, 12 Jan 2024 20:09:33 +0800 Subject: [PATCH 6/6] lightning: update config name in test cases too --- br/tests/lightning_tidb_duplicate_data/error.toml | 2 +- br/tests/lightning_tidb_duplicate_data/ignore.toml | 2 +- br/tests/lightning_tidb_duplicate_data/replace.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/br/tests/lightning_tidb_duplicate_data/error.toml b/br/tests/lightning_tidb_duplicate_data/error.toml index bb6d84aeb38e6..ae201b501ccbb 100644 --- a/br/tests/lightning_tidb_duplicate_data/error.toml +++ b/br/tests/lightning_tidb_duplicate_data/error.toml @@ -1,4 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "error" -tidb-write-throughput-limit = 1 +logical-import-batch-rows = 1 diff --git a/br/tests/lightning_tidb_duplicate_data/ignore.toml b/br/tests/lightning_tidb_duplicate_data/ignore.toml index 31a20e811b871..1cfd526c1acb7 100644 --- a/br/tests/lightning_tidb_duplicate_data/ignore.toml +++ b/br/tests/lightning_tidb_duplicate_data/ignore.toml @@ -1,4 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "ignore" -tidb-write-throughput-limit = 1 +logical-import-batch-rows = 1 diff --git a/br/tests/lightning_tidb_duplicate_data/replace.toml b/br/tests/lightning_tidb_duplicate_data/replace.toml index e0108d48a4e8c..5e3b7b9993d63 100644 --- a/br/tests/lightning_tidb_duplicate_data/replace.toml +++ b/br/tests/lightning_tidb_duplicate_data/replace.toml @@ -1,4 +1,4 @@ [tikv-importer] backend = "tidb" on-duplicate = "replace" -tidb-write-throughput-limit = 1 +logical-import-batch-rows = 1