Skip to content

Commit

Permalink
chore(dataobj): support computation of min/max values in pages and columns (#16015)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto authored Jan 30, 2025
1 parent c99771e commit 5ccc679
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 35 deletions.
53 changes: 47 additions & 6 deletions pkg/dataobj/internal/dataset/column_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type BuilderOptions struct {

// CompressionOptions holds optional configuration for compression.
CompressionOptions CompressionOptions

// StoreRangeStats indicates whether to store value range statistics for the
// column and pages.
StoreRangeStats bool
}

// CompressionOptions customizes the compressor used when building pages.
Expand Down Expand Up @@ -155,14 +159,9 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
Type: cb.opts.Value,

Compression: cb.opts.Compression,
Statistics: cb.buildStats(),
}

// TODO(rfratto): Should we compute column-wide statistics if they're
// available in pages?
//
// That would potentially work for min/max values, but not for count
// distinct, unless we had a way to pass sketches around.

for _, page := range cb.pages {
info.RowsCount += page.Info.RowCount
info.ValuesCount += page.Info.ValuesCount
Expand All @@ -179,6 +178,48 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
return column, nil
}

// buildStats aggregates the per-page range statistics into column-wide
// statistics. It returns nil when BuilderOptions.StoreRangeStats is disabled.
func (cb *ColumnBuilder) buildStats() *datasetmd.Statistics {
	if !cb.opts.StoreRangeStats {
		return nil
	}

	var columnMin, columnMax Value

	for i, page := range cb.pages {
		if page.Info.Stats == nil {
			// Pages are always built with stats when StoreRangeStats is set, so a
			// missing entry indicates a bug elsewhere in the builder.
			panic("ColumnBuilder.buildStats: page missing stats")
		}

		var pageMin, pageMax Value
		if err := pageMin.UnmarshalBinary(page.Info.Stats.MinValue); err != nil {
			panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal min value: %s", err))
		}
		if err := pageMax.UnmarshalBinary(page.Info.Stats.MaxValue); err != nil {
			panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal max value: %s", err))
		}

		// The first page seeds the column-wide range; later pages widen it.
		if i == 0 || CompareValues(pageMin, columnMin) < 0 {
			columnMin = pageMin
		}
		if i == 0 || CompareValues(pageMax, columnMax) > 0 {
			columnMax = pageMax
		}
	}

	minBytes, err := columnMin.MarshalBinary()
	if err != nil {
		panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal min value: %s", err))
	}
	maxBytes, err := columnMax.MarshalBinary()
	if err != nil {
		panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal max value: %s", err))
	}

	return &datasetmd.Statistics{
		MinValue: minBytes,
		MaxValue: maxBytes,
	}
}

