From 5ccc6790cb66aa1d2726d90e2d5c4f42c1a7cb90 Mon Sep 17 00:00:00 2001
From: Robert Fratto <robertfratto@gmail.com>
Date: Thu, 30 Jan 2025 12:03:35 -0500
Subject: [PATCH] chore(dataobj): support computation of min/max values in
 pages and columns (#16015)

---
 .../internal/dataset/column_builder.go        | 53 ++++++++++--
 pkg/dataobj/internal/dataset/column_test.go   | 82 +++++++++++++++++++
 pkg/dataobj/internal/dataset/page_builder.go  | 60 ++++++++++++--
 pkg/dataobj/internal/dataset/value.go         | 68 +++++++++++++++
 pkg/dataobj/internal/dataset/value_test.go    | 78 ++++++++++++++++++
 .../metadata/datasetmd/datasetmd.pb.go        | 12 ++-
 .../metadata/datasetmd/datasetmd.proto        | 12 ++-
 pkg/dataobj/internal/sections/logs/table.go   | 25 ++++--
 .../internal/sections/streams/streams.go      | 18 ++--
 9 files changed, 373 insertions(+), 35 deletions(-)
 create mode 100644 pkg/dataobj/internal/dataset/value_test.go

diff --git a/pkg/dataobj/internal/dataset/column_builder.go b/pkg/dataobj/internal/dataset/column_builder.go
index 0b6833e0abc86..a54d782054ef1 100644
--- a/pkg/dataobj/internal/dataset/column_builder.go
+++ b/pkg/dataobj/internal/dataset/column_builder.go
@@ -26,6 +26,10 @@ type BuilderOptions struct {
 
 	// CompressionOptions holds optional configuration for compression.
 	CompressionOptions CompressionOptions
+
+	// StoreRangeStats indicates whether to store value range statistics for the
+	// column and pages.
+	StoreRangeStats bool
 }
 
 // CompressionOptions customizes the compressor used when building pages.
@@ -155,14 +159,9 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
 		Type:        cb.opts.Value,
 
 		Compression: cb.opts.Compression,
+		Statistics:  cb.buildStats(),
 	}
 
-	// TODO(rfratto): Should we compute column-wide statistics if they're
-	// available in pages?
-	//
-	// That would potentially work for min/max values, but not for count
-	// distinct, unless we had a way to pass sketches around.
-
 	for _, page := range cb.pages {
 		info.RowsCount += page.Info.RowCount
 		info.ValuesCount += page.Info.ValuesCount
@@ -179,6 +178,48 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
 	return column, nil
 }
 
+func (cb *ColumnBuilder) buildStats() *datasetmd.Statistics {
+	if !cb.opts.StoreRangeStats {
+		return nil
+	}
+
+	var stats datasetmd.Statistics
+
+	var minValue, maxValue Value
+
+	for i, page := range cb.pages {
+		if page.Info.Stats == nil {
+			// This should never happen: if cb.opts.StoreRangeStats is true, then
+			// page.Info.Stats will always be populated.
+			panic("ColumnBuilder.buildStats: page missing stats")
+		}
+
+		var pageMin, pageMax Value
+
+		if err := pageMin.UnmarshalBinary(page.Info.Stats.MinValue); err != nil {
+			panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal min value: %s", err))
+		} else if err := pageMax.UnmarshalBinary(page.Info.Stats.MaxValue); err != nil {
+			panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal max value: %s", err))
+		}
+
+		if i == 0 || CompareValues(pageMin, minValue) < 0 {
+			minValue = pageMin
+		}
+		if i == 0 || CompareValues(pageMax, maxValue) > 0 {
+			maxValue = pageMax
+		}
+	}
+
+	var err error
+	if stats.MinValue, err = minValue.MarshalBinary(); err != nil {
+		panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal min value: %s", err))
+	}
+	if stats.MaxValue, err = maxValue.MarshalBinary(); err != nil {
+		panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal max value: %s", err))
+	}
+	return &stats
+}
+
 func (cb *ColumnBuilder) flushPage() {
 	if cb.builder.Rows() == 0 {
 		return
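The column-level range is produced purely by merging the per-page extremes, so a reader can recover a column's min/max without touching page data. Below is a minimal sketch of decoding those stats back into Values; it assumes a *MemColumn returned by ColumnBuilder.Flush, and columnRange is a hypothetical helper for illustration, not part of this patch:

```go
// columnRange decodes the merged range statistics of a flushed column.
// Hypothetical helper; ok is false when stats were not stored.
func columnRange(col *MemColumn) (min, max Value, ok bool, err error) {
	stats := col.Info.Statistics
	if stats == nil {
		return min, max, false, nil // StoreRangeStats was not enabled.
	}
	if err := min.UnmarshalBinary(stats.MinValue); err != nil {
		return min, max, false, err
	}
	if err := max.UnmarshalBinary(stats.MaxValue); err != nil {
		return min, max, false, err
	}
	return min, max, true, nil
}
```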
diff --git a/pkg/dataobj/internal/dataset/column_test.go b/pkg/dataobj/internal/dataset/column_test.go
index d065402921665..de2f3f3549461 100644
--- a/pkg/dataobj/internal/dataset/column_test.go
+++ b/pkg/dataobj/internal/dataset/column_test.go
@@ -1,6 +1,7 @@
 package dataset
 
 import (
+	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -61,3 +62,84 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
 	}
 	require.Equal(t, in, actual)
 }
+
+func TestColumnBuilder_MinMax(t *testing.T) {
+	var (
+		// We include the null string in the test to ensure that it's never
+		// considered in min/max ranges.
+		nullString = ""
+
+		aString = strings.Repeat("a", 100)
+		bString = strings.Repeat("b", 100)
+		cString = strings.Repeat("c", 100)
+
+		dString = strings.Repeat("d", 100)
+		eString = strings.Repeat("e", 100)
+		fString = strings.Repeat("f", 100)
+	)
+
+	in := []string{
+		nullString,
+
+		// We append strings out-of-order below to ensure that the min/max
+		// comparisons are working properly.
+		//
+		// Strings are grouped by which page they'll be appended to.
+
+		bString,
+		cString,
+		aString,
+
+		eString,
+		fString,
+		dString,
+	}
+
+	opts := BuilderOptions{
+		PageSizeHint: 301, // Slightly larger than the combined size of the three 100-byte strings per page.
+		Value:        datasetmd.VALUE_TYPE_STRING,
+		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
+		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
+
+		StoreRangeStats: true,
+	}
+	b, err := NewColumnBuilder("", opts)
+	require.NoError(t, err)
+
+	for i, s := range in {
+		require.NoError(t, b.Append(i, StringValue(s)))
+	}
+
+	col, err := b.Flush()
+	require.NoError(t, err)
+	require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
+	require.NotNil(t, col.Info.Statistics)
+
+	columnMin, columnMax := getMinMax(t, col.Info.Statistics)
+	require.Equal(t, aString, columnMin.String())
+	require.Equal(t, fString, columnMax.String())
+
+	require.Len(t, col.Pages, 2)
+	require.Equal(t, 3, col.Pages[0].Info.ValuesCount)
+	require.Equal(t, 3, col.Pages[1].Info.ValuesCount)
+
+	page0Min, page0Max := getMinMax(t, col.Pages[0].Info.Stats)
+	require.Equal(t, aString, page0Min.String())
+	require.Equal(t, cString, page0Max.String())
+
+	page1Min, page1Max := getMinMax(t, col.Pages[1].Info.Stats)
+	require.Equal(t, dString, page1Min.String())
+	require.Equal(t, fString, page1Max.String())
+}
+
+func getMinMax(t *testing.T, stats *datasetmd.Statistics) (min, max Value) {
+	t.Helper()
+	require.NotNil(t, stats)
+
+	var minValue, maxValue Value
+
+	require.NoError(t, minValue.UnmarshalBinary(stats.MinValue))
+	require.NoError(t, maxValue.UnmarshalBinary(stats.MaxValue))
+
+	return minValue, maxValue
+}
diff --git a/pkg/dataobj/internal/dataset/page_builder.go b/pkg/dataobj/internal/dataset/page_builder.go
index 3f16e9e6b2174..fa4963db1661b 100644
--- a/pkg/dataobj/internal/dataset/page_builder.go
+++ b/pkg/dataobj/internal/dataset/page_builder.go
@@ -46,6 +46,10 @@ type pageBuilder struct {
 
 	rows   int // Number of rows appended to the builder.
 	values int // Number of non-NULL values appended to the builder.
+
+	// minValue and maxValue track the minimum and maximum values appended to the
+	// page. These are used to compute statistics for the page if requested.
+	minValue, maxValue Value
 }
 
 // newPageBuilder creates a new pageBuilder that stores a sequence of [Value]s.
@@ -95,6 +99,11 @@ func (b *pageBuilder) Append(value Value) bool {
 		return false
 	}
 
+	// Update min/max values for stats. We only do this for non-NULL values,
+	// otherwise NULL would always be the min for columns that contain even a
+	// single NULL.
+	b.updateMinMax(value)
+
 	// The following calls won't fail; they only return errors when the
 	// underlying writers fail, which ours cannot.
 	if err := b.presenceEnc.Encode(Uint64Value(1)); err != nil {
@@ -131,6 +140,25 @@ func (b *pageBuilder) AppendNull() bool {
 	return true
 }
 
+func (b *pageBuilder) updateMinMax(value Value) {
+	// As a small optimization, we only update min/max values if we intend to
+	// populate the stats. This avoids unnecessary comparisons for very large
+	// values.
+	if !b.opts.StoreRangeStats {
+		return
+	}
+
+	// We initialize minValue/maxValue on the first non-NULL value (b.values == 0).
+	// This keeps NULL values out of the comparisons entirely; otherwise NULL
+	// would always be the min.
+	if b.values == 0 || CompareValues(value, b.minValue) < 0 {
+		b.minValue = value
+	}
+	if b.values == 0 || CompareValues(value, b.maxValue) > 0 {
+		b.maxValue = value
+	}
+}
+
 func valueSize(v Value) int {
 	switch v.Type() {
 	case datasetmd.VALUE_TYPE_INT64:
@@ -220,15 +248,7 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
 			ValuesCount: b.values,
 
 			Encoding: b.opts.Encoding,
-
-			// TODO(rfratto): At the moment we don't compute stats because they're
-			// not going to be valuable in every scenario: the min/max values for log
-			// lines is less useful compared to the min/max values for timestamps.
-			//
-			// In the future, we may wish to add more options to pageBuilder to tell
-			// it to compute a subset of stats to avoid needing a second iteration
-			// over the page to compute them.
-			Stats: nil,
+			Stats:    b.buildStats(),
 		},
 
 		Data: finalData.Bytes(),
@@ -238,6 +258,26 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
 	return &page, nil
 }
 
+func (b *pageBuilder) buildStats() *datasetmd.Statistics {
+	if !b.opts.StoreRangeStats {
+		return nil
+	}
+
+	minValueBytes, err := b.minValue.MarshalBinary()
+	if err != nil {
+		panic(fmt.Sprintf("pageBuilder.buildStats: failed to marshal min value: %s", err))
+	}
+	maxValueBytes, err := b.maxValue.MarshalBinary()
+	if err != nil {
+		panic(fmt.Sprintf("pageBuilder.buildStats: failed to marshal max value: %s", err))
+	}
+
+	return &datasetmd.Statistics{
+		MinValue: minValueBytes,
+		MaxValue: maxValueBytes,
+	}
+}
+
 // Reset resets the pageBuilder to a fresh state, allowing it to be reused.
 func (b *pageBuilder) Reset() {
 	b.presenceBuffer.Reset()
@@ -247,4 +287,6 @@ func (b *pageBuilder) Reset() {
 	b.valuesEnc.Reset(b.valuesWriter)
 	b.rows = 0
 	b.values = 0
+	b.minValue = Value{}
+	b.maxValue = Value{}
 }
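The removed TODO called out timestamps as the case where ranges pay off, and per-page stats are exactly what lets a reader skip pages whose range cannot intersect a query. A sketch of that pruning check, using the MemPage and CompareValues shown in this patch; pageOverlaps is a hypothetical helper, not part of this change:

```go
// pageOverlaps reports whether a page may contain values in [lo, hi], using
// only its range statistics. Hypothetical helper for illustration.
func pageOverlaps(page *MemPage, lo, hi Value) (bool, error) {
	stats := page.Info.Stats
	if stats == nil {
		return true, nil // No stats stored; the page can't be ruled out.
	}

	var min, max Value
	if err := min.UnmarshalBinary(stats.MinValue); err != nil {
		return false, err
	}
	if err := max.UnmarshalBinary(stats.MaxValue); err != nil {
		return false, err
	}
	if min.IsNil() || max.IsNil() {
		return false, nil // Only NULLs were appended; no value can match.
	}

	// The ranges intersect iff neither lies strictly beyond the other.
	return CompareValues(max, lo) >= 0 && CompareValues(min, hi) <= 0, nil
}
```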
diff --git a/pkg/dataobj/internal/dataset/value.go b/pkg/dataobj/internal/dataset/value.go
index 02b66b9504bf6..6033d2b7327af 100644
--- a/pkg/dataobj/internal/dataset/value.go
+++ b/pkg/dataobj/internal/dataset/value.go
@@ -2,6 +2,7 @@ package dataset
 
 import (
 	"cmp"
+	"encoding/binary"
 	"fmt"
 	"unsafe"
 
@@ -125,6 +126,73 @@ func (v Value) String() string {
 	return v.Type().String()
 }
 
+// MarshalBinary encodes v into a binary representation. Non-NULL values
+// encode the type first (as a uvarint), followed by the encoded value,
+// where:
+//
+//   - [datasetmd.VALUE_TYPE_INT64] encodes as a varint.
+//   - [datasetmd.VALUE_TYPE_UINT64] encodes as a uvarint.
+//   - [datasetmd.VALUE_TYPE_STRING] encodes the string as a sequence of bytes.
+//
+// NULL values encode as nil.
+func (v Value) MarshalBinary() (data []byte, err error) {
+	if v.IsNil() {
+		return nil, nil
+	}
+
+	buf := binary.AppendUvarint(nil, uint64(v.Type()))
+
+	switch v.Type() {
+	case datasetmd.VALUE_TYPE_INT64:
+		buf = binary.AppendVarint(buf, v.Int64())
+	case datasetmd.VALUE_TYPE_UINT64:
+		buf = binary.AppendUvarint(buf, v.Uint64())
+	case datasetmd.VALUE_TYPE_STRING:
+		str := v.String()
+		buf = append(buf, unsafe.Slice(unsafe.StringData(str), len(str))...)
+	default:
+		return nil, fmt.Errorf("dataset.Value.MarshalBinary: unsupported type %s", v.Type())
+	}
+
+	return buf, nil
+}
+
+// UnmarshalBinary decodes a Value from a binary representation. See
+// [Value.MarshalBinary] for the encoding format.
+func (v *Value) UnmarshalBinary(data []byte) error {
+	if len(data) == 0 {
+		*v = Value{} // NULL
+		return nil
+	}
+
+	typ, n := binary.Uvarint(data)
+	if n <= 0 {
+		return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid type")
+	}
+
+	switch vtyp := datasetmd.ValueType(typ); vtyp {
+	case datasetmd.VALUE_TYPE_INT64:
+		val, n := binary.Varint(data[n:])
+		if n <= 0 {
+			return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid int64 value")
+		}
+		*v = Int64Value(val)
+	case datasetmd.VALUE_TYPE_UINT64:
+		val, n := binary.Uvarint(data[n:])
+		if n <= 0 {
+			return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid uint64 value")
+		}
+		*v = Uint64Value(val)
+	case datasetmd.VALUE_TYPE_STRING:
+		str := string(data[n:])
+		*v = StringValue(str)
+	default:
+		return fmt.Errorf("dataset.Value.UnmarshalBinary: unsupported type %s", vtyp)
+	}
+
+	return nil
+}
+
 // CompareValues returns -1 if a<b, 0 if a==b, or 1 if a>b. CompareValues
 // panics if a and b are not the same type.
 //
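As a concrete walkthrough of the format (assuming VALUE_TYPE_INT64 is enum value 1 in datasetmd): Int64Value(-1234) marshals to the three bytes 0x01 0xa3 0x13, i.e. the type uvarint followed by the zigzag varint of -1234. A round-trip sketch; it only compiles inside this module, since the dataset package is internal:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
)

func main() {
	// Marshal an int64 value; the first byte is the type uvarint, the rest is
	// the zigzag-encoded varint payload.
	b, err := dataset.Int64Value(-1234).MarshalBinary()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", b) // Expected: 01a313, assuming VALUE_TYPE_INT64 == 1.

	// Unmarshaling restores the typed value.
	var v dataset.Value
	if err := v.UnmarshalBinary(b); err != nil {
		panic(err)
	}
	fmt.Println(v.Int64()) // -1234
}
```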
diff --git a/pkg/dataobj/internal/dataset/value_test.go b/pkg/dataobj/internal/dataset/value_test.go
new file mode 100644
index 0000000000000..1d273e62e4bc8
--- /dev/null
+++ b/pkg/dataobj/internal/dataset/value_test.go
@@ -0,0 +1,78 @@
+package dataset_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
+)
+
+func TestValue_MarshalBinary(t *testing.T) {
+	t.Run("Null", func(t *testing.T) {
+		var expect dataset.Value
+		require.True(t, expect.IsNil())
+
+		b, err := expect.MarshalBinary()
+		require.NoError(t, err)
+
+		var actual dataset.Value
+		require.NoError(t, actual.UnmarshalBinary(b))
+		require.True(t, actual.IsNil())
+	})
+
+	t.Run("Int64Value", func(t *testing.T) {
+		expect := dataset.Int64Value(-1234)
+		require.Equal(t, datasetmd.VALUE_TYPE_INT64, expect.Type())
+
+		b, err := expect.MarshalBinary()
+		require.NoError(t, err)
+
+		var actual dataset.Value
+		require.NoError(t, actual.UnmarshalBinary(b))
+		require.Equal(t, datasetmd.VALUE_TYPE_INT64, actual.Type())
+		require.Equal(t, expect.Int64(), actual.Int64())
+	})
+
+	t.Run("Uint64Value", func(t *testing.T) {
+		expect := dataset.Uint64Value(1234)
+		require.Equal(t, datasetmd.VALUE_TYPE_UINT64, expect.Type())
+
+		b, err := expect.MarshalBinary()
+		require.NoError(t, err)
+
+		var actual dataset.Value
+		require.NoError(t, actual.UnmarshalBinary(b))
+		require.Equal(t, datasetmd.VALUE_TYPE_UINT64, actual.Type())
+		require.Equal(t, expect.Uint64(), actual.Uint64())
+	})
+
+	t.Run("StringValue", func(t *testing.T) {
+		t.Run("Empty", func(t *testing.T) {
+			expect := dataset.StringValue("")
+			require.Equal(t, datasetmd.VALUE_TYPE_STRING, expect.Type())
+
+			b, err := expect.MarshalBinary()
+			require.NoError(t, err)
+
+			var actual dataset.Value
+			require.NoError(t, actual.UnmarshalBinary(b))
+			require.Equal(t, datasetmd.VALUE_TYPE_STRING, actual.Type())
+			require.Equal(t, expect.String(), actual.String())
+		})
+
+		t.Run("Non-empty", func(t *testing.T) {
+			expect := dataset.StringValue("hello, world!")
+			require.Equal(t, datasetmd.VALUE_TYPE_STRING, expect.Type())
+
+			b, err := expect.MarshalBinary()
+			require.NoError(t, err)
+
+			var actual dataset.Value
+			require.NoError(t, actual.UnmarshalBinary(b))
+			require.Equal(t, datasetmd.VALUE_TYPE_STRING, actual.Type())
+			require.Equal(t, expect.String(), actual.String())
+		})
+	})
+}
diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go
index daadac1693561..1920ffde67281 100644
--- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go
+++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go
@@ -256,9 +256,17 @@ func (m *ColumnInfo) GetValuesCount() uint64 {
 
 // Statistics about a column or a page. All statistics are optional and are
 // conditionally set depending on the column type.
 type Statistics struct {
-	// Minimum value.
+	// Minimum value. Applications should only set min_value to an encoding of a
+	// non-NULL value. If there is no non-NULL value, min_value should be unset.
+	//
+	// Applications must not assume that an unset min_value means that the column
+	// is empty; check for values_count == 0 instead.
 	MinValue []byte `protobuf:"bytes,1,opt,name=min_value,json=minValue,proto3" json:"min_value,omitempty"`
-	// Maximum value.
+	// Maximum value. Applications should only set max_value to an encoding of a
+	// non-NULL value. If there is no non-NULL value, max_value should be unset.
+	//
+	// Applications must not assume that an unset max_value means that the column
+	// is empty; check for values_count == 0 instead.
 	MaxValue []byte `protobuf:"bytes,2,opt,name=max_value,json=maxValue,proto3" json:"max_value,omitempty"`
 }

diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto
index 957e0be1fc5e7..450b5c241bcf7 100644
--- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto
+++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto
@@ -73,10 +73,18 @@ enum CompressionType {
 
 // Statistics about a column or a page. All statistics are optional and are
 // conditionally set depending on the column type.
 message Statistics {
-  // Minimum value.
+  // Minimum value. Applications should only set min_value to an encoding of a
+  // non-NULL value. If there is no non-NULL value, min_value should be unset.
+  //
+  // Applications must not assume that an unset min_value means that the column
+  // is empty; check for values_count == 0 instead.
   bytes min_value = 1;
-  // Maximum value.
+  // Maximum value. Applications should only set max_value to an encoding of a
+  // non-NULL value. If there is no non-NULL value, max_value should be unset.
+  //
+  // Applications must not assume that an unset max_value means that the column
+  // is empty; check for values_count == 0 instead.
   bytes max_value = 2;
 }
 
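The comments above pin down an easy-to-miss distinction: unset min_value/max_value means the column had no non-NULL value to encode, not that the column is empty. A sketch of checks that respect this contract; both helpers are hypothetical, not part of this patch:

```go
// isEmpty reports whether the column holds no non-NULL values at all; per the
// comments above, this must come from values_count, not from the stats.
func isEmpty(info *datasetmd.ColumnInfo) bool {
	return info.GetValuesCount() == 0
}

// rangeKnown reports whether min/max statistics were stored and can be
// decoded. Unset bounds alone say nothing about emptiness.
func rangeKnown(stats *datasetmd.Statistics) bool {
	return stats != nil && len(stats.MinValue) > 0 && len(stats.MaxValue) > 0
}
```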
diff --git a/pkg/dataobj/internal/sections/logs/table.go b/pkg/dataobj/internal/sections/logs/table.go
index 27508fd511403..aea082be064c2 100644
--- a/pkg/dataobj/internal/sections/logs/table.go
+++ b/pkg/dataobj/internal/sections/logs/table.go
@@ -117,10 +117,11 @@ func (b *tableBuffer) StreamID(pageSize int) *dataset.ColumnBuilder {
 	}
 
 	col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
-		PageSizeHint: pageSize,
-		Value:        datasetmd.VALUE_TYPE_INT64,
-		Encoding:     datasetmd.ENCODING_TYPE_DELTA,
-		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
+		PageSizeHint:    pageSize,
+		Value:           datasetmd.VALUE_TYPE_INT64,
+		Encoding:        datasetmd.ENCODING_TYPE_DELTA,
+		Compression:     datasetmd.COMPRESSION_TYPE_NONE,
+		StoreRangeStats: true,
 	})
 	if err != nil {
 		// We control the Value/Encoding tuple so this can't fail; if it does,
@@ -140,10 +141,11 @@ func (b *tableBuffer) Timestamp(pageSize int) *dataset.ColumnBuilder {
 	}
 
 	col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
-		PageSizeHint: pageSize,
-		Value:        datasetmd.VALUE_TYPE_INT64,
-		Encoding:     datasetmd.ENCODING_TYPE_DELTA,
-		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
+		PageSizeHint:    pageSize,
+		Value:           datasetmd.VALUE_TYPE_INT64,
+		Encoding:        datasetmd.ENCODING_TYPE_DELTA,
+		Compression:     datasetmd.COMPRESSION_TYPE_NONE,
+		StoreRangeStats: true,
 	})
 	if err != nil {
 		// We control the Value/Encoding tuple so this can't fail; if it does,
@@ -176,6 +178,7 @@ func (b *tableBuffer) Metadata(key string, pageSize int, compressionOpts dataset
 		Encoding:           datasetmd.ENCODING_TYPE_PLAIN,
 		Compression:        datasetmd.COMPRESSION_TYPE_ZSTD,
 		CompressionOptions: compressionOpts,
+		StoreRangeStats:    true,
 	})
 	if err != nil {
 		// We control the Value/Encoding tuple so this can't fail; if it does,
@@ -206,6 +209,12 @@ func (b *tableBuffer) Message(pageSize int, compressionOpts dataset.CompressionO
 		Encoding:           datasetmd.ENCODING_TYPE_PLAIN,
 		Compression:        datasetmd.COMPRESSION_TYPE_ZSTD,
 		CompressionOptions: compressionOpts,
+
+		// We explicitly don't compute range stats for the message column: a
+		// "min log line" and "max log line" aren't very valuable, and since log
+		// lines can be quite long, storing them would consume a fair amount of
+		// metadata.
+		StoreRangeStats: false,
 	})
 	if err != nil {
 		// We control the Value/Encoding tuple so this can't fail; if it does,
diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go
index f9f4aeabd8124..518807e2104ec 100644
--- a/pkg/dataobj/internal/sections/streams/streams.go
+++ b/pkg/dataobj/internal/sections/streams/streams.go
@@ -226,10 +226,11 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
 		}
 
 		builder, err := dataset.NewColumnBuilder(name, dataset.BuilderOptions{
-			PageSizeHint: s.pageSize,
-			Value:        datasetmd.VALUE_TYPE_STRING,
-			Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
-			Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
+			PageSizeHint:    s.pageSize,
+			Value:           datasetmd.VALUE_TYPE_STRING,
+			Encoding:        datasetmd.ENCODING_TYPE_PLAIN,
+			Compression:     datasetmd.COMPRESSION_TYPE_ZSTD,
+			StoreRangeStats: true,
 		})
 		if err != nil {
 			return nil, fmt.Errorf("creating label column: %w", err)
@@ -297,10 +298,11 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
 
 func numberColumnBuilder(pageSize int) (*dataset.ColumnBuilder, error) {
 	return dataset.NewColumnBuilder("", dataset.BuilderOptions{
-		PageSizeHint: pageSize,
-		Value:        datasetmd.VALUE_TYPE_INT64,
-		Encoding:     datasetmd.ENCODING_TYPE_DELTA,
-		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
+		PageSizeHint:    pageSize,
+		Value:           datasetmd.VALUE_TYPE_INT64,
+		Encoding:        datasetmd.ENCODING_TYPE_DELTA,
+		Compression:     datasetmd.COMPRESSION_TYPE_NONE,
+		StoreRangeStats: true,
 	})
 }
 
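Putting it together, the same options numberColumnBuilder now uses are enough to get a queryable range out of any int64 column. A self-contained sketch, runnable only within this module since the dataset packages are internal:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

func main() {
	b, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
		PageSizeHint:    4096,
		Value:           datasetmd.VALUE_TYPE_INT64,
		Encoding:        datasetmd.ENCODING_TYPE_DELTA,
		Compression:     datasetmd.COMPRESSION_TYPE_NONE,
		StoreRangeStats: true,
	})
	if err != nil {
		panic(err)
	}

	// Append out of order to show the resulting range is order-independent.
	for i, v := range []int64{30, 10, 20} {
		if err := b.Append(i, dataset.Int64Value(v)); err != nil {
			panic(err)
		}
	}

	col, err := b.Flush()
	if err != nil {
		panic(err)
	}

	var min, max dataset.Value
	if err := min.UnmarshalBinary(col.Info.Statistics.MinValue); err != nil {
		panic(err)
	}
	if err := max.UnmarshalBinary(col.Info.Statistics.MaxValue); err != nil {
		panic(err)
	}
	fmt.Println(min.Int64(), max.Int64()) // 10 30
}
```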