Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactors for value separation #4357

Merged
merged 3 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/base/lazy_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ShortAttributeExtractor func(
// AttributeAndLen represents the pair of value length and the short
// attribute.
type AttributeAndLen struct {
ValueLen int32
ValueLen uint32
ShortAttribute ShortAttribute
}

Expand Down Expand Up @@ -184,7 +184,7 @@ type ValueFetcher interface {
// will allocate a new slice for the value. In either case it will set
// callerOwned to true.
Fetch(
ctx context.Context, handle []byte, valLen int32, buf []byte,
ctx context.Context, handle []byte, valLen uint32, buf []byte,
) (val []byte, callerOwned bool, err error)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/base/lazy_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
)

type valueFetcherFunc func(
handle []byte, valLen int32, buf []byte) (val []byte, callerOwned bool, err error)
handle []byte, valLen uint32, buf []byte) (val []byte, callerOwned bool, err error)

func (v valueFetcherFunc) Fetch(
ctx context.Context, handle []byte, valLen int32, buf []byte,
ctx context.Context, handle []byte, valLen uint32, buf []byte,
) (val []byte, callerOwned bool, err error) {
return v(handle, valLen, buf)
}
Expand Down Expand Up @@ -54,10 +54,10 @@ func TestLazyValue(t *testing.T) {
ValueOrHandle: []byte("foo-handle"),
Fetcher: &LazyFetcher{
Fetcher: valueFetcherFunc(
func(handle []byte, valLen int32, buf []byte) ([]byte, bool, error) {
func(handle []byte, valLen uint32, buf []byte) ([]byte, bool, error) {
numCalls++
require.Equal(t, []byte("foo-handle"), handle)
require.Equal(t, int32(3), valLen)
require.Equal(t, uint32(3), valLen)
return fooBytes1, callerOwned, nil
}),
Attribute: AttributeAndLen{ValueLen: 3, ShortAttribute: 7},
Expand Down
9 changes: 8 additions & 1 deletion internal/base/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type InternalValue struct {
lazyValue LazyValue
}

// MakeLazyValue constructs an IntrenalValue from a LazyValue.
// MakeLazyValue constructs an InternalValue from a LazyValue.
func MakeLazyValue(v LazyValue) InternalValue {
return InternalValue{lazyValue: v}
}
Expand Down Expand Up @@ -60,6 +60,13 @@ func (v *InternalValue) InternalLen() int {
return len(v.lazyValue.ValueOrHandle)
}

// ValueOrHandle returns the value or handle that is stored inlined. If the
// value is stored out-of-band, the returned slice contains a binary-encoded
// value handle.
func (v *InternalValue) ValueOrHandle() []byte {
return v.lazyValue.ValueOrHandle
}

// Value returns the KV's underlying value.
func (v *InternalValue) Value(buf []byte) (val []byte, callerOwned bool, err error) {
return v.lazyValue.Value(buf)
Expand Down
25 changes: 15 additions & 10 deletions sstable/block/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ type ValuePrefix byte

const (
// 2 most-significant bits of valuePrefix encodes the value-kind.
valueKindMask ValuePrefix = 0xC0
valueKindIsValueHandle ValuePrefix = 0x80
valueKindIsInPlaceValue ValuePrefix = 0x00
valueKindMask ValuePrefix = 0xC0
valueKindIsValueBlockHandle ValuePrefix = 0x80
valueKindIsInPlaceValue ValuePrefix = 0x00

// 1 bit indicates SET has same key prefix as immediately preceding key that
// is also a SET. If the immediately preceding key in the same block is a
Expand All @@ -32,9 +32,14 @@ const (
userDefinedShortAttributeMask ValuePrefix = 0x07
)

// IsValueHandle returns true if the ValuePrefix is for a valueHandle.
func (vp ValuePrefix) IsValueHandle() bool {
return vp&valueKindMask == valueKindIsValueHandle
// IsInPlaceValue returns true if the ValuePrefix is for an in-place value.
func (vp ValuePrefix) IsInPlaceValue() bool {
return vp&valueKindMask == valueKindIsInPlaceValue
}

// IsValueBlockHandle returns true if the ValuePrefix is for a valblk.Handle.
func (vp ValuePrefix) IsValueBlockHandle() bool {
return vp&valueKindMask == valueKindIsValueBlockHandle
}

// SetHasSamePrefix returns true if the ValuePrefix encodes that the key is a
Expand All @@ -46,14 +51,14 @@ func (vp ValuePrefix) SetHasSamePrefix() bool {
// ShortAttribute returns the user-defined base.ShortAttribute encoded in the
// ValuePrefix.
//
// REQUIRES: IsValueHandle()
// REQUIRES: !IsInPlaceValue()
func (vp ValuePrefix) ShortAttribute() base.ShortAttribute {
return base.ShortAttribute(vp & userDefinedShortAttributeMask)
}

// ValueHandlePrefix returns the ValuePrefix for a valueHandle.
func ValueHandlePrefix(setHasSameKeyPrefix bool, attribute base.ShortAttribute) ValuePrefix {
prefix := valueKindIsValueHandle | ValuePrefix(attribute)
// ValueBlockHandlePrefix returns the ValuePrefix for a valblk.Handle.
func ValueBlockHandlePrefix(setHasSameKeyPrefix bool, attribute base.ShortAttribute) ValuePrefix {
prefix := valueKindIsValueBlockHandle | ValuePrefix(attribute)
if setHasSameKeyPrefix {
prefix = prefix | setHasSameKeyPrefixMask
}
Expand Down
4 changes: 2 additions & 2 deletions sstable/block/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func TestValuePrefix(t *testing.T) {
t.Run(fmt.Sprintf("%+v", tc), func(t *testing.T) {
var prefix ValuePrefix
if tc.isHandle {
prefix = ValueHandlePrefix(tc.setHasSamePrefix, tc.attr)
prefix = ValueBlockHandlePrefix(tc.setHasSamePrefix, tc.attr)
} else {
prefix = InPlaceValuePrefix(tc.setHasSamePrefix)
}
require.Equal(t, tc.isHandle, prefix.IsValueHandle())
require.Equal(t, tc.isHandle, prefix.IsValueBlockHandle())
require.Equal(t, tc.setHasSamePrefix, prefix.SetHasSamePrefix())
if tc.isHandle {
require.Equal(t, tc.attr, prefix.ShortAttribute())
Expand Down
2 changes: 1 addition & 1 deletion sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (w *DataBlockEncoder) Add(
w.isObsolete.Set(w.rows)
}
w.trailers.Set(w.rows, uint64(ikey.Trailer))
if valuePrefix.IsValueHandle() {
if valuePrefix.IsValueBlockHandle() {
w.isValueExternal.Set(w.rows)
// Write the value with the value prefix byte preceding the value.
w.valuePrefixTmp[0] = byte(valuePrefix)
Expand Down
2 changes: 1 addition & 1 deletion sstable/colblk/data_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestDataBlock(t *testing.T) {
valueString := line[j+1:]
vp := block.InPlaceValuePrefix(kcmp.PrefixEqual())
if strings.HasPrefix(valueString, "valueHandle") {
vp = block.ValueHandlePrefix(kcmp.PrefixEqual(), 0)
vp = block.ValueBlockHandlePrefix(kcmp.PrefixEqual(), 0)
}
if kcmp.UserKeyComparison == 0 && prevKey.Kind() != base.InternalKeyKindMerge {
isObsolete = true
Expand Down
2 changes: 1 addition & 1 deletion sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (w *RawColumnWriter) AddWithForceObsolete(
return err
}
}
valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
valuePrefix = block.ValueBlockHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
} else {
valueStoredWithKey = value
if len(value) > 0 {
Expand Down
9 changes: 7 additions & 2 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,16 @@ func formatRowblkDataBlock(
if fmtRecord != nil {
if r.tableFormat < TableFormatPebblev3 || key.Kind() != InternalKeyKindSet {
fmt.Fprintf(w, "\n %s", fmtRecord(key, value))
} else if !block.ValuePrefix(value[0]).IsValueHandle() {
return
}
vp := block.ValuePrefix(value[0])
if vp.IsInPlaceValue() {
fmt.Fprintf(w, "\n %s", fmtRecord(key, value[1:]))
} else {
} else if vp.IsValueBlockHandle() {
vh := valblk.DecodeHandle(value[1:])
fmt.Fprintf(w, "\n %s", fmtRecord(key, []byte(fmt.Sprintf("value handle %+v", vh))))
} else {
panic(fmt.Sprintf("unknown value prefix: %d", value[0]))
}
}
})
Expand Down
16 changes: 8 additions & 8 deletions sstable/rowblk/rowblk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func (i *Iter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -989,7 +989,7 @@ func (i *Iter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -1018,7 +1018,7 @@ func (i *Iter) First() *base.InternalKV {
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func (i *Iter) Last() *base.InternalKV {
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -1123,7 +1123,7 @@ start:
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -1413,7 +1413,7 @@ func (i *Iter) nextPrefixV3(succKey []byte) *base.InternalKV {
}
if i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -1471,7 +1471,7 @@ start:
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down Expand Up @@ -1553,7 +1553,7 @@ start:
if !i.lazyValueHandling.hasValuePrefix ||
i.ikv.K.Kind() != base.InternalKeyKindSet {
i.ikv.V = base.MakeInPlaceValue(i.val)
} else if i.lazyValueHandling.getValue == nil || !block.ValuePrefix(i.val[0]).IsValueHandle() {
} else if i.lazyValueHandling.getValue == nil || block.ValuePrefix(i.val[0]).IsInPlaceValue() {
i.ikv.V = base.MakeInPlaceValue(i.val[1:])
} else {
i.ikv.V = i.lazyValueHandling.getValue.GetInternalValueForPrefixAndValueHandle(i.val)
Expand Down
2 changes: 1 addition & 1 deletion sstable/rowblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ func (w *RawRowWriter) addPoint(key InternalKey, value []byte, forceObsolete boo
return err
}
}
prefix = block.ValueHandlePrefix(setHasSameKeyPrefix, attribute)
prefix = block.ValueBlockHandlePrefix(setHasSameKeyPrefix, attribute)
} else {
valueStoredWithKey = value
valueStoredWithKeyLen = len(value)
Expand Down
32 changes: 5 additions & 27 deletions sstable/valblk/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"math/rand/v2"
"unsafe"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
Expand Down Expand Up @@ -121,7 +120,7 @@ func (r *Reader) GetInternalValueForPrefixAndValueHandle(handle []byte) base.Int
*lazyFetcher = base.LazyFetcher{
Fetcher: r.fetcher,
Attribute: base.AttributeAndLen{
ValueLen: int32(valLen),
ValueLen: valLen,
ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
},
}
Expand Down Expand Up @@ -191,7 +190,7 @@ func newValueBlockFetcher(

// Fetch implements base.ValueFetcher.
func (f *valueBlockFetcher) Fetch(
ctx context.Context, handle []byte, valLen int32, buf []byte,
ctx context.Context, handle []byte, valLen uint32, buf []byte,
) (val []byte, callerOwned bool, err error) {
if !f.closed {
val, err := f.getValueInternal(handle, valLen)
Expand Down Expand Up @@ -251,9 +250,9 @@ func (f *valueBlockFetcher) doValueMangling(v []byte) []byte {
return f.bufToMangle
}

func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen uint32) (val []byte, err error) {
vh := DecodeRemainingHandle(handle)
vh.ValueLen = uint32(valLen)
vh.ValueLen = valLen
if f.vbiBlock == nil {
ch, err := f.bpOpen.ReadValueBlock(f.vbih.Handle, f.stats)
if err != nil {
Expand Down Expand Up @@ -284,26 +283,5 @@ func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val [
}

func (f *valueBlockFetcher) getBlockHandle(blockNum uint32) (block.Handle, error) {
indexEntryLen :=
int(f.vbih.BlockNumByteLength + f.vbih.BlockOffsetByteLength + f.vbih.BlockLengthByteLength)
offsetInIndex := indexEntryLen * int(blockNum)
if len(f.vbiBlock) < offsetInIndex+indexEntryLen {
return block.Handle{}, base.AssertionFailedf(
"index entry out of bounds: offset %d length %d block length %d",
offsetInIndex, indexEntryLen, len(f.vbiBlock))
}
b := f.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
n := int(f.vbih.BlockNumByteLength)
bn := littleEndianGet(b, n)
if uint32(bn) != blockNum {
return block.Handle{},
errors.Errorf("expected block num %d but found %d", blockNum, bn)
}
b = b[n:]
n = int(f.vbih.BlockOffsetByteLength)
blockOffset := littleEndianGet(b, n)
b = b[n:]
n = int(f.vbih.BlockLengthByteLength)
blockLen := littleEndianGet(b, n)
return block.Handle{Offset: blockOffset, Length: blockLen}, nil
return DecodeBlockHandleFromIndex(f.vbiBlock, blockNum, f.vbih)
}
29 changes: 29 additions & 0 deletions sstable/valblk/valblk.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ import (
"unsafe"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/sstable/block"
)

Expand Down Expand Up @@ -367,6 +368,34 @@ func DecodeIndex(data []byte, vbih IndexHandle) ([]block.Handle, error) {
return valueBlocks, nil
}

// DecodeBlockHandleFromIndex decodes the block handle for the given block
// number from the provided index block.
func DecodeBlockHandleFromIndex(
vbiBlock []byte, blockNum uint32, indexHandle IndexHandle,
) (block.Handle, error) {
w := indexHandle.RowWidth()
off := w * int(blockNum)
if len(vbiBlock) < off+w {
return block.Handle{}, base.CorruptionErrorf(
"index entry out of bounds: offset %d length %d block length %d",
off, w, len(vbiBlock))
}
b := vbiBlock[off : off+w]
n := int(indexHandle.BlockNumByteLength)
bn := littleEndianGet(b, n)
if uint32(bn) != blockNum {
return block.Handle{},
errors.Errorf("expected block num %d but found %d", blockNum, bn)
}
b = b[n:]
n = int(indexHandle.BlockOffsetByteLength)
blockOffset := littleEndianGet(b, n)
b = b[n:]
n = int(indexHandle.BlockLengthByteLength)
blockLen := littleEndianGet(b, n)
return block.Handle{Offset: blockOffset, Length: blockLen}, nil
}

// littleEndianPut writes v to b using little endian encoding, under the
// assumption that v can be represented using n bytes.
func littleEndianPut(v uint64, b []byte, n int) {
Expand Down
6 changes: 4 additions & 2 deletions sstable/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,15 @@ func TestWriterWithValueBlocks(t *testing.T) {
if kv.K.Kind() == InternalKeyKindSet {
prefix := block.ValuePrefix(lv.ValueOrHandle[0])
setWithSamePrefix := prefix.SetHasSamePrefix()
if prefix.IsValueHandle() {
if prefix.IsInPlaceValue() {
fmt.Fprintf(&buf, "%s:in-place %s, same-pre %t\n", kv.K, lv.ValueOrHandle[1:], setWithSamePrefix)
} else if prefix.IsValueBlockHandle() {
attribute := prefix.ShortAttribute()
vh := valblk.DecodeHandle(lv.ValueOrHandle[1:])
fmt.Fprintf(&buf, "%s:value-handle len %d block %d offset %d, att %d, same-pre %t\n",
kv.K, vh.ValueLen, vh.BlockNum, vh.OffsetInBlock, attribute, setWithSamePrefix)
} else {
fmt.Fprintf(&buf, "%s:in-place %s, same-pre %t\n", kv.K, lv.ValueOrHandle[1:], setWithSamePrefix)
panic(fmt.Sprintf("unknown value prefix: %d", lv.ValueOrHandle[0]))
}
} else {
fmt.Fprintf(&buf, "%s:%s\n", kv.K, lv.ValueOrHandle)
Expand Down