diff --git a/cockroachkvs/blockproperties.go b/cockroachkvs/blockproperties.go new file mode 100644 index 0000000000..22ebd0b9b2 --- /dev/null +++ b/cockroachkvs/blockproperties.go @@ -0,0 +1,138 @@ +// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cockroachkvs + +import ( + "encoding/binary" + "math" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/sstable" +) + +const mvccWallTimeIntervalCollector = "MVCCTimeInterval" + +// BlockPropertyCollectors is a list of constructors for block-property +// collectors used by CockroachDB. +var BlockPropertyCollectors = []func() pebble.BlockPropertyCollector{ + func() pebble.BlockPropertyCollector { + return sstable.NewBlockIntervalCollector( + mvccWallTimeIntervalCollector, + pebbleIntervalMapper{}, + MVCCBlockIntervalSuffixReplacer{}, + ) + }, +} + +// NewMVCCTimeIntervalFilter constructs a new block-property filter that skips +// keys that encode timestamps with wall times that do not fall within the +// interval [minWallTime,maxWallTime]. +func NewMVCCTimeIntervalFilter(minWallTime, maxWallTime uint64) sstable.BlockPropertyFilter { + return sstable.NewBlockIntervalFilter(mvccWallTimeIntervalCollector, + uint64(minWallTime), + uint64(maxWallTime)+1, + MVCCBlockIntervalSuffixReplacer{}, + ) +} + +// MVCCWallTimeIntervalRangeKeyMask implements pebble.BlockPropertyFilterMask +// for filtering blocks using the MVCCTimeInterval block property during range +// key masking. +type MVCCWallTimeIntervalRangeKeyMask struct { + sstable.BlockIntervalFilter +} + +// SetSuffix implements the pebble.BlockPropertyFilterMask interface. +func (m *MVCCWallTimeIntervalRangeKeyMask) SetSuffix(suffix []byte) error { + if len(suffix) == 0 { + // This is currently impossible, because the only range key Cockroach + // writes today is the MVCC Delete Range that's always suffixed. + return nil + } + wall, _, err := DecodeMVCCTimestampSuffix(suffix) + if err != nil { + return err + } + m.BlockIntervalFilter.SetInterval(wall, math.MaxUint64) + return nil +} + +var _ sstable.BlockIntervalSuffixReplacer = MVCCBlockIntervalSuffixReplacer{} + +// MVCCBlockIntervalSuffixReplacer implements the +// sstable.BlockIntervalSuffixReplacer interface for MVCC timestamp intervals. +type MVCCBlockIntervalSuffixReplacer struct{} + +func (MVCCBlockIntervalSuffixReplacer) ApplySuffixReplacement( + interval sstable.BlockInterval, newSuffix []byte, +) (sstable.BlockInterval, error) { + synthDecodedWalltime, _, err := DecodeMVCCTimestampSuffix(newSuffix) + if err != nil { + return sstable.BlockInterval{}, errors.AssertionFailedf("could not decode synthetic suffix") + } + // The returned bound includes the synthetic suffix, regardless of its logical + // component. + return sstable.BlockInterval{Lower: synthDecodedWalltime, Upper: synthDecodedWalltime + 1}, nil +} + +type pebbleIntervalMapper struct{} + +var _ sstable.IntervalMapper = pebbleIntervalMapper{} + +// MapPointKey is part of the sstable.IntervalMapper interface. +func (pebbleIntervalMapper) MapPointKey( + key pebble.InternalKey, value []byte, +) (sstable.BlockInterval, error) { + return mapSuffixToInterval(key.UserKey) +} + +// MapRangeKey is part of the sstable.IntervalMapper interface. +func (pebbleIntervalMapper) MapRangeKeys(span sstable.Span) (sstable.BlockInterval, error) { + var res sstable.BlockInterval + for _, k := range span.Keys { + i, err := mapSuffixToInterval(k.Suffix) + if err != nil { + return sstable.BlockInterval{}, err + } + res.UnionWith(i) + } + return res, nil +} + +// mapSuffixToInterval maps the suffix of a key to a timestamp interval. +// The buffer can be an entire key or just the suffix. +func mapSuffixToInterval(b []byte) (sstable.BlockInterval, error) { + if len(b) == 0 { + return sstable.BlockInterval{}, nil + } + // Last byte is the version length + 1 when there is a version, + // else it is 0. + versionLen := int(b[len(b)-1]) + if versionLen == 0 { + // This is not an MVCC key that we can collect. + return sstable.BlockInterval{}, nil + } + // prefixPartEnd points to the sentinel byte, unless this is a bare suffix, in + // which case the index is -1. + prefixPartEnd := len(b) - 1 - versionLen + // Sanity check: the index should be >= -1. Additionally, if the index is >= + // 0, it should point to the sentinel byte, as this is a full EngineKey. + if prefixPartEnd < -1 || (prefixPartEnd >= 0 && b[prefixPartEnd] != 0x00) { + return sstable.BlockInterval{}, errors.Errorf("invalid key %x", b) + } + // We don't need the last byte (the version length). + versionLen-- + // Only collect if this looks like an MVCC timestamp. + if versionLen == engineKeyVersionWallTimeLen || + versionLen == engineKeyVersionWallAndLogicalTimeLen || + versionLen == engineKeyVersionWallLogicalAndSyntheticTimeLen { + // INVARIANT: -1 <= prefixPartEnd < len(b) - 1. + // Version consists of the bytes after the sentinel and before the length. + ts := binary.BigEndian.Uint64(b[prefixPartEnd+1:]) + return sstable.BlockInterval{Lower: ts, Upper: ts + 1}, nil + } + return sstable.BlockInterval{}, nil +} diff --git a/cockroachkvs/cockroachkvs.go b/cockroachkvs/cockroachkvs.go index d358590729..3bdae18310 100644 --- a/cockroachkvs/cockroachkvs.go +++ b/cockroachkvs/cockroachkvs.go @@ -184,6 +184,41 @@ func EncodeTimestamp(key []byte, walltime uint64, logical uint32) []byte { return key } +// DecodeMVCCTimestampSuffix decodes an MVCC timestamp from its Pebble representation, +// including the length suffix. +func DecodeMVCCTimestampSuffix(encodedTS []byte) (wallTime uint64, logical uint32, err error) { + if len(encodedTS) == 0 { + return 0, 0, nil + } + encodedLen := len(encodedTS) + if suffixLen := int(encodedTS[encodedLen-1]); suffixLen != encodedLen { + return 0, 0, errors.Errorf( + "bad timestamp: found length suffix %d, actual length %d", suffixLen, encodedLen) + } + return decodeMVCCTimestamp(encodedTS[:encodedLen-1]) +} + +// decodeMVCCTimestamp decodes an MVCC timestamp from its Pebble representation, +// excluding the length suffix. +func decodeMVCCTimestamp(encodedTS []byte) (wallTime uint64, logical uint32, err error) { + // NB: This logic is duplicated in enginepb.DecodeKey() to avoid the + // overhead of an additional function call there (~13%). + switch len(encodedTS) { + case 0: + // No-op. + case 8: + wallTime = binary.BigEndian.Uint64(encodedTS[0:8]) + case 12, 13: + wallTime = binary.BigEndian.Uint64(encodedTS[0:8]) + logical = binary.BigEndian.Uint32(encodedTS[8:12]) + // NOTE: byte 13 used to store the timestamp's synthetic bit, but this is no + // longer consulted and can be ignored during decoding. + default: + return 0, 0, errors.Errorf("bad timestamp %x", encodedTS) + } + return wallTime, logical, nil +} + // DecodeEngineKey decodes the given bytes as an EngineKey. func DecodeEngineKey(b []byte) (roachKey, version []byte, ok bool) { if len(b) == 0 {