func (cb *ColumnBuilder) flushPage() {
if cb.builder.Rows() == 0 {
return
Expand Down
82 changes: 82 additions & 0 deletions pkg/dataobj/internal/dataset/column_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dataset

import (
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -61,3 +62,84 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
}
require.Equal(t, in, actual)
}

func TestColumnBuilder_MinMax(t *testing.T) {
	var (
		// We include the null string in the test to ensure that it's never
		// considered in min/max ranges.
		nullString = ""

		aString = strings.Repeat("a", 100)
		bString = strings.Repeat("b", 100)
		cString = strings.Repeat("c", 100)

		dString = strings.Repeat("d", 100)
		eString = strings.Repeat("e", 100)
		fString = strings.Repeat("f", 100)
	)

	// Strings are deliberately appended out of order so the min/max tracking
	// must do real comparisons; each group of three lands on its own page.
	in := []string{
		nullString,

		bString,
		cString,
		aString,

		eString,
		fString,
		dString,
	}

	opts := BuilderOptions{
		PageSizeHint: 301, // Slightly larger than the string length of 3 strings per page.
		Value:        datasetmd.VALUE_TYPE_STRING,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,

		StoreRangeStats: true,
	}
	b, err := NewColumnBuilder("", opts)
	require.NoError(t, err)

	for i, s := range in {
		require.NoError(t, b.Append(i, StringValue(s)))
	}

	col, err := b.Flush()
	require.NoError(t, err)
	require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
	require.NotNil(t, col.Info.Statistics)

	columnMin, columnMax := getMinMax(t, col.Info.Statistics)
	require.Equal(t, aString, columnMin.String())
	require.Equal(t, fString, columnMax.String())

	require.Len(t, col.Pages, 2)

	// Per-page expectations: each page covers one group of three strings.
	expect := []struct {
		values   int
		min, max string
	}{
		{values: 3, min: aString, max: cString},
		{values: 3, min: dString, max: fString},
	}
	for i, want := range expect {
		page := col.Pages[i]
		require.Equal(t, want.values, page.Info.ValuesCount)

		gotMin, gotMax := getMinMax(t, page.Info.Stats)
		require.Equal(t, want.min, gotMin.String())
		require.Equal(t, want.max, gotMax.String())
	}
}

// getMinMax decodes the min and max values carried by stats, failing the test
// if stats is nil or either value cannot be unmarshaled.
func getMinMax(t *testing.T, stats *datasetmd.Statistics) (min, max Value) {
	t.Helper()
	require.NotNil(t, stats)

	require.NoError(t, min.UnmarshalBinary(stats.MinValue))
	require.NoError(t, max.UnmarshalBinary(stats.MaxValue))
	return min, max
}
60 changes: 51 additions & 9 deletions pkg/dataobj/internal/dataset/page_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type pageBuilder struct {

rows int // Number of rows appended to the builder.
values int // Number of non-NULL values appended to the builder.

// minValue and maxValue track the minimum and maximum values appended to the
// page. These are used to compute statistics for the page if requested.
minValue, maxValue Value
}

// newPageBuilder creates a new pageBuilder that stores a sequence of [Value]s.
Expand Down Expand Up @@ -95,6 +99,11 @@ func (b *pageBuilder) Append(value Value) bool {
return false
}

// Update min/max values for stats. We only do this for non-NULL values,
// otherwise NULL would always be the min for columns that contain a single
// NULL.
b.updateMinMax(value)

// The following calls won't fail; they only return errors when the
// underlying writers fail, which ours cannot.
if err := b.presenceEnc.Encode(Uint64Value(1)); err != nil {
Expand Down Expand Up @@ -131,6 +140,25 @@ func (b *pageBuilder) AppendNull() bool {
return true
}

// updateMinMax folds value into the page's running min/max range. Callers must
// only pass non-NULL values; considering NULLs would make NULL the minimum of
// any column containing one.
func (b *pageBuilder) updateMinMax(value Value) {
	// Skip the comparisons entirely when range stats were not requested; for
	// very large values the comparisons aren't free.
	if !b.opts.StoreRangeStats {
		return
	}

	// The first non-NULL value (b.values == 0) seeds both bounds
	// unconditionally, so NULLs never participate in the range.
	first := b.values == 0

	if first || CompareValues(value, b.minValue) < 0 {
		b.minValue = value
	}
	if first || CompareValues(value, b.maxValue) > 0 {
		b.maxValue = value
	}
}

func valueSize(v Value) int {
switch v.Type() {
case datasetmd.VALUE_TYPE_INT64:
Expand Down Expand Up @@ -220,15 +248,7 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
ValuesCount: b.values,

Encoding: b.opts.Encoding,

// TODO(rfratto): At the moment we don't compute stats because they're
// not going to be valuable in every scenario: the min/max values for log
// lines is less useful compared to the min/max values for timestamps.
//
// In the future, we may wish to add more options to pageBuilder to tell
// it to compute a subset of stats to avoid needing a second iteration
// over the page to compute them.
Stats: nil,
Stats: b.buildStats(),
},

Data: finalData.Bytes(),
Expand All @@ -238,6 +258,26 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
return &page, nil
}

// buildStats renders the tracked min/max range as page statistics, or nil when
// range stats are disabled.
func (b *pageBuilder) buildStats() *datasetmd.Statistics {
	if !b.opts.StoreRangeStats {
		return nil
	}

	var (
		stats datasetmd.Statistics
		err   error
	)

	if stats.MinValue, err = b.minValue.MarshalBinary(); err != nil {
		panic(fmt.Sprintf("pageBuilder.buildStats: failed to marshal min value: %s", err))
	}
	if stats.MaxValue, err = b.maxValue.MarshalBinary(); err != nil {
		panic(fmt.Sprintf("pageBuilder.buildStats: failed to marshal max value: %s", err))
	}

	return &stats
}

// Reset resets the pageBuilder to a fresh state, allowing it to be reused.
func (b *pageBuilder) Reset() {
b.presenceBuffer.Reset()
Expand All @@ -247,4 +287,6 @@ func (b *pageBuilder) Reset() {
b.valuesEnc.Reset(b.valuesWriter)
b.rows = 0
b.values = 0
b.minValue = Value{}
b.maxValue = Value{}
}
68 changes: 68 additions & 0 deletions pkg/dataobj/internal/dataset/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dataset

import (
"cmp"
"encoding/binary"
"fmt"
"unsafe"

Expand Down Expand Up @@ -125,6 +126,73 @@ func (v Value) String() string {
return v.Type().String()
}

// MarshalBinary encodes v into a binary representation. Non-NULL values encode
// first with the type (encoded as uvarint), followed by an encoded value,
// where:
//
//   - [datasetmd.VALUE_TYPE_INT64] encodes as a varint.
//   - [datasetmd.VALUE_TYPE_UINT64] encodes as a uvarint.
//   - [datasetmd.VALUE_TYPE_STRING] encodes the string as a sequence of bytes.
//
// NULL values encode as nil.
func (v Value) MarshalBinary() (data []byte, err error) {
	if v.IsNil() {
		return nil, nil
	}

	buf := binary.AppendUvarint(nil, uint64(v.Type()))

	switch v.Type() {
	case datasetmd.VALUE_TYPE_INT64:
		buf = binary.AppendVarint(buf, v.Int64())
	case datasetmd.VALUE_TYPE_UINT64:
		buf = binary.AppendUvarint(buf, v.Uint64())
	case datasetmd.VALUE_TYPE_STRING:
		// append copies the string bytes directly into buf; no unsafe
		// string-to-slice conversion is needed since a copy happens either way.
		buf = append(buf, v.String()...)
	default:
		return nil, fmt.Errorf("dataset.Value.MarshalBinary: unsupported type %s", v.Type())
	}

	return buf, nil
}

// UnmarshalBinary decodes a Value from a binary representation. See
// [Value.MarshalBinary] for the encoding format.
func (v *Value) UnmarshalBinary(data []byte) error {
	// An empty payload is the encoding of NULL.
	if len(data) == 0 {
		*v = Value{} // NULL
		return nil
	}

	typ, sz := binary.Uvarint(data)
	if sz <= 0 {
		return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid type")
	}
	rest := data[sz:] // Payload following the type tag.

	vtyp := datasetmd.ValueType(typ)
	switch vtyp {
	case datasetmd.VALUE_TYPE_INT64:
		val, n := binary.Varint(rest)
		if n <= 0 {
			return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid int64 value")
		}
		*v = Int64Value(val)
	case datasetmd.VALUE_TYPE_UINT64:
		val, n := binary.Uvarint(rest)
		if n <= 0 {
			return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid uint64 value")
		}
		*v = Uint64Value(val)
	case datasetmd.VALUE_TYPE_STRING:
		*v = StringValue(string(rest))
	default:
		return fmt.Errorf("dataset.Value.UnmarshalBinary: unsupported type %s", vtyp)
	}

	return nil
}

// CompareValues returns -1 if a<b, 0 if a==b, or 1 if a>b. CompareValues
// panics if a and b are not the same type.
//
Expand Down
Loading

0 comments on commit 5ccc679

Please sign in to comment.