Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support auto_random column in composite primary key #41463

Merged
merged 4 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 24 additions & 28 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ type genCol struct {
type autoIDConverter func(int64) int64

type tableKVEncoder struct {
tbl table.Table
se *session
recordCache []types.Datum
genCols []genCol
tbl table.Table
autoRandomColID int64
se *session
recordCache []types.Datum
genCols []genCol
// convert auto id for shard rowid or auto random id base on row id generated by lightning
autoIDFn autoIDConverter
metrics *metric.Metrics
Expand All @@ -84,17 +85,16 @@ func NewTableKVEncoder(
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)

var autoRandomColID int64
autoIDFn := func(id int64) int64 { return id }
if meta.PKIsHandle && meta.ContainsAutoRandomBits() {
for _, col := range cols {
if mysql.HasPriKeyFlag(col.GetFlag()) {
shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
shard := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63()
autoIDFn = func(id int64) int64 {
return shardFmt.Compose(shard, id)
}
break
}
if meta.ContainsAutoRandomBits() {
col := common.GetAutoRandomColumn(meta)
autoRandomColID = col.ID

shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
shard := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63()
autoIDFn = func(id int64) int64 {
return shardFmt.Compose(shard, id)
}
} else if meta.ShardRowIDBits > 0 {
rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec
Expand All @@ -114,11 +114,12 @@ func NewTableKVEncoder(
}

return &tableKVEncoder{
tbl: tbl,
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
metrics: metrics,
tbl: tbl,
autoRandomColID: autoRandomColID,
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -380,7 +381,7 @@ func (kvcodec *tableKVEncoder) Encode(

record = append(record, value)

if isTableAutoRandom(meta) && isPKCol(col.ToInfo()) {
if kvcodec.isAutoRandomCol(col.ToInfo()) {
shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil {
Expand Down Expand Up @@ -438,18 +439,14 @@ func (kvcodec *tableKVEncoder) Encode(
return kvPairs, nil
}

func isTableAutoRandom(tblMeta *model.TableInfo) bool {
return tblMeta.PKIsHandle && tblMeta.ContainsAutoRandomBits()
func (kvcodec *tableKVEncoder) isAutoRandomCol(col *model.ColumnInfo) bool {
return kvcodec.tbl.Meta().ContainsAutoRandomBits() && col.ID == kvcodec.autoRandomColID
}

func isAutoIncCol(colInfo *model.ColumnInfo) bool {
return mysql.HasAutoIncrementFlag(colInfo.GetFlag())
}

func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

// GetEncoderIncrementalID return Auto increment id.
func GetEncoderIncrementalID(encoder Encoder, id int64) int64 {
return encoder.(*tableKVEncoder).autoIDFn(id)
Expand All @@ -471,7 +468,6 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
err error
)

tblMeta := kvcodec.tbl.Meta()
cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
Expand All @@ -494,7 +490,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
case isAutoIncCol(col.ToInfo()):
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
case kvcodec.isAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.GetFlag()) {
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//errno",
"//parser/model",
"//store/driver/error",
"//table/tables",
"//util",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down Expand Up @@ -99,8 +100,10 @@ go_test(
"//br/pkg/errors",
"//br/pkg/lightning/log",
"//errno",
"//parser",
"//store/driver/error",
"//testkit/testsetup",
"//util/dbutil",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -416,3 +417,22 @@ func StringSliceEqual(a, b []string) bool {
}
return true
}

// GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it.
// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
if !tblInfo.ContainsAutoRandomBits() {
return nil
}
if tblInfo.PKIsHandle {
return tblInfo.GetPkColInfo()
} else if tblInfo.IsCommonHandle {
pk := tables.FindPrimaryIndex(tblInfo)
if pk == nil {
return nil
}
offset := pk.Columns[0].Offset
return tblInfo.Columns[offset]
}
return nil
}
27 changes: 27 additions & 0 deletions br/pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/util/dbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -179,3 +181,28 @@ func TestInterpolateMySQLString(t *testing.T) {
assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23"))
assert.Equal(t, "'1''2''''3'", common.InterpolateMySQLString("1'2''3"))
}

func TestGetAutoRandomColumn(t *testing.T) {
tests := []struct {
ddl string
colName string
}{
{"create table t(c int)", ""},
{"create table t(c int auto_increment)", ""},
{"create table t(c bigint auto_random primary key)", "c"},
{"create table t(a int, c bigint auto_random primary key)", "c"},
{"create table t(c bigint auto_random, a int, primary key(c,a))", "c"},
{"create table t(a int, c bigint auto_random, primary key(c,a))", "c"},
}
p := parser.New()
for _, tt := range tests {
tableInfo, err := dbutil.GetTableInfoBySQL(tt.ddl, p)
require.NoError(t, err)
col := common.GetAutoRandomColumn(tableInfo)
if tt.colName == "" {
require.Nil(t, col, tt.ddl)
} else {
require.Equal(t, tt.colName, col.Name.L, tt.ddl)
}
}
}
3 changes: 2 additions & 1 deletion br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,9 +1001,10 @@ func (ci *schemaCheckItem) SchemaIsValid(ctx context.Context, tableInfo *mydump.

core := info.Core
defaultCols := make(map[string]struct{})
autoRandomCol := common.GetAutoRandomColumn(core)
for _, col := range core.Columns {
// we can extend column the same with columns with default values
if _, isExtendCol := fullExtendColsSet[col.Name.O]; isExtendCol || hasDefault(col) || (info.Core.ContainsAutoRandomBits() && mysql.HasPriKeyFlag(col.GetFlag())) {
if _, isExtendCol := fullExtendColsSet[col.Name.O]; isExtendCol || hasDefault(col) || (autoRandomCol != nil && autoRandomCol.ID == col.ID) {
// this column has default value or it's auto random id, so we can ignore it
defaultCols[col.Name.L] = struct{}{}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1804,7 +1804,7 @@ func (tr *TableRestore) restoreTable(
web.BroadcastTableCheckpoint(tr.tableName, cp)

// rebase the allocator so it exceeds the number of rows.
if tr.tableInfo.Core.PKIsHandle && tr.tableInfo.Core.ContainsAutoRandomBits() {
if tr.tableInfo.Core.ContainsAutoRandomBits() {
cp.AllocBase = mathutil.Max(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
return false, err
Expand Down Expand Up @@ -2518,7 +2518,7 @@ func saveCheckpoint(rc *Controller, t *TableRestore, engineID int32, chunk *chec
// or integer primary key), which can only be obtained by reading all data.

var base int64
if t.tableInfo.Core.PKIsHandle && t.tableInfo.Core.ContainsAutoRandomBits() {
if t.tableInfo.Core.ContainsAutoRandomBits() {
base = t.alloc.Get(autoid.AutoRandomType).Base() + 1
} else {
base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,8 @@ func (tr *TableRestore) postProcess(
rc.alterTableLock.Lock()
tblInfo := tr.tableInfo.Core
var err error
if tblInfo.PKIsHandle && tblInfo.ContainsAutoRandomBits() {
ft := &tblInfo.GetPkColInfo().FieldType
if tblInfo.ContainsAutoRandomBits() {
ft := &common.GetAutoRandomColumn(tblInfo).FieldType
shardFmt := autoid.NewShardIDFormat(ft, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits)
maxCap := shardFmt.IncrementalBitsCapacity()
err = AlterAutoRandom(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"a","b"
1,11
2,22
27 changes: 27 additions & 0 deletions br/tests/lightning_auto_random_default/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,30 @@ for backend in tidb local; do
run_sql "SELECT max(id & b'000001111111111111111111111111111111111111111111111111111111111') >= $NEXT_AUTO_RAND_VAL as ge FROM auto_random.t"
check_contains 'ge: 1'
done

function run_for_auro_random_data2() {
create_table=$1
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
run_sql 'DROP DATABASE IF EXISTS auto_random;'
run_sql 'CREATE DATABASE IF NOT EXISTS auto_random;'
run_sql "$create_table"
run_lightning --backend $backend -d "tests/$TEST_NAME/data2"
run_sql 'select count(*) as count from auto_random.t where c > 0'
check_contains "count: 2"
run_sql 'select count(*) as count from auto_random.t where a=1 and b=11'
check_contains "count: 1"
run_sql 'select count(*) as count from auto_random.t where a=2 and b=22'
check_contains "count: 1"
}

for backend in tidb local; do
if [ "$backend" = 'local' ]; then
check_cluster_version 4 0 0 'local backend' || continue
fi

run_for_auro_random_data2 'create table auto_random.t(c bigint auto_random primary key, a int, b int)'
run_for_auro_random_data2 'create table auto_random.t(a int, b int, c bigint auto_random primary key)'
# composite key and auto_random is the first column
run_for_auro_random_data2 'create table auto_random.t(c bigint auto_random, a int, b int, primary key(c, a))'
# composite key and auto_random is not the first column
run_for_auro_random_data2 'create table auto_random.t(a int, b int, c bigint auto_random, primary key(c, a))'
done