lightning: allow configuring the desired size and number of rows of each INSERT statement for logical mode #46997

Merged: 8 commits, Feb 18, 2024
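
This change removes the TiDB backend's fixed 1 MiB chunk limit (the old MaxChunkSize, hard-coded to 1048576) and instead splits encoded rows into INSERT statements using two configurable limits read from the lightning config: TikvImporter.LogicalImportBatchSize (total bytes per statement) and TikvImporter.LogicalImportBatchRows (rows per statement). Below is a minimal, self-contained Go sketch of the splitting rule introduced here; the type and helper names are simplified stand-ins for illustration, not the exact code in this diff.

package main

import "fmt"

// row stands in for an encoded tidbRow; its size is assumed to be the byte
// length of its textual value tuple, e.g. "(16384)" counts as 7 bytes.
type row string

func (r row) size() uint64 { return uint64(len(r)) }

// splitIntoChunks mirrors the rule in this PR: start a new chunk whenever
// adding the next row would exceed maxBytes, or the current chunk already
// holds maxRows rows. A single oversized row still forms its own chunk.
func splitIntoChunks(rows []row, maxBytes uint64, maxRows int) [][]row {
	if len(rows) == 0 {
		return nil
	}
	res := make([][]row, 0, 1)
	start := 0
	cum := uint64(0)
	for j, r := range rows {
		if start < j && (cum+r.size() > maxBytes || j-start >= maxRows) {
			res = append(res, rows[start:j])
			start = j
			cum = 0
		}
		cum += r.size()
	}
	return append(res, rows[start:])
}

func main() {
	rows := []row{"(1)", "(2)", "(4)", "(8)", "(16)", "(32)", "(64)"}
	// With a 30-byte / 5-row limit this yields two chunks: the first five rows
	// (row limit reached), then the remaining two.
	for _, chunk := range splitIntoChunks(rows, 30, 5) {
		fmt.Println(chunk)
	}
}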
5 changes: 0 additions & 5 deletions br/pkg/lightning/backend/encode/encode.go
@@ -63,11 +63,6 @@ type SessionOptions struct {

// Rows represents a collection of encoded rows.
type Rows interface {
// SplitIntoChunks splits the rows into multiple consecutive parts, each
// part having total byte size less than `splitSize`. The meaning of "byte
// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
SplitIntoChunks(splitSize int) []Rows

// Clear returns a new collection with empty content. It may share the
// capacity with the current instance. The typical usage is `x = x.Clear()`.
Clear() Rows
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/BUILD.bazel
@@ -56,7 +56,7 @@ go_test(
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 19,
shard_count = 18,
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
31 changes: 0 additions & 31 deletions br/pkg/lightning/backend/kv/sql2kv.go
@@ -321,37 +321,6 @@ func (kvs *Pairs) ClassifyAndAppend(
*indices = indexKVs
}

// SplitIntoChunks splits the key-value pairs into chunks.
func (kvs *Pairs) SplitIntoChunks(splitSize int) []encode.Rows {
if len(kvs.Pairs) == 0 {
return nil
}

res := make([]encode.Rows, 0, 1)
i := 0
cumSize := 0
for j, pair := range kvs.Pairs {
size := len(pair.Key) + len(pair.Val)
if i < j && cumSize+size > splitSize {
res = append(res, &Pairs{Pairs: kvs.Pairs[i:j]})
i = j
cumSize = 0
}
cumSize += size
}

if i == 0 {
res = append(res, kvs)
} else {
res = append(res, &Pairs{
Pairs: kvs.Pairs[i:],
BytesBuf: kvs.BytesBuf,
MemBuf: kvs.MemBuf,
})
}
return res
}

// Clear clears the key-value pairs.
func (kvs *Pairs) Clear() encode.Rows {
if kvs.BytesBuf != nil {
47 changes: 0 additions & 47 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
@@ -570,53 +570,6 @@ func TestShardRowId(t *testing.T) {
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.RowIDAllocType).Base(), int64(32))
}

func TestSplitIntoChunks(t *testing.T) {
pairs := []common.KvPair{
{
Key: []byte{1, 2, 3},
Val: []byte{4, 5, 6},
},
{
Key: []byte{7, 8},
Val: []byte{9, 0},
},
{
Key: []byte{1, 2, 3, 4},
Val: []byte{5, 6, 7, 8},
},
{
Key: []byte{9, 0},
Val: []byte{1, 2},
},
}

splitBy10 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(10)
require.Equal(t, splitBy10, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:2]),
lkv.MakeRowsFromKvPairs(pairs[2:3]),
lkv.MakeRowsFromKvPairs(pairs[3:4]),
})

splitBy12 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(12)
require.Equal(t, splitBy12, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:2]),
lkv.MakeRowsFromKvPairs(pairs[2:4]),
})

splitBy1000 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1000)
require.Equal(t, splitBy1000, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:4]),
})

splitBy1 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1)
require.Equal(t, splitBy1, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:1]),
lkv.MakeRowsFromKvPairs(pairs[1:2]),
lkv.MakeRowsFromKvPairs(pairs[2:3]),
lkv.MakeRowsFromKvPairs(pairs[3:4]),
})
}

func TestClassifyAndAppend(t *testing.T) {
kvs := lkv.MakeRowFromKvPairs([]common.KvPair{
{
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/BUILD.bazel
@@ -37,7 +37,7 @@ go_test(
timeout = "short",
srcs = ["tidb_test.go"],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
":tidb",
"//br/pkg/lightning/backend",
40 changes: 19 additions & 21 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -272,6 +272,11 @@ type tidbBackend struct {
// view should be the same.
onDuplicate string
errorMgr *errormanager.ErrorManager
// maxChunkSize and maxChunkRows are the target size and number of rows of each INSERT SQL
// statement to be sent downstream. Sometimes we want to reduce the txn size to avoid
// affecting the cluster too much.
maxChunkSize uint64
maxChunkRows int
}

var _ backend.Backend = (*tidbBackend)(nil)
@@ -283,9 +288,10 @@ var _ backend.Backend = (*tidbBackend)(nil)
func NewTiDBBackend(
ctx context.Context,
db *sql.DB,
conflict config.Conflict,
cfg *config.Config,
errorMgr *errormanager.ErrorManager,
) backend.Backend {
conflict := cfg.Conflict
var onDuplicate string
switch conflict.Strategy {
case config.ErrorOnDup:
@@ -305,10 +311,12 @@
onDuplicate = config.ErrorOnDup
}
return &tidbBackend{
db: db,
conflictCfg: conflict,
onDuplicate: onDuplicate,
errorMgr: errorMgr,
db: db,
conflictCfg: conflict,
onDuplicate: onDuplicate,
errorMgr: errorMgr,
maxChunkSize: uint64(cfg.TikvImporter.LogicalImportBatchSize),
maxChunkRows: cfg.TikvImporter.LogicalImportBatchRows,
}
}

@@ -329,18 +337,17 @@ func (row tidbRow) ClassifyAndAppend(data *encode.Rows, checksum *verification.K
checksum.Add(&cs)
}

func (rows tidbRows) SplitIntoChunks(splitSizeInt int) []encode.Rows {
func (rows tidbRows) splitIntoChunks(splitSize uint64, splitRows int) []tidbRows {
if len(rows) == 0 {
return nil
}

res := make([]encode.Rows, 0, 1)
res := make([]tidbRows, 0, 1)
i := 0
cumSize := uint64(0)
splitSize := uint64(splitSizeInt)

for j, row := range rows {
if i < j && cumSize+row.Size() > splitSize {
if i < j && (cumSize+row.Size() > splitSize || j-i >= splitRows) {
res = append(res, rows[i:j])
i = j
cumSize = 0
@@ -581,13 +588,6 @@ func (*tidbBackend) RetryImportDelay() time.Duration {
return 0
}

func (*tidbBackend) MaxChunkSize() int {
failpoint.Inject("FailIfImportedSomeRows", func() {
failpoint.Return(1)
})
return 1048576
}

func (*tidbBackend) ShouldPostProcess() bool {
return true
}
@@ -611,7 +611,7 @@ func (*tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error
func (be *tidbBackend) WriteRows(ctx context.Context, tableName string, columnNames []string, rows encode.Rows) error {
var err error
rowLoop:
for _, r := range rows.SplitIntoChunks(be.MaxChunkSize()) {
for _, r := range rows.(tidbRows).splitIntoChunks(be.maxChunkSize, be.maxChunkRows) {
for i := 0; i < writeRowsMaxRetryTimes; i++ {
// Write in the batch mode first.
err = be.WriteBatchRowsToDB(ctx, tableName, columnNames, r)
@@ -648,8 +648,7 @@ type stmtTask struct {
// WriteBatchRowsToDB writes rows in batch mode, which will insert multiple rows like this:
//
// insert into t1 values (111), (222), (333), (444);
func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error {
rows := r.(tidbRows)
func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error {
insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
if insertStmt == nil {
return nil
@@ -682,8 +681,7 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column
// insert into t1 values (444);
//
// See more details in br#1366: https://github.com/pingcap/br/issues/1366
func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error {
rows := r.(tidbRows)
func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error {
insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
if insertStmt == nil {
return nil
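
After the split, the backend writes each chunk with WriteBatchRowsToDB as one multi-row INSERT, and keeps the per-row path (WriteRowsToDB) for when batch writes run into errors. As a rough, self-contained illustration of how one chunk becomes SQL, matching the statements expected in TestLogicalImportBatch below; the builder is a simplified stand-in that assumes each encoded row is already its textual value tuple:

package main

import (
	"fmt"
	"strings"
)

// buildInsert is a simplified stand-in for the statement construction in the
// TiDB backend: one chunk of encoded rows becomes one multi-row INSERT.
func buildInsert(table string, columns []string, chunk []string) string {
	return fmt.Sprintf("INSERT INTO %s(%s) VALUES%s",
		table, strings.Join(columns, ","), strings.Join(chunk, ","))
}

func main() {
	chunk := []string{"(1)", "(2)", "(4)", "(8)", "(16)"}
	fmt.Println(buildInsert("`foo`.`bar`", []string{"`a`"}, chunk))
	// Output: INSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(4),(8),(16)
}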
90 changes: 71 additions & 19 deletions br/pkg/lightning/backend/tidb/tidb_test.go
@@ -73,7 +73,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
cfg.Conflict.Strategy = config.ReplaceOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.MaxRecordRows = 100
backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()))
backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg, errormanager.New(nil, cfg, log.L()))
return &mysqlSuite{
dbHandle: db,
mockDB: mock,
@@ -166,7 +166,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
cfg.Conflict.Strategy = config.IgnoreOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

@@ -193,7 +193,7 @@
// test that conflict.strategy == ignore with a non-zero conflict.max-record-rows uses ErrorOnDup

cfg.Conflict.MaxRecordRows = 10
ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
engine, err = backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

@@ -246,7 +246,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

@@ -536,9 +536,7 @@ func TestWriteRowsErrorNoRetry(t *testing.T) {
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.Conflict.Threshold = 0
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -602,9 +600,7 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
cfg.Conflict.MaxRecordRows = 10
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
cfg.App.MaxError.Type = *atomic.NewInt64(10)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -657,9 +653,7 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
cfg.Conflict.MaxRecordRows = 10
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
cfg.App.MaxError.Type = *atomic.NewInt64(3)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -699,9 +693,7 @@ func TestWriteRowsRecordOneError(t *testing.T) {
cfg.Conflict.Threshold = 0
cfg.Conflict.MaxRecordRows = 0
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -728,9 +720,7 @@ func TestDuplicateThreshold(t *testing.T) {
cfg.Conflict.Strategy = config.IgnoreOnDup
cfg.Conflict.Threshold = 5
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -851,3 +841,65 @@ func TestEncodeRowForRecord(t *testing.T) {
}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, 3, -1, -1, -1})
require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)")
}

// TestLogicalImportBatch tests that each INSERT statement is limited by both
// logical-import-batch-size and logical-import-batch-rows configurations. Here
// we ensure each INSERT statement has up to 5 rows *and* ~30 bytes of values.
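// Assuming each encoded row's Size() is the length of its textual value tuple
// (so "(1)" is 3 bytes and "(16384)" is 7), the 19 rows encoded below split
// into chunks of 5, 5, 4, 4 and 1 rows: the first two chunks hit the 5-row
// limit, the next two would exceed 30 bytes with one more row, and the last
// row is left on its own, matching the five ExpectExec statements in this test.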
func TestLogicalImportBatch(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)

s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(4),(8),(16)\\E").
WillReturnResult(sqlmock.NewResult(5, 5))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(32),(64),(128),(256),(512)\\E").
WillReturnResult(sqlmock.NewResult(5, 5))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1024),(2048),(4096),(8192)\\E").
WillReturnResult(sqlmock.NewResult(4, 4))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(16384),(32768),(65536),(131072)\\E").
WillReturnResult(sqlmock.NewResult(4, 4))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(262144)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
logger := log.L()

cfg := config.NewConfig()
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.TikvImporter.LogicalImportBatchSize = 30
cfg.TikvImporter.LogicalImportBatchRows = 5
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
encBuilder := tidb.NewEncodingBuilder()
encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{
Path: "1.csv",
Table: s.tbl,
Logger: log.L(),
})
require.NoError(t, err)

dataRows := encBuilder.MakeEmptyRows()
dataChecksum := verification.MakeKVChecksum(0, 0, 0)
indexRows := encBuilder.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)
for i := int64(0); i < 19; i++ { // encode rows 1, 2, 4, 8, ..., 262144.
row, err := encoder.Encode(
[]types.Datum{types.NewIntDatum(1 << i)},
i,
[]int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1},
8*i,
)
require.NoError(t, err)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
}

engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"})
require.NoError(t, err)
err = writer.AppendRows(ctx, []string{"a"}, dataRows)
require.NoError(t, err)
}
Loading