Skip to content

Commit

Permalink
lightning: change MinRowID because ADD UNIQUE INDEX may be smaller (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Mar 21, 2024
1 parent a04fed9 commit b91a1b3
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 24 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/topsql/stmtstats",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -201,7 +202,7 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64
kvPairs := e.SessionCtx.TakeKvPairs()
for i := 0; i < len(kvPairs.Pairs); i++ {
var encoded [9]byte // The max length of encoded int64 is 9.
kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
kvPairs.Pairs[i].RowID = codec.EncodeComparableVarint(encoded[:0], rowID)
}
e.recordCache = record[:0]
return kvPairs, nil
Expand Down
9 changes: 0 additions & 9 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,6 @@ type dupDBIter struct {
err error
}

// Seek encodes key together with common.ZeroRowID through the key adapter and
// asks the underlying iterator to SeekGE to that encoded key. On success the
// key found by the iterator is decoded into d.curKey (reusing its storage).
//
// It returns false when the iterator already carries an error, when SeekGE
// reports no entry, or when decoding fails; a decode failure is stored in
// d.err.
func (d *dupDBIter) Seek(key []byte) bool {
	target := d.keyAdapter.Encode(nil, key, common.ZeroRowID)
	if d.err != nil {
		return false
	}
	if !d.iter.SeekGE(target) {
		return false
	}
	decoded, err := d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
	d.curKey, d.err = decoded, err
	return err == nil
}

func (d *dupDBIter) Error() error {
if d.err != nil {
return d.err
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 28,
shard_count = 29,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand All @@ -127,6 +127,8 @@ go_test(
"//pkg/store/driver/error",
"//pkg/store/mockstore",
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/dbutil",
"//pkg/util/mock",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
Expand Down
7 changes: 2 additions & 5 deletions br/pkg/lightning/common/key_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package common

import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/codec"
)
Expand Down Expand Up @@ -114,8 +112,7 @@ func (DupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {

var _ KeyAdapter = DupDetectKeyAdapter{}

// static vars for rowID
var (
MinRowID = EncodeIntRowID(math.MinInt64)
ZeroRowID = EncodeIntRowID(0)
// MinRowID is the minimum rowID value after DupDetectKeyAdapter.Encode().
MinRowID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0}
)
55 changes: 53 additions & 2 deletions br/pkg/lightning/common/key_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"math"
"sort"
"testing"
"time"
"unsafe"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
)

Expand All @@ -34,8 +38,9 @@ func randBytes(n int) []byte {
func TestNoopKeyAdapter(t *testing.T) {
keyAdapter := NoopKeyAdapter{}
key := randBytes(32)
require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
rowID := randBytes(8)
require.Len(t, key, keyAdapter.EncodedLen(key, rowID))
encodedKey := keyAdapter.Encode(nil, key, rowID)
require.Equal(t, key, encodedKey)

decodedKey, err := keyAdapter.Decode(nil, encodedKey)
Expand Down Expand Up @@ -159,3 +164,49 @@ func TestDecodeKeyDstIsInsufficient(t *testing.T) {
require.Equal(t, key, buf2[4:])
}
}

// TestMinRowID checks that MinRowID is a lower bound for row IDs: for a fixed
// key, the DupDetectKeyAdapter encoding built with MinRowID must not sort after
// the encoding built with any of the row IDs collected below.
func TestMinRowID(t *testing.T) {
	keyAdapter := DupDetectKeyAdapter{}
	key := []byte("key")
	// Pass nil as dst, as TestNoopKeyAdapter does, so the encoded results are
	// not appended into key's own backing array; otherwise a later Encode call
	// could overwrite shouldBeMin when key happens to have spare capacity.
	shouldBeMin := keyAdapter.Encode(nil, key, MinRowID)

	rowIDs := make([][]byte, 0, 20)

	// DDL: row IDs are encoded handles, both int handles and common handles.

	for _, h := range []int64{math.MinInt64, -1, 0, math.MaxInt64} {
		rowIDs = append(rowIDs, kv.IntHandle(h).Encoded())
	}
	handleData := []types.Datum{
		types.NewIntDatum(math.MinInt64),
		types.NewIntDatum(-1),
		types.NewIntDatum(0),
		types.NewIntDatum(math.MaxInt64),
		types.NewBytesDatum(make([]byte, 1)),
		types.NewBytesDatum(make([]byte, 7)),
		types.NewBytesDatum(make([]byte, 8)),
		types.NewBytesDatum(make([]byte, 9)),
		types.NewBytesDatum(make([]byte, 100)),
	}
	for _, d := range handleData {
		encodedKey, err := codec.EncodeKey(time.Local, nil, d)
		require.NoError(t, err)
		ch, err := kv.NewCommonHandle(encodedKey)
		require.NoError(t, err)
		rowIDs = append(rowIDs, ch.Encoded())
	}

	// lightning, IMPORT INTO, ...: row IDs are numbers encoded with
	// codec.EncodeComparableVarint.

	numRowIDs := []int64{math.MinInt64, -1, 0, math.MaxInt64}
	for _, id := range numRowIDs {
		rowIDs = append(rowIDs, codec.EncodeComparableVarint(nil, id))
	}

	for _, id := range rowIDs {
		bs := keyAdapter.Encode(nil, key, id)
		require.True(t, bytes.Compare(bs, shouldBeMin) >= 0,
			"row ID %x encodes to %x, which sorts before the MinRowID encoding %x",
			id, bs, shouldBeMin)
	}
}
13 changes: 7 additions & 6 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,16 @@ type KvPair struct {
Key []byte
// Val is the value of the KV pair
Val []byte
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It's
// often set to the file offset of the KvPair in the source file or the record
// handle.
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It has
// two sources:
//
// When the KvPair is generated from ADD INDEX, the RowID is the encoded handle.
//
// Otherwise, the RowID is related to the row number in the source files, and
// the number is encoded with `codec.EncodeComparableVarint`.
RowID []byte
}

// EncodeIntRowIDToBuf encodes an int64 row id to a buffer.
var EncodeIntRowIDToBuf = codec.EncodeComparableVarint

// EncodeIntRowID encodes an int64 row id.
func EncodeIntRowID(rowID int64) []byte {
return codec.EncodeComparableVarint(nil, rowID)
Expand Down
12 changes: 12 additions & 0 deletions tests/realtikvtest/addindextest/add_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,15 @@ func TestIssue51162(t *testing.T) {
tk.MustExec("alter table tl add index idx_16(`col_48`,(cast(`col_45` as signed array)),`col_46`(5));")
tk.MustExec("admin check table tl")
}

func TestAddUKWithSmallIntHandles(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists small;")
tk.MustExec("create database small;")
tk.MustExec("use small;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=1;`)
tk.MustExec("create table t (a bigint, b int, primary key (a) clustered)")
tk.MustExec("insert into t values (-9223372036854775808, 1),(-9223372036854775807, 1)")
tk.MustContainErrMsg("alter table t add unique index uk(b)", "Duplicate entry '1' for key 't.uk'")
}

0 comments on commit b91a1b3

Please sign in to comment.