From 5f00be8cc5899128d2f4a28ec84c303a156104ac Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 30 Aug 2024 16:42:49 +0200 Subject: [PATCH 01/10] Introduce new binary format for bloom index Got rid of backwards compatibility shenanigans and introduced a new version that includes the indexed fields. Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom_builder.go | 2 + pkg/storage/bloom/v1/builder.go | 14 +- pkg/storage/bloom/v1/builder_test.go | 7 +- pkg/storage/bloom/v1/fuse.go | 2 +- pkg/storage/bloom/v1/index.go | 261 +++++------------- pkg/storage/bloom/v1/index_builder.go | 59 +--- pkg/storage/bloom/v1/index_querier.go | 4 +- pkg/storage/bloom/v1/index_test.go | 96 ++----- pkg/storage/bloom/v1/schema.go | 123 +++++++++ pkg/storage/bloom/v1/util.go | 16 -- pkg/storage/bloom/v1/versioned_builder.go | 131 ++------- .../bloom/v1/versioned_builder_test.go | 55 +--- 12 files changed, 265 insertions(+), 505 deletions(-) create mode 100644 pkg/storage/bloom/v1/schema.go diff --git a/pkg/storage/bloom/v1/bloom_builder.go b/pkg/storage/bloom/v1/bloom_builder.go index ea54ba248f7c4..2d75fc224d521 100644 --- a/pkg/storage/bloom/v1/bloom_builder.go +++ b/pkg/storage/bloom/v1/bloom_builder.go @@ -46,6 +46,8 @@ func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) { } } + // version := b.opts.Schema.version + b.scratch.Reset() if err := bloom.Encode(b.scratch); err != nil { return BloomOffset{}, errors.Wrap(err, "encoding bloom") diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 56882c4cb140a..6546f1afcb95c 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -66,6 +66,18 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) { enc.PutBE64(b.BlockSize) } +// func NewDefaultBlockOptions(maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { +// opts := NewBlockOptionsFromSchema(Schema{ +// version: DefaultSchemaVersion, +// encoding: chunkenc.EncNone, +// nGramLength: 0, +// nGramSkip: 0, +// }) +// opts.BlockSize = maxBlockSizeBytes +// opts.UnencodedBlockOptions.MaxBloomSizeBytes = maxBloomSizeBytes +// return opts +// } + func NewBlockOptions(enc chunkenc.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ version: DefaultSchemaVersion, @@ -289,7 +301,7 @@ func (mb *MergeBuilder) processNextSeries( bytesAdded += bloom.SourceBytesAdded } - done, err := builder.AddSeries(*nextInStore, offsets) + done, err := builder.AddSeries(*nextInStore, offsets, []Field{Field("__line__")}) if err != nil { return nil, bytesAdded, 0, false, false, errors.Wrap(err, "committing series") } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 6abed637d7c79..2eb7a0f797d64 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -24,11 +24,11 @@ var blockEncodings = []chunkenc.Encoding{ chunkenc.EncZstd, } -func TestBlockOptionsRoundTrip(t *testing.T) { +func TestBlockOptions_RoundTrip(t *testing.T) { t.Parallel() opts := BlockOptions{ Schema: Schema{ - version: V1, + version: DefaultSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 10, nGramSkip: 2, @@ -548,9 +548,8 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { builder, err := NewBlockBuilder(blockOpts, writer) require.Nil(t, err) - checksum, _, err := mb.Build(builder) + _, _, err = mb.Build(builder) require.Nil(t, err) - require.Equal(t, uint32(0x2a6cdba6), checksum) // ensure the new 
block contains one copy of all the data // by comparing it against an iterator over the source data diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index ace67b0496c2a..1ed9016bca046 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -253,7 +253,7 @@ func (fq *FusedQuerier) Run() error { return nil } -func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithOffsets, reqs []Request) { +func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithMeta, reqs []Request) { // For a given chunk|series to be removed, it must fail to match all blooms. // Because iterating/loading blooms can be expensive, we iterate blooms one at a time, collecting // the removals (failures) for each (bloom, chunk) pair. diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index 8cae1d8a87f1e..3855063a71180 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -2,97 +2,16 @@ package v1 import ( "bytes" - "fmt" "io" "sort" "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/util/encoding" ) -type Schema struct { - version Version - encoding chunkenc.Encoding - nGramLength, nGramSkip uint64 -} - -func (s Schema) String() string { - return fmt.Sprintf("v%d,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip) -} - -func (s Schema) Compatible(other Schema) bool { - return s == other -} - -func (s Schema) NGramLen() int { - return int(s.nGramLength) -} - -func (s Schema) NGramSkip() int { - return int(s.nGramSkip) -} - -// byte length -func (s Schema) Len() int { - // magic number + version + encoding + ngram length + ngram skip - return 4 + 1 + 1 + 8 + 8 -} - -func (s *Schema) DecompressorPool() chunkenc.ReaderPool { - return chunkenc.GetReaderPool(s.encoding) -} - -func (s *Schema) CompressorPool() chunkenc.WriterPool { - return chunkenc.GetWriterPool(s.encoding) -} - -func (s *Schema) Encode(enc *encoding.Encbuf) { - enc.Reset() - enc.PutBE32(magicNumber) - enc.PutByte(byte(s.version)) - enc.PutByte(byte(s.encoding)) - enc.PutBE64(s.nGramLength) - enc.PutBE64(s.nGramSkip) - -} - -func (s *Schema) DecodeFrom(r io.ReadSeeker) error { - // TODO(owen-d): improve allocations - schemaBytes := make([]byte, s.Len()) - _, err := io.ReadFull(r, schemaBytes) - if err != nil { - return errors.Wrap(err, "reading schema") - } - - dec := encoding.DecWith(schemaBytes) - return s.Decode(&dec) -} - -func (s *Schema) Decode(dec *encoding.Decbuf) error { - number := dec.Be32() - if number != magicNumber { - return errors.Errorf("invalid magic number. expected %x, got %x", magicNumber, number) - } - s.version = Version(dec.Byte()) - if s.version != 1 && s.version != 2 { - return errors.Errorf("invalid version. 
expected %d, got %d", 1, s.version) - } - - s.encoding = chunkenc.Encoding(dec.Byte()) - if _, err := chunkenc.ParseEncoding(s.encoding.String()); err != nil { - return errors.Wrap(err, "parsing encoding") - } - - s.nGramLength = dec.Be64() - s.nGramSkip = dec.Be64() - - return dec.Err() -} - // Block index is a set of series pages along with // the headers for each page type BlockIndex struct { @@ -275,7 +194,7 @@ type SeriesPageDecoder struct { // state i int // current index - cur *SeriesWithOffsets + cur *SeriesWithMeta err error previousFp model.Fingerprint // previous series' fingerprint for delta-decoding previousOffset BloomOffset // previous series' bloom offset for delta-decoding @@ -300,8 +219,8 @@ func (d *SeriesPageDecoder) Next() bool { return false } - var res SeriesWithOffsets - d.previousFp, d.previousOffset, d.err = res.Decode(d.schema.version, &d.dec, d.previousFp, d.previousOffset) + var res SeriesWithMeta + d.previousFp, d.previousOffset, d.err = res.Decode(&d.dec, d.schema.version, d.previousFp, d.previousOffset) if d.err != nil { return false } @@ -350,7 +269,7 @@ func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) { } } -func (d *SeriesPageDecoder) At() (res *SeriesWithOffsets) { +func (d *SeriesPageDecoder) At() (res *SeriesWithMeta) { return d.cur } @@ -361,25 +280,31 @@ func (d *SeriesPageDecoder) Err() error { return d.dec.Err() } +// series encoding/decoding -------------------------------------------------- + type Series struct { Fingerprint model.Fingerprint Chunks ChunkRefs } -// SeriesWithOffsets is a series with a a variable number -// of bloom offsets. Used in v2+ to store blooms for larger series -// in parts -type SeriesWithOffsets struct { +type Meta struct { + Fields Fields Offsets []BloomOffset +} + +// SeriesWithMeta is a series with a a variable number of bloom offsets. +// Used in v2+ to store blooms for larger series in parts +type SeriesWithMeta struct { Series + Meta } -func (s *SeriesWithOffsets) Encode( +func (s *SeriesWithMeta) Encode( enc *encoding.Encbuf, + version Version, previousFp model.Fingerprint, previousOffset BloomOffset, ) BloomOffset { - sort.Sort(s.Chunks) // ensure order // delta encode fingerprint enc.PutUvarint64(uint64(s.Fingerprint - previousFp)) // encode number of bloom offsets in this series @@ -388,132 +313,114 @@ func (s *SeriesWithOffsets) Encode( lastOffset := previousOffset for _, offset := range s.Offsets { // delta encode offsets. 
- // Multiple offsets per series is a v2+ feature with different encoding implementation, - // so we signal that to the encoder - offset.Encode(enc, V2, lastOffset) + offset.Encode(enc, version, lastOffset) lastOffset = offset } // encode chunks using delta encoded timestamps var lastEnd model.Time enc.PutUvarint(len(s.Chunks)) + sort.Sort(s.Chunks) // ensure order for _, chunk := range s.Chunks { - lastEnd = chunk.Encode(enc, lastEnd) + lastEnd = chunk.Encode(enc, version, lastEnd) + } + + enc.PutUvarint(len(s.Fields)) + sort.Sort(s.Fields) // ensure order + for _, field := range s.Fields { + field.Encode(enc, version) } return lastOffset } -func (s *SeriesWithOffsets) Decode( - version Version, +func (s *SeriesWithMeta) Decode( dec *encoding.Decbuf, + version Version, previousFp model.Fingerprint, previousOffset BloomOffset, ) (model.Fingerprint, BloomOffset, error) { - // Since *SeriesWithOffsets is is still representable by the v1 schema as a len=1 offset group, - // we can decode it even though multiple offsets were introduced in v2 - if version == V1 { - return s.decodeV1(dec, previousFp, previousOffset) + if version < V3 { + return 0, BloomOffset{}, errUnsupportedSchemaVersion } s.Fingerprint = previousFp + model.Fingerprint(dec.Uvarint64()) numOffsets := dec.Uvarint() - s.Offsets = make([]BloomOffset, numOffsets) var ( err error lastEnd model.Time lastOffset = previousOffset ) + + s.Offsets = make([]BloomOffset, numOffsets) for i := range s.Offsets { // SeriesWithOffsets is a v2+ feature with multiple bloom offsets per series // so we signal that to the decoder - err = s.Offsets[i].Decode(dec, V2, lastOffset) + err = s.Offsets[i].Decode(dec, version, lastOffset) lastOffset = s.Offsets[i] if err != nil { return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth bloom offset", i) } } - // TODO(owen-d): use pool s.Chunks = make([]ChunkRef, dec.Uvarint()) for i := range s.Chunks { - lastEnd, err = s.Chunks[i].Decode(dec, lastEnd) + lastEnd, err = s.Chunks[i].Decode(dec, version, lastEnd) if err != nil { return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth chunk", i) } } + + s.Fields = make([]Field, dec.Uvarint()) + for i := range s.Fields { + err = s.Fields[i].Decode(dec, version) + if err != nil { + return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth field", i) + } + } + return s.Fingerprint, lastOffset, dec.Err() } -// Decodes a v2 compatible series from a v1 encoding -func (s *SeriesWithOffsets) decodeV1( - dec *encoding.Decbuf, - previousFp model.Fingerprint, - previousOffset BloomOffset, -) (model.Fingerprint, BloomOffset, error) { - var single SeriesWithOffset - fp, last, err := single.Decode(dec, previousFp, previousOffset) - if err != nil { - return 0, BloomOffset{}, errors.Wrap(err, "decoding series with offset") - } - s.Offsets = []BloomOffset{last} - s.Series = single.Series - return fp, last, nil +// field encoding/decoding --------------------------------------------------- + +type Field []byte // key of an indexed structured metadata field + +func (f *Field) Encode(enc *encoding.Encbuf, _ Version) { + enc.PutUvarintBytes(*f) } -// Used in v1 schema -type SeriesWithOffset struct { - Offset BloomOffset - Series +func (f *Field) Decode(dec *encoding.Decbuf, _ Version) error { + *f = Field(dec.UvarintBytes()) + return dec.Err() } -func (s *SeriesWithOffset) Encode( - enc *encoding.Encbuf, - previousFp model.Fingerprint, - previousOffset BloomOffset, -) (model.Fingerprint, BloomOffset) { - sort.Sort(s.Chunks) // ensure order - // delta encode fingerprint - 
enc.PutBE64(uint64(s.Fingerprint - previousFp)) - // delta encode offsets - // V1 only has 1 offset per series which has a legacy encoding scheme; - // we signal that to the encoder - s.Offset.Encode(enc, V1, previousOffset) +func (f *Field) String() string { + return string(*f) +} - // encode chunks using delta encoded timestamps - var lastEnd model.Time - enc.PutUvarint(len(s.Chunks)) - for _, chunk := range s.Chunks { - lastEnd = chunk.Encode(enc, lastEnd) - } +func (f *Field) Less(other Field) bool { + // avoid string allocations + return string(*f) < string(other) +} + +type Fields []Field - return s.Fingerprint, s.Offset +func (f Fields) Len() int { + return len(f) } -func (s *SeriesWithOffset) Decode(dec *encoding.Decbuf, previousFp model.Fingerprint, previousOffset BloomOffset) (model.Fingerprint, BloomOffset, error) { - s.Fingerprint = previousFp + model.Fingerprint(dec.Be64()) - // V1 only has 1 offset per series which has a legacy encoding scheme; - // we signal that to the decoder - if err := s.Offset.Decode(dec, V1, previousOffset); err != nil { - return 0, BloomOffset{}, errors.Wrap(err, "decoding bloom offset") - } +func (f Fields) Less(i, j int) bool { + return f[i].Less(f[j]) +} - // TODO(owen-d): use pool - s.Chunks = make([]ChunkRef, dec.Uvarint()) - var ( - err error - lastEnd model.Time - ) - for i := range s.Chunks { - lastEnd, err = s.Chunks[i].Decode(dec, lastEnd) - if err != nil { - return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth chunk", i) - } - } - return s.Fingerprint, s.Offset, dec.Err() +func (f Fields) Swap(i, j int) { + f[i], f[j] = f[j], f[i] } +// chunk encoding/decoding --------------------------------------------------- + type ChunkRef logproto.ShortRef func (r *ChunkRef) Less(other ChunkRef) bool { @@ -540,7 +447,7 @@ func (r *ChunkRef) Cmp(other ChunkRef) int { return int(r.Checksum) - int(other.Checksum) } -func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time { +func (r *ChunkRef) Encode(enc *encoding.Encbuf, _ Version, previousEnd model.Time) model.Time { // delta encode start time enc.PutVarint64(int64(r.From - previousEnd)) enc.PutVarint64(int64(r.Through - r.From)) @@ -548,7 +455,7 @@ func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Ti return r.Through } -func (r *ChunkRef) Decode(dec *encoding.Decbuf, previousEnd model.Time) (model.Time, error) { +func (r *ChunkRef) Decode(dec *encoding.Decbuf, _ Version, previousEnd model.Time) (model.Time, error) { r.From = previousEnd + model.Time(dec.Varint64()) r.Through = r.From + model.Time(dec.Varint64()) r.Checksum = dec.Be32() @@ -560,33 +467,15 @@ type BloomOffset struct { ByteOffset int // offset to beginning of bloom within page } -func (o *BloomOffset) Encode(enc *encoding.Encbuf, v Version, previousOffset BloomOffset) { +func (o *BloomOffset) Encode(enc *encoding.Encbuf, _ Version, previousOffset BloomOffset) { // page offsets diffs are always ascending enc.PutUvarint(o.Page - previousOffset.Page) - - switch v { - case V1: - // V1 uses UVarint for bloom offset deltas. This is fine because there is only 1 bloom per series in v1 - enc.PutUvarint(o.ByteOffset - previousOffset.ByteOffset) - default: - // V2 encodes multiple bloom offsets per series and successive blooms may belong to - // separate bloom pages. Therefore, we use Varint64 for byte offset deltas as - // byteOffsets will not be ascending when a new bloom page is written. 
- enc.PutVarint64(int64(o.ByteOffset - previousOffset.ByteOffset)) - } + enc.PutVarint64(int64(o.ByteOffset - previousOffset.ByteOffset)) } -func (o *BloomOffset) Decode(dec *encoding.Decbuf, v Version, previousOffset BloomOffset) error { +func (o *BloomOffset) Decode(dec *encoding.Decbuf, _ Version, previousOffset BloomOffset) error { o.Page = previousOffset.Page + dec.Uvarint() - - // Explained by the Encode method - switch v { - case V1: - o.ByteOffset = previousOffset.ByteOffset + dec.Uvarint() - default: - o.ByteOffset = previousOffset.ByteOffset + int(dec.Varint64()) - } - + o.ByteOffset = previousOffset.ByteOffset + int(dec.Varint64()) return dec.Err() } diff --git a/pkg/storage/bloom/v1/index_builder.go b/pkg/storage/bloom/v1/index_builder.go index 36c74a9d87ab1..067a79ad03f4e 100644 --- a/pkg/storage/bloom/v1/index_builder.go +++ b/pkg/storage/bloom/v1/index_builder.go @@ -46,18 +46,20 @@ func (b *IndexBuilder) WriteOpts() error { return nil } -func (b *IndexBuilder) AppendV2(series SeriesWithOffsets) error { +func (b *IndexBuilder) Append(series SeriesWithMeta) error { if !b.writtenSchema { if err := b.WriteOpts(); err != nil { return errors.Wrap(err, "appending series") } } + version := b.opts.Schema.version + b.scratch.Reset() // we don't want to update the previous pointers yet in case // we need to flush the page first which would // be passed the incorrect final fp/offset - lastOffset := series.Encode(b.scratch, b.previousFp, b.previousOffset) + lastOffset := series.Encode(b.scratch, version, b.previousFp, b.previousOffset) if !b.page.SpaceFor(b.scratch.Len()) && b.page.Count() > 0 { if err := b.flushPage(); err != nil { @@ -66,7 +68,7 @@ func (b *IndexBuilder) AppendV2(series SeriesWithOffsets) error { // re-encode now that a new page has been cut and we use delta-encoding b.scratch.Reset() - lastOffset = series.Encode(b.scratch, b.previousFp, b.previousOffset) + lastOffset = series.Encode(b.scratch, version, b.previousFp, b.previousOffset) } switch { @@ -95,57 +97,6 @@ func (b *IndexBuilder) AppendV2(series SeriesWithOffsets) error { return nil } -func (b *IndexBuilder) AppendV1(series SeriesWithOffset) error { - if !b.writtenSchema { - if err := b.WriteOpts(); err != nil { - return errors.Wrap(err, "appending series") - } - } - - b.scratch.Reset() - // we don't want to update the previous pointers yet in case - // we need to flush the page first which would - // be passed the incorrect final fp/offset - previousFp, previousOffset := series.Encode(b.scratch, b.previousFp, b.previousOffset) - - if !b.page.SpaceFor(b.scratch.Len()) { - if err := b.flushPage(); err != nil { - return errors.Wrap(err, "flushing series page") - } - - // re-encode now that a new page has been cut and we use delta-encoding - b.scratch.Reset() - previousFp, previousOffset = series.Encode(b.scratch, b.previousFp, b.previousOffset) - } - b.previousFp = previousFp - b.previousOffset = previousOffset - - switch { - case b.page.Count() == 0: - // Special case: this is the first series in a page - if len(series.Chunks) < 1 { - return fmt.Errorf("series with zero chunks for fingerprint %v", series.Fingerprint) - } - b.fromFp = series.Fingerprint - b.fromTs, b.throughTs = chkBounds(series.Chunks) - case b.previousFp > series.Fingerprint: - return fmt.Errorf("out of order series fingerprint for series %v", series.Fingerprint) - default: - from, through := chkBounds(series.Chunks) - if b.fromTs.After(from) { - b.fromTs = from - } - if b.throughTs.Before(through) { - b.throughTs = through - } - } - - _ = 
b.page.Add(b.scratch.Get()) - b.previousFp = series.Fingerprint - b.previousOffset = series.Offset - return nil -} - // must be > 1 func chkBounds(chks []ChunkRef) (from, through model.Time) { from, through = chks[0].From, chks[0].Through diff --git a/pkg/storage/bloom/v1/index_querier.go b/pkg/storage/bloom/v1/index_querier.go index 7fdaa4617571f..fe05f7bcddfda 100644 --- a/pkg/storage/bloom/v1/index_querier.go +++ b/pkg/storage/bloom/v1/index_querier.go @@ -10,7 +10,7 @@ import ( ) type SeriesIterator interface { - iter.Iterator[*SeriesWithOffset] + iter.Iterator[*SeriesWithMeta] Reset() } @@ -138,7 +138,7 @@ func (it *LazySeriesIter) next() bool { return false } -func (it *LazySeriesIter) At() *SeriesWithOffsets { +func (it *LazySeriesIter) At() *SeriesWithMeta { return it.curPage.At() } diff --git a/pkg/storage/bloom/v1/index_test.go b/pkg/storage/bloom/v1/index_test.go index 8b3a078bc0c46..54d7459c50dad 100644 --- a/pkg/storage/bloom/v1/index_test.go +++ b/pkg/storage/bloom/v1/index_test.go @@ -9,8 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/encoding" ) -var SupportedVersions = []Version{V1, V2} - func TestBloomOffsetEncoding(t *testing.T) { for _, v := range SupportedVersions { t.Run(v.String(), func(t *testing.T) { @@ -28,9 +26,10 @@ func TestBloomOffsetEncoding(t *testing.T) { } -func TestSeriesEncoding_V1(t *testing.T) { +func TestSeriesEncoding_V3(t *testing.T) { t.Parallel() - src := SeriesWithOffset{ + version := V3 + src := SeriesWithMeta{ Series: Series{ Fingerprint: model.Fingerprint(1), Chunks: []ChunkRef{ @@ -46,93 +45,34 @@ func TestSeriesEncoding_V1(t *testing.T) { }, }, }, - Offset: BloomOffset{Page: 2, ByteOffset: 3}, - } - - enc := &encoding.Encbuf{} - src.Encode(enc, 0, BloomOffset{}) - - dec := encoding.DecWith(enc.Get()) - var dst SeriesWithOffset - fp, offset, err := dst.Decode(&dec, 0, BloomOffset{}) - require.Nil(t, err) - require.Equal(t, src.Fingerprint, fp) - require.Equal(t, src.Offset, offset) - require.Equal(t, src, dst) -} - -func TestSeriesEncoding_V2(t *testing.T) { - t.Parallel() - src := SeriesWithOffsets{ - Series: Series{ - Fingerprint: model.Fingerprint(1), - Chunks: []ChunkRef{ - { - From: 1, - Through: 2, - Checksum: 3, - }, - { - From: 4, - Through: 5, - Checksum: 6, - }, + Meta: Meta{ + Offsets: []BloomOffset{ + {Page: 0, ByteOffset: 0}, + {Page: 0, ByteOffset: 100}, + {Page: 1, ByteOffset: 2}, + {Page: 2, ByteOffset: 1}, + }, + Fields: []Field{ + Field("foo"), + Field("bar"), }, - }, - Offsets: []BloomOffset{ - {Page: 0, ByteOffset: 0}, - {Page: 0, ByteOffset: 100}, - {Page: 1, ByteOffset: 2}, - {Page: 2, ByteOffset: 1}, }, } enc := &encoding.Encbuf{} - src.Encode(enc, 0, BloomOffset{}) + src.Encode(enc, version, 0, BloomOffset{}) dec := encoding.DecWith(enc.Get()) - var dst SeriesWithOffsets - fp, offset, err := dst.Decode(V2, &dec, 0, BloomOffset{}) + var dst SeriesWithMeta + fp, offset, err := dst.Decode(&dec, version, 0, BloomOffset{}) require.Nil(t, err) require.Equal(t, src.Fingerprint, fp) require.Equal(t, src.Offsets[len(src.Offsets)-1], offset) + require.Equal(t, src.Offsets, dst.Offsets) + require.Equal(t, src.Fields, dst.Fields) require.Equal(t, src, dst) } -func TestV2SeriesDecodesV1(t *testing.T) { - t.Parallel() - src := SeriesWithOffset{ - Series: Series{ - Fingerprint: model.Fingerprint(1), - Chunks: []ChunkRef{ - { - From: 1, - Through: 2, - Checksum: 3, - }, - { - From: 4, - Through: 5, - Checksum: 6, - }, - }, - }, - Offset: BloomOffset{Page: 1, ByteOffset: 2}, - } - - enc := &encoding.Encbuf{} - src.Encode(enc, 0, 
BloomOffset{}) - - dec := encoding.DecWith(enc.Get()) - var dst SeriesWithOffsets - fp, offset, err := dst.decodeV1(&dec, 0, BloomOffset{}) - require.Nil(t, err) - require.Equal(t, src.Fingerprint, fp) - require.Equal(t, src.Offset, offset) - require.Equal(t, []BloomOffset{src.Offset}, dst.Offsets) - require.Equal(t, src.Series, dst.Series) -} - func TestChunkRefCmpLess(t *testing.T) { t.Parallel() for _, tc := range []struct { diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go new file mode 100644 index 0000000000000..58f622b86b10d --- /dev/null +++ b/pkg/storage/bloom/v1/schema.go @@ -0,0 +1,123 @@ +package v1 + +import ( + "fmt" + "io" + + "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/util/encoding" + "github.com/pkg/errors" +) + +type Version byte + +func (v Version) String() string { + return fmt.Sprintf("v%d", v) +} + +const ( + // Add new versions below + V1 Version = iota + // V2 supports single series blooms encoded over multiple pages + // to accommodate larger single series + V2 + // V3 indicates schema for indexed structured metadata + V3 + + DefaultSchemaVersion = V3 +) + +var ( + SupportedVersions = []Version{V3} + + errInvalidSchemaVersion = errors.New("invalid schema version") + errUnsupportedSchemaVersion = errors.New("unsupported schema version") +) + +type Schema struct { + version Version + encoding chunkenc.Encoding + nGramLength, nGramSkip uint64 +} + +func NewSchema() Schema { + return Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncNone, + nGramLength: 0, + nGramSkip: 0, + } +} + +func (s Schema) String() string { + return fmt.Sprintf("%s,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip) +} + +func (s Schema) Compatible(other Schema) bool { + return s == other +} + +func (s Schema) NGramLen() int { + return int(s.nGramLength) +} + +func (s Schema) NGramSkip() int { + return int(s.nGramSkip) +} + +// byte length +func (s Schema) Len() int { + // magic number + version + encoding + ngram length + ngram skip + return 4 + 1 + 1 + 8 + 8 +} + +func (s *Schema) DecompressorPool() chunkenc.ReaderPool { + return chunkenc.GetReaderPool(s.encoding) +} + +func (s *Schema) CompressorPool() chunkenc.WriterPool { + return chunkenc.GetWriterPool(s.encoding) +} + +func (s *Schema) Encode(enc *encoding.Encbuf) { + enc.Reset() + enc.PutBE32(magicNumber) + enc.PutByte(byte(s.version)) + enc.PutByte(byte(s.encoding)) + enc.PutBE64(s.nGramLength) + enc.PutBE64(s.nGramSkip) + +} + +func (s *Schema) DecodeFrom(r io.ReadSeeker) error { + // TODO(owen-d): improve allocations + schemaBytes := make([]byte, s.Len()) + _, err := io.ReadFull(r, schemaBytes) + if err != nil { + return errors.Wrap(err, "reading schema") + } + + dec := encoding.DecWith(schemaBytes) + return s.Decode(&dec) } + +func (s *Schema) Decode(dec *encoding.Decbuf) error { + number := dec.Be32() + if number != magicNumber { + return errors.Errorf("invalid magic number. expected %x, got %x", magicNumber, number) + } + s.version = Version(dec.Byte()) + if s.version != V3 { + return errors.Errorf("invalid version. expected %d, got %d", 3, s.version) + } + + s.encoding = chunkenc.Encoding(dec.Byte()) + if _, err := chunkenc.ParseEncoding(s.encoding.String()); err != nil { + return errors.Wrap(err, "parsing encoding") + } + + s.nGramLength = dec.Be64() + s.nGramSkip = dec.Be64() + + return dec.Err() +}
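The serialized schema header defined above is fixed-size: a 4-byte magic number, one byte each for version and encoding, and two big-endian 8-byte n-gram parameters, i.e. the 22 bytes returned by Len(). A minimal round-trip sketch in the package's test style (the test name is illustrative, not part of this patch):

func TestSchemaRoundTrip(t *testing.T) {
	src := NewSchema() // DefaultSchemaVersion (v3), no compression, no n-gram config
	enc := &encoding.Encbuf{}
	src.Encode(enc)
	require.Equal(t, src.Len(), len(enc.Get())) // 4 + 1 + 1 + 8 + 8 = 22 bytes

	dec := encoding.DecWith(enc.Get())
	var dst Schema
	require.NoError(t, dst.Decode(&dec))
	require.True(t, dst.Compatible(src))
}

Decode accepts only V3, so blocks written with an older schema fail fast at header-parsing time.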
expected %d, got %d", 3, s.version) + } + + s.encoding = chunkenc.Encoding(dec.Byte()) + if _, err := chunkenc.ParseEncoding(s.encoding.String()); err != nil { + return errors.Wrap(err, "parsing encoding") + } + + s.nGramLength = dec.Be64() + s.nGramSkip = dec.Be64() + + return dec.Err() +} diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index ec46d2633b7ad..f5169f01ef19d 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -1,7 +1,6 @@ package v1 import ( - "fmt" "hash" "hash/crc32" "io" @@ -10,23 +9,8 @@ import ( "github.com/grafana/loki/v3/pkg/util/mempool" ) -type Version byte - -func (v Version) String() string { - return fmt.Sprintf("v%d", v) -} - const ( magicNumber = uint32(0xCA7CAFE5) - // Add new versions below - V1 Version = iota - // V2 supports single series blooms encoded over multiple pages - // to accommodate larger single series - V2 -) - -const ( - DefaultSchemaVersion = V2 ) var ( diff --git a/pkg/storage/bloom/v1/versioned_builder.go b/pkg/storage/bloom/v1/versioned_builder.go index 175d651dc460c..9545951840571 100644 --- a/pkg/storage/bloom/v1/versioned_builder.go +++ b/pkg/storage/bloom/v1/versioned_builder.go @@ -8,8 +8,8 @@ import ( /* Each binary format (version) has it's own builder. This provides type-safe way to build the binary format -while allowing reuse of underlying logic. As an example, the V2Builder will prevent encoding v1 series (only 1 bloom per series) -as it only provides methods that are v2 compatible. The opposite is also true. +while allowing reuse of underlying logic. As an example, the V3Builder will prevent encoding v1 and v2 series +as it only provides methods that are v3 compatible. The opposite is also true. Builders provide the following methods: - [Convenience method] BuildFrom: builds the binary format from an iterator of the relevant type. @@ -21,14 +21,14 @@ Builders provide the following methods: */ // Convenience constructor targeting the most current version. -func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*V2Builder, error) { - return NewBlockBuilderV2(opts, writer) +func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*BlockBuilder, error) { + return NewBlockBuilderV3(opts, writer) } // Convenience alias for the most current version. 
// Convenience alias for the most current version. -type BlockBuilder = V2Builder +type BlockBuilder = V3Builder -type V2Builder struct { +type V3Builder struct { opts BlockOptions writer BlockWriter @@ -41,9 +41,9 @@ type SeriesWithBlooms struct { Blooms iter.SizedIterator[*Bloom] } -func NewBlockBuilderV2(opts BlockOptions, writer BlockWriter) (*V2Builder, error) { - if opts.Schema.version != V2 { - return nil, errors.Errorf("schema mismatch creating v2 builder, expected %v, got %v", V2, opts.Schema.version) +func NewBlockBuilderV3(opts BlockOptions, writer BlockWriter) (*V3Builder, error) { + if opts.Schema.version != V3 { + return nil, errors.Errorf("schema mismatch creating builder, expected v3, got %v", opts.Schema.version) } index, err := writer.Index() @@ -55,7 +55,7 @@ func NewBlockBuilderV2(opts BlockOptions, writer BlockWriter) (*V2Builder, error return nil, errors.Wrap(err, "initializing blooms writer") } - return &V2Builder{ + return &V3Builder{ opts: opts, writer: writer, index: NewIndexBuilder(opts, index), @@ -63,7 +63,7 @@ func NewBlockBuilderV2(opts BlockOptions, writer BlockWriter) (*V2Builder, error }, nil } -func (b *V2Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, error) { +func (b *V3Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, error) { for itr.Next() { at := itr.At() var offsets []BloomOffset @@ -78,7 +78,7 @@ func (b *V2Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, erro if err := at.Blooms.Err(); err != nil { return 0, errors.Wrap(err, "iterating blooms") } - blockFull, err := b.AddSeries(*at.Series, offsets) + blockFull, err := b.AddSeries(*at.Series, offsets, []Field{Field("__line__")}) if err != nil { return 0, errors.Wrapf(err, "writing series") } @@ -94,7 +94,7 @@ func (b *V2Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, erro return b.Close() } -func (b *V2Builder) Close() (uint32, error) { +func (b *V3Builder) Close() (uint32, error) { bloomChecksum, err := b.blooms.Close() if err != nil { return 0, errors.Wrap(err, "closing bloom file") @@ -106,109 +106,18 @@ func (b *V2Builder) Close() (uint32, error) { return combineChecksums(indexCheckSum, bloomChecksum), nil } -func (b *V2Builder) AddBloom(bloom *Bloom) (BloomOffset, error) { +func (b *V3Builder) AddBloom(bloom *Bloom) (BloomOffset, error) { return b.blooms.Append(bloom) } // AddSeries adds a series to the block. It returns true if the block is full after adding the series.
-func (b *V2Builder) AddSeries(series Series, offsets []BloomOffset) (bool, error) { - if err := b.index.AppendV2(SeriesWithOffsets{ - Offsets: offsets, - Series: series, - }); err != nil { - return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint) - } - - full, _, err := b.writer.Full(b.opts.BlockSize) - if err != nil { - return false, errors.Wrap(err, "checking if block is full") - } - - return full, nil -} - -// Now the same for legacy V1 -type SeriesWithBloom struct { - Series *Series - Bloom *Bloom -} - -//nolint:revive -type V1Builder struct { - opts BlockOptions - - writer BlockWriter - index *IndexBuilder - blooms *BloomBlockBuilder -} - -func NewBlockBuilderV1(opts BlockOptions, writer BlockWriter) (*V1Builder, error) { - if opts.Schema.version != V1 { - return nil, errors.Errorf("schema mismatch creating v1 builder, expected %v, got %v", V1, opts.Schema.version) - } - - index, err := writer.Index() - if err != nil { - return nil, errors.Wrap(err, "initializing index writer") - } - blooms, err := writer.Blooms() - if err != nil { - return nil, errors.Wrap(err, "initializing blooms writer") - } - - return &V1Builder{ - opts: opts, - writer: writer, - index: NewIndexBuilder(opts, index), - blooms: NewBloomBlockBuilder(opts, blooms), - }, nil -} - -func (b *V1Builder) BuildFrom(itr iter.Iterator[SeriesWithBloom]) (uint32, error) { - for itr.Next() { - at := itr.At() - offset, err := b.AddBloom(at.Bloom) - if err != nil { - return 0, errors.Wrap(err, "writing bloom") - } - - blockFull, err := b.AddSeries(*at.Series, offset) - - if err != nil { - return 0, errors.Wrapf(err, "writing series") - } - if blockFull { - break - } - } - - if err := itr.Err(); err != nil { - return 0, errors.Wrap(err, "iterating series") - } - - return b.Close() -} - -func (b *V1Builder) Close() (uint32, error) { - bloomChecksum, err := b.blooms.Close() - if err != nil { - return 0, errors.Wrap(err, "closing bloom file") - } - indexCheckSum, err := b.index.Close() - if err != nil { - return 0, errors.Wrap(err, "closing series file") - } - return combineChecksums(indexCheckSum, bloomChecksum), nil -} - -func (b *V1Builder) AddBloom(bloom *Bloom) (BloomOffset, error) { - return b.blooms.Append(bloom) -} - -func (b *V1Builder) AddSeries(series Series, offset BloomOffset) (bool, error) { - if err := b.index.AppendV1(SeriesWithOffset{ +func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields []Field) (bool, error) { + if err := b.index.Append(SeriesWithMeta{ Series: series, - Offset: offset, + Meta: Meta{ + Offsets: offsets, + Fields: fields, + }, }); err != nil { return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint) } diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go index 4b1103f1bbdac..01a5aa354f752 100644 --- a/pkg/storage/bloom/v1/versioned_builder_test.go +++ b/pkg/storage/bloom/v1/versioned_builder_test.go @@ -38,57 +38,8 @@ func setup(v Version) (BlockOptions, []SeriesWithLiteralBlooms, BlockWriter, Blo return smallBlockOpts(v, chunkenc.EncNone), data, writer, reader } -// Tests v1 format by encoding a block into v1 then decoding it back and comparing the results -// to the source data. -// NB(owen-d): This also tests that the block querier can "up cast" the v1 format to the v2 format -// in the sense that v1 uses a single bloom per series and v2 uses multiple blooms per series and therefore -// v1 can be interpreted as v2 with a single bloom per series. 
-func TestV1RoundTrip(t *testing.T) { - opts, data, writer, reader := setup(V1) - b, err := NewBlockBuilderV1(opts, writer) - require.NoError(t, err) - - mapped := v2.NewMapIter[SeriesWithLiteralBlooms]( - v2.NewSliceIter(data), - func(s SeriesWithLiteralBlooms) SeriesWithBloom { - return SeriesWithBloom{ - Series: s.Series, - Bloom: s.Blooms[0], - } - }, - ) - - _, err = b.BuildFrom(mapped) - require.NoError(t, err) - - // Ensure Equality - block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() - - CompareIterators[SeriesWithLiteralBlooms, *SeriesWithBlooms]( - t, - func(t *testing.T, a SeriesWithLiteralBlooms, b *SeriesWithBlooms) { - require.Equal(t, a.Series, b.Series) // ensure series equality - bs, err := v2.Collect(b.Blooms) - require.NoError(t, err) - - // ensure we only have one bloom in v1 - require.Equal(t, 1, len(a.Blooms)) - require.Equal(t, 1, len(bs)) - - var encA, encB encoding.Encbuf - require.NoError(t, a.Blooms[0].Encode(&encA)) - require.NoError(t, bs[0].Encode(&encB)) - - require.Equal(t, encA.Get(), encB.Get()) - }, - v2.NewSliceIter(data), - querier, - ) -} - -func TestV2Roundtrip(t *testing.T) { - opts, data, writer, reader := setup(V2) +func TestV3Roundtrip(t *testing.T) { + opts, data, writer, reader := setup(V3) data, err := v2.Collect( v2.NewMapIter[SeriesWithLiteralBlooms, SeriesWithLiteralBlooms]( @@ -105,7 +56,7 @@ func TestV2Roundtrip(t *testing.T) { ) require.NoError(t, err) - b, err := NewBlockBuilderV2(opts, writer) + b, err := NewBlockBuilderV3(opts, writer) require.NoError(t, err) mapped := v2.NewMapIter[SeriesWithLiteralBlooms]( From 629462807073b092c9f08f1d963009a345035f81 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 30 Aug 2024 17:41:56 +0200 Subject: [PATCH 02/10] Remove unnecessary PeekingEntryIter adapter Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom_tokenizer.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index e5a71d0aedd4d..ffae78f2f2a0c 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/iter" v2iter "github.com/grafana/loki/v3/pkg/iter/v2" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" @@ -139,7 +138,7 @@ func (bt *BloomTokenizer) Populate( for chks.Next() { chk := chks.At() - itr := newPeekingEntryIterAdapter(chk.Itr) + itr := v2iter.NewPeekIter(chk.Itr) for { full, newBytes := bt.addChunkToBloom( @@ -288,19 +287,3 @@ outer: return full, chunkBytes } - -type entryIterAdapter struct { - iter.EntryIterator -} - -func (a entryIterAdapter) At() logproto.Entry { - return a.EntryIterator.At() -} - -func (a entryIterAdapter) Err() error { - return a.EntryIterator.Err() -} - -func newPeekingEntryIterAdapter(itr iter.EntryIterator) *v2iter.PeekIter[logproto.Entry] { - return v2iter.NewPeekIter[logproto.Entry](entryIterAdapter{itr}) -} From 8fed71360a3e6d728d7711b6cf143121d1b08b51 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 2 Sep 2024 15:51:41 +0200 Subject: [PATCH 03/10] Check block schema compatibility on query path Signed-off-by: Christian Haudum --- pkg/bloomgateway/processor.go | 5 +++++ pkg/storage/bloom/v1/index.go | 2 +- pkg/storage/bloom/v1/schema.go | 8 ++++++-- 3 files changed, 12 
insertions(+), 3 deletions(-) diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index edd882e1e4210..83f3a6b9a2d16 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -145,6 +145,11 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl return err } + // We require V3 schema + if !schema.IsCurrentSchema() { + return v1.ErrUnsupportedSchemaVersion + } + tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), schema.NGramSkip()) iters := make([]iter.PeekIterator[v1.Request], 0, len(tasks)) diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index 3855063a71180..ad338f80333a4 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -341,7 +341,7 @@ func (s *SeriesWithMeta) Decode( previousOffset BloomOffset, ) (model.Fingerprint, BloomOffset, error) { if version < V3 { - return 0, BloomOffset{}, errUnsupportedSchemaVersion + return 0, BloomOffset{}, ErrUnsupportedSchemaVersion } diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index 58f622b86b10d..183ada315d6d4 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -30,8 +30,8 @@ var ( SupportedVersions = []Version{V3} - errInvalidSchemaVersion = errors.New("invalid schema version") - errUnsupportedSchemaVersion = errors.New("unsupported schema version") + ErrInvalidSchemaVersion = errors.New("invalid schema version") + ErrUnsupportedSchemaVersion = errors.New("unsupported schema version") ) type Schema struct { @@ -57,6 +57,10 @@ func (s Schema) Compatible(other Schema) bool { return s == other } +func (s Schema) IsCurrentSchema() bool { + return s.version == DefaultSchemaVersion +} + func (s Schema) NGramLen() int { return int(s.nGramLength) } From 7017ee6dad38ab7b4581d8524c60656de04584af Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 2 Sep 2024 16:14:17 +0200 Subject: [PATCH 04/10] Extract NewBloom() function from tokenizer Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom.go | 7 +++++++ pkg/storage/bloom/v1/bloom_tokenizer.go | 11 ++--------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index 49f1e06e36748..878f254abc178 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -23,6 +23,13 @@ type Bloom struct { filter.ScalableBloomFilter } +func NewBloom() *Bloom { + return &Bloom{ + // TODO parameterise SBF options. fp_rate + ScalableBloomFilter: *filter.NewScalableBloomFilter(1024, 0.01, 0.8), + } +} + func (b *Bloom) Encode(enc *encoding.Encbuf) error { // divide by 8 b/c bloom capacity is measured in bits, but we want bytes buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8))) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index ffae78f2f2a0c..d2e89aead94b5 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -91,13 +91,6 @@ func estimatedCount(m uint, p float64) uint { return uint(-float64(m) * math.Log(1-p)) } -func (bt *BloomTokenizer) newBloom() *Bloom { - return &Bloom{ - // TODO parameterise SBF options. fp_rate - ScalableBloomFilter: *filter.NewScalableBloomFilter(1024, 0.01, 0.8), - } -} - // Populates one or more bloom filters with the tokens from the given chunks.
// Called once per series func (bt *BloomTokenizer) Populate( @@ -131,7 +124,7 @@ func (bt *BloomTokenizer) Populate( ) } } else { - bloom = bt.newBloom() + bloom = NewBloom() } var bytesAdded int @@ -155,7 +148,7 @@ func (bt *BloomTokenizer) Populate( // start a new bloom + reset bytesAdded counter bytesAdded = 0 - bloom = bt.newBloom() + bloom = NewBloom() // cache _MUST_ be cleared when a new bloom is created to ensure that all tokens from // each line are indexed into at least one bloom From 6e133fa6f5282c53990bc56689028da6e3256794 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 4 Sep 2024 16:32:52 +0200 Subject: [PATCH 05/10] Rename DefaultSchemaVersion to CurrentSchemaVersion Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/archive_test.go | 2 +- pkg/storage/bloom/v1/builder.go | 2 +- pkg/storage/bloom/v1/builder_test.go | 12 ++++++------ pkg/storage/bloom/v1/fuse_test.go | 10 +++++----- pkg/storage/bloom/v1/schema.go | 6 +++--- pkg/storage/bloom/v1/test_util.go | 2 +- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index 8ebcdb9aebccf..401cc56a218cd 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -23,7 +23,7 @@ func TestArchive(t *testing.T) { builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, }, SeriesPageSize: 100, diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 6546f1afcb95c..11058fe3b0919 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -80,7 +80,7 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) { func NewBlockOptions(enc chunkenc.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: enc, nGramLength: nGramLength, nGramSkip: nGramSkip, diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 2eb7a0f797d64..d22e579ac6a0e 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -28,7 +28,7 @@ func TestBlockOptions_RoundTrip(t *testing.T) { t.Parallel() opts := BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 10, nGramSkip: 2, @@ -89,7 +89,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { t.Run(desc, func(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: enc, nGramLength: 10, nGramSkip: 2, @@ -210,7 +210,7 @@ func TestMergeBuilder(t *testing.T) { data, _ := MkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000) blockOpts := BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, }, SeriesPageSize: 100, @@ -306,7 +306,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, }, SeriesPageSize: 100, @@ -399,7 +399,7 @@ func TestBlockReset(t *testing.T) { reader := NewByteReader(indexBuf, bloomsBuf) schema := Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 10, 
nGramSkip: 2, @@ -457,7 +457,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, // test with different encodings? nGramLength: 4, // needs to match values from MkBasicSeriesWithBlooms nGramSkip: 0, // needs to match values from MkBasicSeriesWithBlooms diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 9db4154ca2903..3adcc6833e174 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -58,7 +58,7 @@ func TestFusedQuerier(t *testing.T) { builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, }, SeriesPageSize: 100, @@ -152,7 +152,7 @@ func TestFuseMultiPage(t *testing.T) { builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 3, // we test trigrams nGramSkip: 0, @@ -308,7 +308,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, }, SeriesPageSize: 100, @@ -366,7 +366,7 @@ func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) { builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncNone, }, SeriesPageSize: 100, @@ -430,7 +430,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, }, SeriesPageSize: 256 << 10, // 256k diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index 183ada315d6d4..557849d552381 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -24,7 +24,7 @@ const ( // V3 indicates schema for indexed structured metadata V3 - DefaultSchemaVersion = V3 + CurrentSchemaVersion = V3 ) var ( @@ -42,7 +42,7 @@ type Schema struct { func NewSchema() Schema { return Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncNone, nGramLength: 0, nGramSkip: 0, @@ -58,7 +58,7 @@ func (s Schema) Compatible(other Schema) bool { } func (s Schema) IsCurrentSchema() bool { - return s.version == DefaultSchemaVersion + return s.version == CurrentSchemaVersion } diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index 4fbbfa8d7bc1d..c08209ce03802 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -28,7 +28,7 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: DefaultSchemaVersion, + version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 4, // see DefaultNGramLength in bloom_tokenizer_test.go nGramSkip: 0, // see DefaultNGramSkip in bloom_tokenizer_test.go
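One property worth keeping in mind around the rename: Schema.Compatible is strict struct equality, so two schemas agree only if version, encoding, and both n-gram settings all match. An in-package sketch (test name illustrative):

func TestSchemaCompatible(t *testing.T) {
	a := Schema{version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 10, nGramSkip: 2}
	b := Schema{version: CurrentSchemaVersion, encoding: chunkenc.EncSnappy, nGramLength: 10, nGramSkip: 2}
	require.True(t, a.Compatible(b))

	b.encoding = chunkenc.EncZstd
	require.False(t, a.Compatible(b)) // same version, but encodings differ
}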
From 538d838d5c9f5c4ed46d95535b2140974fb211b5 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 4 Sep 2024 16:33:56 +0200 Subject: [PATCH 06/10] Remove unused import Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom_tokenizer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index d2e89aead94b5..d35b3c13b6a2d 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/iter" v2iter "github.com/grafana/loki/v3/pkg/iter/v2" - "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" "github.com/grafana/loki/pkg/push" From b8ee03890eee1147d6bffaaf6293666e9a141f68 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 4 Sep 2024 16:39:05 +0200 Subject: [PATCH 07/10] Remove incorrect comment Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/index.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index ad338f80333a4..d24129eb0de2f 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -292,8 +292,6 @@ type Meta struct { Offsets []BloomOffset } -// SeriesWithMeta is a series with a variable number of bloom offsets. -// Used in v2+ to store blooms for larger series in parts type SeriesWithMeta struct { Series Meta } From 5808546cc7020f274c1a35ea1344a9839e2f3a6f Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 4 Sep 2024 16:40:41 +0200 Subject: [PATCH 08/10] Make linter happy Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/schema.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index 557849d552381..a6bc25a0f899f 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -4,9 +4,10 @@ import ( "fmt" "io" + "github.com/pkg/errors" + "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/util/encoding" - "github.com/pkg/errors" ) type Version byte From a7c0a5eb56eebd78d0ac2e1e2d325189b0b1e319 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 4 Sep 2024 17:30:27 +0200 Subject: [PATCH 09/10] Introduce Set type for encoding structured metadata fields Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/builder.go | 7 +++- pkg/storage/bloom/v1/index.go | 47 +++++++---------------- pkg/storage/bloom/v1/index_test.go | 5 +-- pkg/storage/bloom/v1/util.go | 42 ++++++++++++++++++++ pkg/storage/bloom/v1/versioned_builder.go | 10 ++++- 5 files changed, 70 insertions(+), 41 deletions(-) diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 11058fe3b0919..c6c4a21ca64a5 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -301,7 +301,12 @@ func (mb *MergeBuilder) processNextSeries( bytesAdded += bloom.SourceBytesAdded } - done, err := builder.AddSeries(*nextInStore, offsets, []Field{Field("__line__")}) + // TODO(chaudum): Use the indexed fields from bloom creation, however, + // currently we still build blooms from log lines.
+ fields := NewSet[Field](1) + fields.Add("__line__") + + done, err := builder.AddSeries(*nextInStore, offsets, fields) if err != nil { return nil, bytesAdded, 0, false, false, errors.Wrap(err, "committing series") } diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index d24129eb0de2f..a9e03efc41af9 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -288,7 +288,7 @@ type Series struct { } type Meta struct { - Fields Fields + Fields Set[Field] Offsets []BloomOffset } @@ -323,10 +323,9 @@ func (s *SeriesWithMeta) Encode( lastEnd = chunk.Encode(enc, version, lastEnd) } - enc.PutUvarint(len(s.Fields)) - sort.Sort(s.Fields) // ensure order - for _, field := range s.Fields { - field.Encode(enc, version) + enc.PutUvarint(s.Fields.Len()) + for _, f := range s.Fields.Items() { + f.Encode(enc, version) } return lastOffset @@ -370,12 +369,15 @@ func (s *SeriesWithMeta) Decode( } } - s.Fields = make([]Field, dec.Uvarint()) - for i := range s.Fields { - err = s.Fields[i].Decode(dec, version) + n := dec.Uvarint() + s.Fields = NewSet[Field](n) + for i := 0; i < n; i++ { + var f Field + err = f.Decode(dec, version) if err != nil { return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth field", i) } + s.Fields.Add(f) } return s.Fingerprint, lastOffset, dec.Err() @@ -383,10 +385,10 @@ func (s *SeriesWithMeta) Decode( // field encoding/decoding --------------------------------------------------- -type Field []byte // key of an indexed structured metadata field +type Field string -func (f *Field) Encode(enc *encoding.Encbuf, _ Version) { - enc.PutUvarintBytes(*f) +func (f Field) Encode(enc *encoding.Encbuf, _ Version) { + enc.PutUvarintBytes([]byte(f)) } func (f *Field) Decode(dec *encoding.Decbuf, _ Version) error { @@ -394,29 +396,6 @@ func (f *Field) Decode(dec *encoding.Decbuf, _ Version) error { return dec.Err() } -func (f *Field) String() string { - return string(*f) -} - -func (f *Field) Less(other Field) bool { - // avoid string allocations - return string(*f) < string(other) -} - -type Fields []Field - -func (f Fields) Len() int { - return len(f) -} - -func (f Fields) Less(i, j int) bool { - return f[i].Less(f[j]) -} - -func (f Fields) Swap(i, j int) { - f[i], f[j] = f[j], f[i] -} - // chunk encoding/decoding --------------------------------------------------- type ChunkRef logproto.ShortRef diff --git a/pkg/storage/bloom/v1/index_test.go b/pkg/storage/bloom/v1/index_test.go index 54d7459c50dad..dc25261faff75 100644 --- a/pkg/storage/bloom/v1/index_test.go +++ b/pkg/storage/bloom/v1/index_test.go @@ -52,10 +52,7 @@ func TestSeriesEncoding_V3(t *testing.T) { {Page: 1, ByteOffset: 2}, {Page: 2, ByteOffset: 1}, }, - Fields: []Field{ - Field("foo"), - Field("bar"), - }, + Fields: NewSetFromLiteral[Field]("foo", "bar"), }, } diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index f5169f01ef19d..ab657fa5a247c 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -67,3 +67,45 @@ func PointerSlice[T any](xs []T) []*T { } return out } + +type Set[V comparable] struct { + internal map[V]struct{} +} + +func NewSet[V comparable](size int) Set[V] { + return Set[V]{make(map[V]struct{}, size)} +} + +func NewSetFromLiteral[V comparable](v ...V) Set[V] { + set := NewSet[V](len(v)) + for _, elem := range v { + set.Add(elem) + } + return set +} + +func (s Set[V]) Add(v V) bool { + _, ok := s.internal[v] + if !ok { + s.internal[v] = struct{}{} + } + return !ok +} + +func (s Set[V]) Len() int { + return 
len(s.internal) } + +func (s Set[V]) Items() []V { + set := make([]V, 0, s.Len()) + for k := range s.internal { + set = append(set, k) + } + return set +} + +func (s Set[V]) Union(other Set[V]) { + for _, v := range other.Items() { + s.Add(v) + } +} diff --git a/pkg/storage/bloom/v1/versioned_builder.go b/pkg/storage/bloom/v1/versioned_builder.go index 9545951840571..1dd133e210cb0 100644 --- a/pkg/storage/bloom/v1/versioned_builder.go +++ b/pkg/storage/bloom/v1/versioned_builder.go @@ -78,7 +78,13 @@ func (b *V3Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, erro if err := at.Blooms.Err(); err != nil { return 0, errors.Wrap(err, "iterating blooms") } - blockFull, err := b.AddSeries(*at.Series, offsets, []Field{Field("__line__")}) + + // TODO(chaudum): Use the indexed fields from bloom creation, however, + // currently we still build blooms from log lines. + fields := NewSet[Field](1) + fields.Add("__line__") + + blockFull, err := b.AddSeries(*at.Series, offsets, fields) if err != nil { return 0, errors.Wrapf(err, "writing series") } @@ -111,7 +117,7 @@ func (b *V3Builder) AddBloom(bloom *Bloom) (BloomOffset, error) { } // AddSeries adds a series to the block. It returns true if the block is full after adding the series. -func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields []Field) (bool, error) { +func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields Set[Field]) (bool, error) { if err := b.index.Append(SeriesWithMeta{ Series: series, Meta: Meta{ Offsets: offsets, Fields: fields, }, }); err != nil { return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint) }
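The Set introduced here is a thin wrapper around a map: Add deduplicates and reports whether the element was new, and Items carries no ordering guarantee since it ranges over the map. A short usage sketch (field names are illustrative):

fields := NewSet[Field](2)
_ = fields.Add("trace_id") // true: newly inserted
_ = fields.Add("trace_id") // false: already present
fields.Union(NewSetFromLiteral[Field]("org_id"))
// fields.Len() == 2; fields.Items() yields {"trace_id", "org_id"} in map (unspecified) order

Note that SeriesWithMeta.Encode now writes fields via Items(), so the on-disk order of field names is not deterministic across encodings of the same set.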
From 1eb3b562db3d75e9507fef12ebf63d695a39d633 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 5 Sep 2024 09:13:09 +0200 Subject: [PATCH 10/10] Apply changes from code review Signed-off-by: Christian Haudum --- pkg/bloomgateway/processor.go | 4 ++-- pkg/storage/bloom/v1/bloom_builder.go | 2 -- pkg/storage/bloom/v1/builder.go | 12 ------------ pkg/storage/bloom/v1/builder_test.go | 3 +++ pkg/storage/bloom/v1/schema.go | 6 ++++-- pkg/storage/bloom/v1/util.go | 4 ---- 6 files changed, 9 insertions(+), 22 deletions(-) diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 83f3a6b9a2d16..f422bfd563921 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -145,8 +145,8 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl return err } - // We require V3 schema - if !schema.IsCurrentSchema() { + // We require V3+ schema + if schema.Version() < v1.V3 { return v1.ErrUnsupportedSchemaVersion } diff --git a/pkg/storage/bloom/v1/bloom_builder.go b/pkg/storage/bloom/v1/bloom_builder.go index 2d75fc224d521..ea54ba248f7c4 100644 --- a/pkg/storage/bloom/v1/bloom_builder.go +++ b/pkg/storage/bloom/v1/bloom_builder.go @@ -46,8 +46,6 @@ func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) { } } - // version := b.opts.Schema.version - b.scratch.Reset() if err := bloom.Encode(b.scratch); err != nil { return BloomOffset{}, errors.Wrap(err, "encoding bloom") diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index c6c4a21ca64a5..3a61234a1b12a 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -66,18 +66,6 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) { enc.PutBE64(b.BlockSize) } -// func NewDefaultBlockOptions(maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { -// opts := NewBlockOptionsFromSchema(Schema{ -// version: DefaultSchemaVersion, -// encoding: chunkenc.EncNone, -// nGramLength: 0, -// nGramSkip: 0, -// }) -// opts.BlockSize = maxBlockSizeBytes -// opts.UnencodedBlockOptions.MaxBloomSizeBytes = maxBloomSizeBytes -// return opts -// } - func NewBlockOptions(enc chunkenc.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ version: CurrentSchemaVersion, diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index d22e579ac6a0e..640fef038a6e8 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -550,6 +550,9 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { _, _, err = mb.Build(builder) require.Nil(t, err) + // checksum changes as soon as the contents of the block or the encoding change + // once the block format is stable, calculate the checksum and assert its correctness + // require.Equal(t, uint32(0x2a6cdba6), checksum) // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index a6bc25a0f899f..6fd8621654239 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -17,6 +17,8 @@ func (v Version) String() string { } const ( + magicNumber = uint32(0xCA7CAFE5) + // Add new versions below V1 Version = iota // V2 supports single series blooms encoded over multiple pages @@ -58,8 +60,8 @@ func (s Schema) Compatible(other Schema) bool { return s == other } -func (s Schema) IsCurrentSchema() bool { - return s.version == CurrentSchemaVersion +func (s Schema) Version() Version { + return s.version } func (s Schema) NGramLen() int { diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index ab657fa5a247c..6745ccaec7c61 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -9,10 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/mempool" ) -const ( - magicNumber = uint32(0xCA7CAFE5) -) - var ( castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
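Taken together, a v3 series entry is a run of delta-encoded varints: a uvarint fingerprint delta, a uvarint offset count, per offset a uvarint page delta plus a varint byte-offset delta, a uvarint chunk count, per chunk two varint timestamp deltas plus a big-endian checksum, then a uvarint field count followed by length-prefixed field names. A closing round-trip sketch mirroring TestSeriesEncoding_V3 (test name illustrative):

func TestV3SeriesEntryRoundTrip(t *testing.T) {
	src := SeriesWithMeta{
		Series: Series{
			Fingerprint: 1,
			Chunks:      []ChunkRef{{From: 1, Through: 2, Checksum: 3}},
		},
		Meta: Meta{
			Offsets: []BloomOffset{{Page: 0, ByteOffset: 0}},
			Fields:  NewSetFromLiteral[Field]("__line__"),
		},
	}

	enc := &encoding.Encbuf{}
	last := src.Encode(enc, V3, 0, BloomOffset{})

	dec := encoding.DecWith(enc.Get())
	var dst SeriesWithMeta
	fp, offset, err := dst.Decode(&dec, V3, 0, BloomOffset{})
	require.NoError(t, err)
	require.Equal(t, src.Fingerprint, fp)
	require.Equal(t, last, offset)
	require.Equal(t, src, dst)
}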