br/lightning: change KvPair's row ID type from int64 to []bytes (
tangenta authored Mar 1, 2023
1 parent 24c24a8 commit 17c1376
Showing 12 changed files with 99 additions and 65 deletions.
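Note on the overall change: common.KvPair.RowID is now an already-encoded byte slice instead of an int64, so the same field can carry either an encoded integer row ID or a common handle (see TestKeyAdapterEncoding below). The following is only a rough sketch of that shape under stated assumptions: kvPair and encodeIntRowID are illustrative stand-ins, and the real common.EncodeIntRowID may use a different (variable-length) encoding.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// kvPair mirrors the shape of common.KvPair after this commit: the row ID is
// stored as opaque, already-encoded bytes rather than an int64.
type kvPair struct {
    Key   []byte
    Val   []byte
    RowID []byte // was int64 before this commit
}

// encodeIntRowID is a hypothetical stand-in for common.EncodeIntRowID: it
// flips the sign bit and writes the value big-endian, which keeps byte-wise
// comparison consistent with integer comparison.
func encodeIntRowID(rowID int64) []byte {
    var b [8]byte
    binary.BigEndian.PutUint64(b[:], uint64(rowID)^(1<<63))
    return b[:]
}

func main() {
    p := kvPair{Key: []byte("k"), Val: []byte("v"), RowID: encodeIntRowID(2)}
    fmt.Println(bytes.Compare(encodeIntRowID(1), p.RowID) < 0) // true: ordering preserved
}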
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
@@ -433,7 +433,8 @@ func (kvcodec *tableKVEncoder) Encode(
}
kvPairs := kvcodec.se.takeKvPairs()
for i := 0; i < len(kvPairs.pairs); i++ {
- kvPairs.pairs[i].RowID = rowID
+ var encoded [9]byte // The max length of encoded int64 is 9.
+ kvPairs.pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
}
kvcodec.recordCache = record[:0]
return kvPairs, nil
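The loop above encodes each int64 row ID into a small per-pair array and stores the resulting slice; per the comment, the encoded form of an int64 never exceeds 9 bytes. Below is a minimal sketch of that append-into-caller-buffer pattern, with encodeIntRowIDToBuf as a hypothetical, fixed-width stand-in for common.EncodeIntRowIDToBuf.

package main

import (
    "encoding/binary"
    "fmt"
)

// encodeIntRowIDToBuf appends an order-preserving encoding of rowID to dst and
// returns the extended slice, so the caller controls the backing storage.
// (Hypothetical stand-in; the real helper may emit a variable-length encoding
// of up to 9 bytes, hence the [9]byte buffer in the loop above.)
func encodeIntRowIDToBuf(dst []byte, rowID int64) []byte {
    var b [8]byte
    binary.BigEndian.PutUint64(b[:], uint64(rowID)^(1<<63))
    return append(dst, b[:]...)
}

func main() {
    var encoded [9]byte
    rowID := encodeIntRowIDToBuf(encoded[:0], 42)
    fmt.Printf("row ID 42 encodes to % x\n", rowID)
}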
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
@@ -113,7 +113,7 @@ func TestEncode(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0x2},
- RowID: 2,
+ RowID: common.EncodeIntRowID(2),
},
}))

@@ -140,7 +140,7 @@ func TestEncode(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
- RowID: 1,
+ RowID: common.EncodeIntRowID(1),
},
}))
}
@@ -274,7 +274,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
0x1, 0x0, // not null offsets = [1]
0x7f, // column version = 127 (10000000 clamped to TINYINT)
},
- RowID: 1,
+ RowID: common.EncodeIntRowID(1),
},
}))
}
@@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
- RowID: 70,
+ RowID: common.EncodeIntRowID(70),
},
}))
}
@@ -346,12 +346,12 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
- RowID: 70,
+ RowID: common.EncodeIntRowID(70),
},
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
- RowID: 70,
+ RowID: common.EncodeIntRowID(70),
},
}), pairsExpect)

@@ -459,7 +459,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
- RowID: 70,
+ RowID: common.EncodeIntRowID(70),
},
}))
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(70))
@@ -470,7 +470,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
- RowID: 71,
+ RowID: common.EncodeIntRowID(71),
},
}))
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(71))
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -699,8 +699,8 @@ func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, du
}

// Delete the key range in duplicate DB since the duplicates have been collected.
- rawStartKey := keyAdapter.Encode(nil, task.StartKey, math.MinInt64)
- rawEndKey := keyAdapter.Encode(nil, task.EndKey, math.MinInt64)
+ rawStartKey := keyAdapter.Encode(nil, task.StartKey, MinRowID)
+ rawEndKey := keyAdapter.Encode(nil, task.EndKey, MinRowID)
err = dupDB.DeleteRange(rawStartKey, rawEndKey, nil)
return errors.Trace(err)
})
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
@@ -1070,7 +1070,7 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
keyAdapter := w.engine.keyAdapter
totalKeySize := 0
for i := 0; i < len(kvs); i++ {
- keySize := keyAdapter.EncodedLen(kvs[i].Key)
+ keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID)
w.batchSize += int64(keySize + len(kvs[i].Val))
totalKeySize += keySize
}
@@ -1107,7 +1107,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
}
lastKey = pair.Key
w.batchSize += int64(len(pair.Key) + len(pair.Val))
- buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key))
+ buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)
if cnt < l {
13 changes: 6 additions & 7 deletions br/pkg/lightning/backend/local/iterator.go
@@ -17,7 +17,6 @@ package local
import (
"bytes"
"context"
- "math"

"github.com/cockroachdb/pebble"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
@@ -91,7 +90,7 @@ type dupDetectOpt struct {
}

func (d *dupDetectIter) Seek(key []byte) bool {
- rawKey := d.keyAdapter.Encode(nil, key, 0)
+ rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
@@ -209,10 +208,10 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
- newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
+ newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
}
if len(opts.UpperBound) > 0 {
- newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
+ newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
}
return &dupDetectIter{
ctx: ctx,
@@ -232,7 +231,7 @@ type dupDBIter struct {
}

func (d *dupDBIter) Seek(key []byte) bool {
- rawKey := d.keyAdapter.Encode(nil, key, 0)
+ rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
@@ -296,10 +295,10 @@ var _ Iter = &dupDBIter{}
func newDupDBIter(dupDB *pebble.DB, keyAdapter KeyAdapter, opts *pebble.IterOptions) *dupDBIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
- newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
+ newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
}
if len(opts.UpperBound) > 0 {
- newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
+ newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
}
return &dupDBIter{
iter: dupDB.NewIter(newOpts),
34 changes: 24 additions & 10 deletions br/pkg/lightning/backend/local/iterator_test.go
@@ -37,7 +37,7 @@ func TestDupDetectIterator(t *testing.T) {
pairs = append(pairs, common.KvPair{
Key: randBytes(32),
Val: randBytes(128),
- RowID: prevRowMax,
+ RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
}
@@ -47,13 +47,13 @@
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
- RowID: prevRowMax,
+ RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
- RowID: prevRowMax,
+ RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
}
@@ -63,19 +63,19 @@
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
- RowID: prevRowMax,
+ RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
- RowID: prevRowMax,
+ RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
pairs = append(pairs, common.KvPair{
Key: key,
Val: randBytes(128),
- RowID: prevRowMax,
+ RowID: common.EncodeIntRowID(prevRowMax),
})
prevRowMax++
}
@@ -184,22 +184,22 @@ func TestDupDetectIterSeek(t *testing.T) {
{
Key: []byte{1, 2, 3, 0},
Val: randBytes(128),
- RowID: 1,
+ RowID: common.EncodeIntRowID(1),
},
{
Key: []byte{1, 2, 3, 1},
Val: randBytes(128),
- RowID: 2,
+ RowID: common.EncodeIntRowID(2),
},
{
Key: []byte{1, 2, 3, 1},
Val: randBytes(128),
- RowID: 3,
+ RowID: common.EncodeIntRowID(3),
},
{
Key: []byte{1, 2, 3, 2},
Val: randBytes(128),
- RowID: 4,
+ RowID: common.EncodeIntRowID(4),
},
}

@@ -227,3 +227,17 @@ func TestDupDetectIterSeek(t *testing.T) {
require.NoError(t, db.Close())
require.NoError(t, dupDB.Close())
}

+ func TestKeyAdapterEncoding(t *testing.T) {
+ keyAdapter := dupDetectKeyAdapter{}
+ srcKey := []byte{1, 2, 3}
+ v := keyAdapter.Encode(nil, srcKey, common.EncodeIntRowID(1))
+ resKey, err := keyAdapter.Decode(nil, v)
+ require.NoError(t, err)
+ require.EqualValues(t, srcKey, resKey)
+
+ v = keyAdapter.Encode(nil, srcKey, []byte("mock_common_handle"))
+ resKey, err = keyAdapter.Decode(nil, v)
+ require.NoError(t, err)
+ require.EqualValues(t, srcKey, resKey)
+ }
39 changes: 25 additions & 14 deletions br/pkg/lightning/backend/local/key_adapter.go
@@ -15,23 +15,24 @@
package local

import (
- "encoding/binary"
+ "math"

"github.com/pingcap/errors"
+ "github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/util/codec"
)

// KeyAdapter is used to encode and decode keys.
type KeyAdapter interface {
// Encode encodes the key with its corresponding rowID. It appends the encoded key to dst and returns the
// resulting slice. The encoded key is guaranteed to be in ascending order for comparison.
- Encode(dst []byte, key []byte, rowID int64) []byte
+ Encode(dst []byte, key []byte, rowID []byte) []byte

// Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice.
Decode(dst []byte, data []byte) ([]byte, error)

// EncodedLen returns the encoded key length.
- EncodedLen(key []byte) int
+ EncodedLen(key []byte, rowID []byte) int
}

func reallocBytes(b []byte, n int) []byte {
@@ -46,36 +47,41 @@

type noopKeyAdapter struct{}

- func (noopKeyAdapter) Encode(dst []byte, key []byte, _ int64) []byte {
+ func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
return append(dst, key...)
}

func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
return append(dst, data...), nil
}

- func (noopKeyAdapter) EncodedLen(key []byte) int {
+ func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
return len(key)
}

var _ KeyAdapter = noopKeyAdapter{}

type dupDetectKeyAdapter struct{}

- func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID int64) []byte {
+ func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte {
dst = codec.EncodeBytes(dst, key)
- dst = reallocBytes(dst, 8)
- n := len(dst)
- dst = dst[:n+8]
- binary.BigEndian.PutUint64(dst[n:n+8], codec.EncodeIntToCmpUint(rowID))
+ dst = reallocBytes(dst, len(rowID)+2)
+ dst = append(dst, rowID...)
+ rowIDLen := uint16(len(rowID))
+ dst = append(dst, byte(rowIDLen>>8), byte(rowIDLen))
return dst
}

func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
- if len(data) < 8 {
+ if len(data) < 2 {
return nil, errors.New("insufficient bytes to decode value")
}
- _, key, err := codec.DecodeBytes(data[:len(data)-8], dst[len(dst):cap(dst)])
+ rowIDLen := uint16(data[len(data)-2])<<8 | uint16(data[len(data)-1])
+ tailLen := int(rowIDLen + 2)
+ if len(data) < tailLen {
+ return nil, errors.New("insufficient bytes to decode value")
+ }
+ _, key, err := codec.DecodeBytes(data[:len(data)-tailLen], dst[len(dst):cap(dst)])
if err != nil {
return nil, err
}
@@ -90,8 +96,13 @@ func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
return append(dst, key...), nil
}

- func (dupDetectKeyAdapter) EncodedLen(key []byte) int {
- return codec.EncodedBytesLength(len(key)) + 8
+ func (dupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {
+ return codec.EncodedBytesLength(len(key)) + len(rowID) + 2
}

var _ KeyAdapter = dupDetectKeyAdapter{}

+ var (
+ MinRowID = common.EncodeIntRowID(math.MinInt64)
+ ZeroRowID = common.EncodeIntRowID(0)
+ )
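To make the new dupDetectKeyAdapter layout concrete: Encode now emits codec.EncodeBytes(key), then the raw row-ID bytes, then a 2-byte big-endian length of the row ID, and Decode reads that length from the tail to strip the suffix. The sketch below illustrates only the suffix handling; encodeWithRowID and decodeKey are illustrative names, and the key part is simplified to a 4-byte length prefix instead of TiDB's memcomparable codec.EncodeBytes.

package main

import (
    "encoding/binary"
    "errors"
    "fmt"
)

// encodeWithRowID sketches the layout encodedKey || rowID || uint16(len(rowID)).
// The key encoding here is a simplified length prefix; the real adapter uses
// codec.EncodeBytes so that encoded keys still sort like the original keys.
func encodeWithRowID(key, rowID []byte) []byte {
    dst := make([]byte, 0, 4+len(key)+len(rowID)+2)
    var l [4]byte
    binary.BigEndian.PutUint32(l[:], uint32(len(key)))
    dst = append(dst, l[:]...)
    dst = append(dst, key...)
    dst = append(dst, rowID...)
    rowIDLen := uint16(len(rowID))
    return append(dst, byte(rowIDLen>>8), byte(rowIDLen))
}

// decodeKey mirrors the new Decode: read the row-ID length from the last two
// bytes, drop rowIDLen+2 bytes of suffix, then recover the key.
func decodeKey(data []byte) ([]byte, error) {
    if len(data) < 2 {
        return nil, errors.New("insufficient bytes to decode value")
    }
    rowIDLen := int(data[len(data)-2])<<8 | int(data[len(data)-1])
    tailLen := rowIDLen + 2
    if len(data) < tailLen+4 {
        return nil, errors.New("insufficient bytes to decode value")
    }
    body := data[:len(data)-tailLen]
    keyLen := int(binary.BigEndian.Uint32(body[:4]))
    if len(body) != 4+keyLen {
        return nil, errors.New("corrupted encoded key")
    }
    return body[4:], nil
}

func main() {
    enc := encodeWithRowID([]byte{1, 2, 3}, []byte("mock_common_handle"))
    key, err := decodeKey(enc)
    fmt.Println(key, err) // [1 2 3] <nil>
}

Because the suffix length travels with each entry, the adapter can attach either an encoded integer row ID or an arbitrary-length common handle to the same key, which is what TestKeyAdapterEncoding above exercises.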