From 4b03a7ad9331a440a133a4f5cedfba2ab0091dd0 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 26 Jul 2023 17:51:51 +0530 Subject: [PATCH] implementing string interning to optimize resource usage for storing and processing non-indexed labels (#10044) **What this PR does / why we need it**: In PR #9700, we added support for storing non-indexed labels in chunks. This PR optimizes resource usage for storing and processing non-indexed labels by doing string interning. We will store deduped label names and values as a list in chunks in the newly added non-indexed labels section. The labels would then be referenced in blocks by their index(called symbols). Additionally, I have started the convention of writing lengths of sections with their offsets within chunks, making it easier to introduce new sections. The section offsets and lengths would be stored at the end of the chunk, similar to [TOC](https://ganeshvernekar.com/blog/prometheus-tsdb-persistent-block-and-its-index/#a-toc) in TSDB. **Checklist** - [x] Tests updated (cherry picked from commit 9b554bba8d27991820a4ce5e01c17f3753264415) --- pkg/chunkenc/memchunk.go | 406 +++++++++++++++++++++------------ pkg/chunkenc/memchunk_test.go | 141 ++++++++---- pkg/chunkenc/pool.go | 2 + pkg/chunkenc/symbols.go | 397 ++++++++++++++++++++++++++++++++ pkg/chunkenc/symbols_test.go | 163 +++++++++++++ pkg/chunkenc/unordered.go | 163 +++++++------ pkg/chunkenc/unordered_test.go | 62 ++--- 7 files changed, 1025 insertions(+), 309 deletions(-) create mode 100644 pkg/chunkenc/symbols.go create mode 100644 pkg/chunkenc/symbols_test.go diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index d2aa131b297bc..1a4bd2c76c373 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -42,6 +42,9 @@ const ( // This could wary from configured block size using `ingester.chunks-block-size` flag or equivalent yaml config resulting in // different block size in the new chunk which should be fine. defaultBlockSize = 256 * 1024 + + chunkMetasSectionIdx = 1 + chunkNonIndexedLabelsSectionIdx = 2 ) var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithNonIndexedLabelsHeadBlockFmt} @@ -63,12 +66,12 @@ func (f HeadBlockFmt) String() string { } } -func (f HeadBlockFmt) NewBlock() HeadBlock { +func (f HeadBlockFmt) NewBlock(symbolizer *symbolizer) HeadBlock { switch { case f < UnorderedHeadBlockFmt: return &headBlock{} default: - return newUnorderedHeadBlock(f) + return newUnorderedHeadBlock(f, symbolizer) } } @@ -109,6 +112,7 @@ type MemChunk struct { // Target size in compressed bytes targetSize int + symbolizer *symbolizer // The finished blocks. blocks []block // The compressed size of all the blocks @@ -120,6 +124,9 @@ type MemChunk struct { format byte encoding Encoding headFmt HeadBlockFmt + + // compressed size of chunk. Set when chunk is cut or while decoding chunk from storage. 
+ compressedSize int } type block struct { @@ -316,11 +323,11 @@ func (hb *headBlock) LoadBytes(b []byte) error { return nil } -func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { +func (hb *headBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error) { if version < UnorderedHeadBlockFmt { return hb, nil } - out := version.NewBlock() + out := version.NewBlock(symbolizer) for _, e := range hb.entries { if err := out.Append(e.t, e.s, e.nonIndexedLabels); err != nil { @@ -343,25 +350,33 @@ func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *Me // NewMemChunk returns a new in-mem chunk. func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { + symbolizer := newSymbolizer() return &MemChunk{ blockSize: blockSize, // The blockSize in bytes. targetSize: targetSize, // Desired chunk size in compressed bytes blocks: []block{}, format: format, - head: head.NewBlock(), + head: head.NewBlock(symbolizer), - encoding: enc, - headFmt: head, + encoding: enc, + headFmt: head, + symbolizer: symbolizer, } } // NewByteChunk returns a MemChunk on the passed bytes. func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { + return newByteChunk(b, blockSize, targetSize, false) +} + +func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*MemChunk, error) { bc := &MemChunk{ - head: &headBlock{}, // Dummy, empty headblock. - blockSize: blockSize, - targetSize: targetSize, + head: &headBlock{}, // Dummy, empty headblock. + blockSize: blockSize, + targetSize: targetSize, + symbolizer: newSymbolizer(), + compressedSize: len(b), } db := decbuf{b: b} @@ -388,11 +403,31 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { return nil, errors.Errorf("invalid version %d", version) } - metasOffset := binary.BigEndian.Uint64(b[len(b)-8:]) - mb := b[metasOffset : len(b)-(8+4)] // storing the metasOffset + checksum of meta + // readSectionLenAndOffset reads len and offset for different sections within the chunk. + // Starting from chunk version 4, we have started writing offset and length of various sections within the chunk. + // These len and offset pairs would be stored together at the end of the chunk. + // Considering N stored length and offset pairs, they can be referenced by index starting from [1-N] + // where 1 would be referring to last entry, 2 would be referring to last 2nd entry and so on. + readSectionLenAndOffset := func(idx int) (uint64, uint64) { + lenAndOffsetPos := len(b) - (idx * 16) + lenAndOffset := b[lenAndOffsetPos : lenAndOffsetPos+16] + return binary.BigEndian.Uint64(lenAndOffset[:8]), binary.BigEndian.Uint64(lenAndOffset[8:]) + } + + metasOffset := uint64(0) + metasLen := uint64(0) + if version >= chunkFormatV4 { + // version >= 4 starts writing length of sections after their offsets + metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx) + } else { + // version <= 3 does not store length of metas. 
metas are followed by metasOffset + hash and then the chunk ends + metasOffset = binary.BigEndian.Uint64(b[len(b)-8:]) + metasLen = uint64(len(b)-(8+4)) - metasOffset + } + mb := b[metasOffset : metasOffset+metasLen] db = decbuf{b: mb} - expCRC := binary.BigEndian.Uint32(b[len(b)-(8+4):]) + expCRC := binary.BigEndian.Uint32(b[metasOffset+metasLen:]) if expCRC != db.crc32() { return nil, ErrInvalidChecksum } @@ -435,6 +470,27 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { } } + if version >= chunkFormatV4 { + nonIndexedLabelsLen, nonIndexedLabelsOffset := readSectionLenAndOffset(chunkNonIndexedLabelsSectionIdx) + lb := b[nonIndexedLabelsOffset : nonIndexedLabelsOffset+nonIndexedLabelsLen] // non-indexed labels Offset + checksum + db = decbuf{b: lb} + + expCRC := binary.BigEndian.Uint32(b[nonIndexedLabelsOffset+nonIndexedLabelsLen:]) + if expCRC != db.crc32() { + return nil, ErrInvalidChecksum + } + + if fromCheckpoint { + bc.symbolizer = symbolizerFromCheckpoint(lb) + } else { + symbolizer, err := symbolizerFromEnc(lb, getReaderPool(bc.encoding)) + if err != nil { + return nil, err + } + bc.symbolizer = symbolizer + } + } + return bc, nil } @@ -482,16 +538,29 @@ func (c *MemChunk) BytesSize() int { size += crc32.Size // metablock crc size += 8 // metaoffset + + if c.format >= chunkFormatV4 { + size += 8 // metablock length + + size += c.symbolizer.CheckpointSize() // non-indexed labels block + size += crc32.Size // non-indexed labels block crc + + size += 8 + 8 // non-indexed labels offset and length + } return size } +func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { + return c.writeTo(w, false) +} + // WriteTo Implements io.WriterTo // NOTE: Does not cut head block or include any head block data. // For this to be the case you must call Close() first. // This decision notably enables WAL checkpointing, which would otherwise // result in different content addressable chunks in storage based on the timing of when // they were checkpointed (which would cause new blocks to be cut early). -func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { +func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) { crc32Hash := crc32HashPool.Get().(hash.Hash32) defer crc32HashPool.Put(crc32Hash) crc32Hash.Reset() @@ -516,6 +585,36 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { return offset, errors.Wrap(err, "write blockMeta #entries") } offset += int64(n) + nonIndexedLabelsOffset := offset + nonIndexedLabelsLen := 0 + + if c.format >= chunkFormatV4 { + var ( + n int + crcHash []byte + ) + if forCheckpoint { + var err error + n, crcHash, err = c.symbolizer.CheckpointTo(w) + if err != nil { + return offset, errors.Wrap(err, "write non-indexed labels") + } + } else { + var err error + n, crcHash, err = c.symbolizer.SerializeTo(w, getWriterPool(c.encoding)) + if err != nil { + return offset, errors.Wrap(err, "write non-indexed labels") + } + } + offset += int64(n) + nonIndexedLabelsLen = n + + n, err = w.Write(crcHash) + if err != nil { + return offset, errors.Wrap(err, "write crc32 hash for non-indexed labels") + } + offset += int64(n) + } // Write Blocks. 
for i, b := range c.blocks { @@ -550,6 +649,7 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { } eb.putUvarint(len(b.b)) } + metasLen := len(eb.get()) eb.putHash(crc32Hash) n, err = w.Write(eb.get()) @@ -558,8 +658,23 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { } offset += int64(n) + if c.format >= chunkFormatV4 { + // Write non-indexed labels offset and length + eb.reset() + eb.putBE64int(nonIndexedLabelsLen) + eb.putBE64int(int(nonIndexedLabelsOffset)) + n, err = w.Write(eb.get()) + if err != nil { + return offset, errors.Wrap(err, "write non-indexed labels offset and length") + } + offset += int64(n) + } + // Write the metasOffset. eb.reset() + if c.format >= chunkFormatV4 { + eb.putBE64int(metasLen) + } eb.putBE64int(int(metasOffset)) n, err = w.Write(eb.get()) if err != nil { @@ -567,6 +682,7 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { } offset += int64(n) + c.compressedSize = int(offset) return offset, nil } @@ -574,39 +690,38 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { // This is to ensure eventually flushed chunks don't have different substructures depending on when they were checkpointed. // In turn this allows us to maintain a more effective dedupe ratio in storage. func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error { - _, err := c.WriteTo(chk) - if err != nil { - return err - } - - if c.head.IsEmpty() { - return nil - } - - err = c.head.CheckpointTo(head) - if err != nil { - return err + // serialize the head before the MemChunk because: + // * We store non-indexed labels with chunks(using symbolizer) which are then referenced by blocks and head. + // * When a write request is received with some new non-indexed labels, we update symbolizer first and then append log entry to head. + // * Labels stored in symbolizer are serialized with MemChunk. + // This means if we serialize the MemChunk before the head, we might miss writing some newly added non-indexed labels which are referenced by head. + if !c.head.IsEmpty() { + err := c.head.CheckpointTo(head) + if err != nil { + return err + } } - return nil + _, err := c.writeTo(chk, true) + return err } func (c *MemChunk) CheckpointSize() (chunk, head int) { return c.BytesSize(), c.head.CheckpointSize() } -func MemchunkFromCheckpoint(chk, head []byte, desired HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error) { - mc, err := NewByteChunk(chk, blockSize, targetSize) +func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error) { + mc, err := newByteChunk(chk, blockSize, targetSize, true) if err != nil { return nil, err } - h, err := HeadFromCheckpoint(head, desired) + h, err := HeadFromCheckpoint(head, desiredIfNotUnordered, mc.symbolizer) if err != nil { return nil, err } mc.head = h - mc.headFmt = desired + mc.headFmt = h.Format() return mc, nil } @@ -638,10 +753,15 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool { // This is looking to see if the uncompressed lines will fit which is not // a great check, but it will guarantee we are always under the target size newHBSize := c.head.UncompressedSize() + len(e.Line) + nonIndexedLabelsSize := 0 if c.format >= chunkFormatV4 { newHBSize += metaLabelsLen(logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels)) + // non-indexed labels are compressed while serializing the chunk so we don't know what their size would be after compression. 
+ // As adoption increases, their overall size can be non-trivial so we can't ignore them while calculating chunk size. + // ToDo(Sandeep): See if we can just use some average compression ratio for each compression format we support and use it here + nonIndexedLabelsSize = c.symbolizer.UncompressedSize() } - return (c.cutBlockSize + newHBSize) < c.targetSize + return (nonIndexedLabelsSize + c.cutBlockSize + newHBSize) < c.targetSize } // if targetSize is not defined, default to the original behavior of fixed blocks per chunk return len(c.blocks) < blocksPerChunk @@ -657,14 +777,26 @@ func (c *MemChunk) UncompressedSize() int { size += b.uncompressedSize } + if c.format >= chunkFormatV4 { + size += c.symbolizer.UncompressedSize() + } + return size } // CompressedSize implements Chunk. func (c *MemChunk) CompressedSize() int { + if c.compressedSize != 0 { + return c.compressedSize + } + size := 0 // Better to account for any uncompressed data than ignore it even though this isn't accurate. size += c.head.UncompressedSize() + if c.format >= chunkFormatV4 { + size += c.symbolizer.UncompressedSize() // length of each symbol + } + size += c.cutBlockSize return size } @@ -688,6 +820,9 @@ func (c *MemChunk) Append(entry *logproto.Entry) error { return ErrOutOfOrder } + if c.format < chunkFormatV4 { + entry.NonIndexedLabels = nil + } if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels)); err != nil { return err } @@ -737,7 +872,7 @@ func (c *MemChunk) reorder() error { func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error { if c.head != nil && c.head.Format() != desired { - newH, err := c.head.Convert(desired) + newH, err := c.head.Convert(desired, c.symbolizer) if err != nil { return err } @@ -795,6 +930,14 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) { func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { mint, maxt := mintT.UnixNano(), maxtT.UnixNano() blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1) + + if c.format >= chunkFormatV4 { + stats := stats.FromContext(ctx) + stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize())) + decompressedSize := int64(c.symbolizer.DecompressedSize()) + stats.AddDecompressedBytes(decompressedSize) + stats.AddDecompressedNonIndexedLabelsBytes(decompressedSize) + } var headIterator iter.EntryIterator var lastMax int64 // placeholder to check order across blocks @@ -811,7 +954,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } lastMax = b.maxt - blockItrs = append(blockItrs, encBlock{c.encoding, c.format, b}.Iterator(ctx, pipeline)) + blockItrs = append(blockItrs, encBlock{c.encoding, c.format, c.symbolizer, b}.Iterator(ctx, pipeline)) } if !c.head.IsEmpty() { @@ -873,6 +1016,14 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, mint, maxt := from.UnixNano(), through.UnixNano() its := make([]iter.SampleIterator, 0, len(c.blocks)+1) + if c.format >= chunkFormatV4 { + stats := stats.FromContext(ctx) + stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize())) + decompressedSize := int64(c.symbolizer.DecompressedSize()) + stats.AddDecompressedBytes(decompressedSize) + stats.AddDecompressedNonIndexedLabelsBytes(decompressedSize) + } + var lastMax int64 // placeholder to check order across blocks ordered := true for _, b := range c.blocks { @@ -885,7 +1036,7 @@ func (c *MemChunk) SampleIterator(ctx 
context.Context, from, through time.Time, ordered = false } lastMax = b.maxt - its = append(its, encBlock{c.encoding, c.format, b}.SampleIterator(ctx, extractor)) + its = append(its, encBlock{c.encoding, c.format, c.symbolizer, b}.SampleIterator(ctx, extractor)) } if !c.head.IsEmpty() { @@ -917,7 +1068,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { for _, b := range c.blocks { if maxt >= b.mint && b.maxt >= mint { - blocks = append(blocks, encBlock{c.encoding, c.format, b}) + blocks = append(blocks, encBlock{c.encoding, c.format, c.symbolizer, b}) } } return blocks @@ -969,8 +1120,9 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err // then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the // chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former. type encBlock struct { - enc Encoding - format byte + enc Encoding + format byte + symbolizer *symbolizer block } @@ -978,14 +1130,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) ite if len(b.b) == 0 { return iter.NoopIterator } - return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format) + return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer) } func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newSampleIterator(ctx, getReaderPool(b.enc), b.b, b.format, extractor) + return newSampleIterator(ctx, getReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer) } func (b block) Offset() int { @@ -1131,8 +1283,9 @@ type bufferedIterator struct { origBytes []byte stats *stats.Context - reader io.Reader - pool ReaderPool + reader io.Reader + pool ReaderPool + symbolizer *symbolizer err error @@ -1144,21 +1297,22 @@ type bufferedIterator struct { currLine []byte // the current line, this is the same as the buffer but sliced the line size. currTs int64 - nonIndexedLabelsBuf [][]byte // The buffer for a single entry's non-indexed labels. + symbolsBuf []symbol // The buffer for a single entry's symbols. currNonIndexedLabels labels.Labels // The current labels. 
closed bool } -func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, format byte) *bufferedIterator { +func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator { stats := stats.FromContext(ctx) stats.AddCompressedBytes(int64(len(b))) return &bufferedIterator{ - stats: stats, - origBytes: b, - reader: nil, // will be initialized later - pool: pool, - format: format, + stats: stats, + origBytes: b, + reader: nil, // will be initialized later + pool: pool, + format: format, + symbolizer: symbolizer, } } @@ -1177,26 +1331,12 @@ func (si *bufferedIterator) Next() bool { } } - ts, line, nonIndexedLabelsBuff, ok := si.moveNext() + ts, line, nonIndexedLabels, ok := si.moveNext() if !ok { si.Close() return false } - var nonIndexedLabels labels.Labels - if len(nonIndexedLabelsBuff) > 0 { - if len(nonIndexedLabelsBuff)%2 != 0 { - si.err = fmt.Errorf("expected even number of metadata labels, got %d", len(nonIndexedLabelsBuff)) - return false - } - - nonIndexedLabels = make(labels.Labels, len(nonIndexedLabelsBuff)/2) - for i := 0; i < len(nonIndexedLabelsBuff); i += 2 { - nonIndexedLabels[i/2].Name = string(nonIndexedLabelsBuff[i]) - nonIndexedLabels[i/2].Value = string(nonIndexedLabelsBuff[i+1]) - } - } - si.currTs = ts si.currLine = line si.currNonIndexedLabels = nonIndexedLabels @@ -1204,7 +1344,7 @@ func (si *bufferedIterator) Next() bool { } // moveNext moves the buffer to the next entry -func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { +func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) { var decompressedBytes int64 var decompressedNonIndexedLabelsBytes int64 var ts int64 @@ -1280,11 +1420,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { return ts, si.buf[:lineSize], nil, true } - // TODO: This is pretty similar to how we read the line size, and the metadata name and value sizes - // Maybe we can extract it to a separate function and reuse it? lastAttempt = 0 - var labelsWidth, nLabels int - for labelsWidth == 0 { // Read until we have enough bytes for the labels. + var symbolsSectionLengthWidth, nSymbolsWidth, nSymbols int + for nSymbolsWidth == 0 { // Read until we have enough bytes for the labels. n, err := si.reader.Read(si.readBuf[si.readBufValid:]) si.readBufValid += n if err != nil { @@ -1301,43 +1439,63 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { } } var l uint64 - l, labelsWidth = binary.Uvarint(si.readBuf[:si.readBufValid]) - nLabels = int(l) + _, symbolsSectionLengthWidth = binary.Uvarint(si.readBuf[:si.readBufValid]) + l, nSymbolsWidth = binary.Uvarint(si.readBuf[symbolsSectionLengthWidth:si.readBufValid]) + nSymbols = int(l) lastAttempt = si.readBufValid } // Number of labels decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64 + // Label symbols + decompressedNonIndexedLabelsBytes += int64(nSymbols * 2 * binary.MaxVarintLen64) // Shift down what is still left in the fixed-size read buffer, if any. - si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid]) - - // If not enough space for the labels, create a new buffer slice and put the old one back in the pool. 
- nonIndexedLabelsBufLen := nLabels * 2 - if si.nonIndexedLabelsBuf == nil || nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) { - if si.nonIndexedLabelsBuf != nil { - for i := range si.nonIndexedLabelsBuf { - if si.nonIndexedLabelsBuf[i] != nil { - BytesBufferPool.Put(si.nonIndexedLabelsBuf[i]) + si.readBufValid = copy(si.readBuf[:], si.readBuf[symbolsSectionLengthWidth+nSymbolsWidth:si.readBufValid]) + + /* + Commented out tested code, which lets us skip reading the symbols section altogether. + Leaving it here if in case we need it in future. + + symbolsSectionLength -= nSymbolsWidth + if symbolsSectionLength > 0 { + readBufValid := si.readBufValid + if symbolsSectionLength >= si.readBufValid { + si.readBufValid = 0 + } else { + si.readBufValid = copy(si.readBuf[:], si.readBuf[symbolsSectionLength:si.readBufValid]) + } + symbolsSectionLength -= readBufValid - si.readBufValid + if symbolsSectionLength > 0 { + _, err := si.reader.Read(make([]byte, symbolsSectionLength)) + if err != nil { + si.err = err + return 0, nil, nil, false } } - LabelsPool.Put(si.nonIndexedLabelsBuf) + nSymbols = 0 + } + */ + + // If not enough space for the symbols, create a new buffer slice and put the old one back in the pool. + if nSymbols > cap(si.symbolsBuf) { + if si.symbolsBuf != nil { + SymbolsPool.Put(si.symbolsBuf) } - si.nonIndexedLabelsBuf = LabelsPool.Get(nonIndexedLabelsBufLen).([][]byte) - if nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) { - si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", nonIndexedLabelsBufLen, cap(si.nonIndexedLabelsBuf)) + si.symbolsBuf = SymbolsPool.Get(nSymbols).([]symbol) + if nSymbols > cap(si.symbolsBuf) { + si.err = fmt.Errorf("could not get a symbols matrix of size %d, actual %d", nSymbols, cap(si.symbolsBuf)) return 0, nil, nil, false } } - si.nonIndexedLabelsBuf = si.nonIndexedLabelsBuf[:nLabels*2] + si.symbolsBuf = si.symbolsBuf[:nSymbols] - // Read all the label-value pairs, into the buffer slice. - for i := 0; i < nonIndexedLabelsBufLen; i++ { - // Read the length of the label. - lastAttempt = 0 - var labelWidth, labelSize int - for labelWidth == 0 { // Read until we have enough bytes for the name. + // Read all the symbols, into the buffer. + for i := 0; i < nSymbols; i++ { + var sName, sValue uint64 + var nWidth, vWidth, lastAttempt int + for vWidth == 0 { // Read until both varints have enough bytes. n, err := si.reader.Read(si.readBuf[si.readBufValid:]) si.readBufValid += n if err != nil { @@ -1353,57 +1511,23 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { return 0, nil, nil, false } } - var l uint64 - l, labelWidth = binary.Uvarint(si.readBuf[:si.readBufValid]) - labelSize = int(l) + sName, nWidth = binary.Uvarint(si.readBuf[:si.readBufValid]) + sValue, vWidth = binary.Uvarint(si.readBuf[nWidth:si.readBufValid]) lastAttempt = si.readBufValid } - // Label size - decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64 - - // If the buffer is not yet initialize or too small, we get a new one. 
- if si.nonIndexedLabelsBuf[i] == nil || labelSize > cap(si.nonIndexedLabelsBuf[i]) { - // in case of a replacement we replace back the buffer in the pool - if si.nonIndexedLabelsBuf[i] != nil { - BytesBufferPool.Put(si.nonIndexedLabelsBuf[i]) - } - si.nonIndexedLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte) - if labelSize > cap(si.nonIndexedLabelsBuf[i]) { - si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.nonIndexedLabelsBuf[i])) - return 0, nil, nil, false - } - } - - si.nonIndexedLabelsBuf[i] = si.nonIndexedLabelsBuf[i][:labelSize] - // Take however many bytes are left in the read buffer. - n := copy(si.nonIndexedLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid]) // Shift down what is still left in the fixed-size read buffer, if any. - si.readBufValid = copy(si.readBuf[:], si.readBuf[labelWidth+n:si.readBufValid]) - - // Then process reading the label. - for n < labelSize { - r, err := si.reader.Read(si.nonIndexedLabelsBuf[i][n:labelSize]) - n += r - if err != nil { - // We might get EOF after reading enough bytes to fill the buffer, which is OK. - // EOF and zero bytes read when the buffer isn't full is an error. - if err == io.EOF && r != 0 { - continue - } - si.err = err - return 0, nil, nil, false - } - } + si.readBufValid = copy(si.readBuf[:], si.readBuf[nWidth+vWidth:si.readBufValid]) - decompressedNonIndexedLabelsBytes += int64(labelSize) + si.symbolsBuf[i].Name = uint32(sName) + si.symbolsBuf[i].Value = uint32(sValue) } si.stats.AddDecompressedLines(1) si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedNonIndexedLabelsBytes) si.stats.AddDecompressedBytes(decompressedBytes + decompressedNonIndexedLabelsBytes) - return ts, si.buf[:lineSize], si.nonIndexedLabelsBuf[:nonIndexedLabelsBufLen], true + return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols]), true } func (si *bufferedIterator) Error() error { return si.err } @@ -1427,23 +1551,17 @@ func (si *bufferedIterator) close() { si.buf = nil } - if si.nonIndexedLabelsBuf != nil { - for i := range si.nonIndexedLabelsBuf { - if si.nonIndexedLabelsBuf[i] != nil { - BytesBufferPool.Put(si.nonIndexedLabelsBuf[i]) - si.nonIndexedLabelsBuf[i] = nil - } - } - LabelsPool.Put(si.nonIndexedLabelsBuf) - si.nonIndexedLabelsBuf = nil + if si.symbolsBuf != nil { + SymbolsPool.Put(si.symbolsBuf) + si.symbolsBuf = nil } si.origBytes = nil } -func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte) iter.EntryIterator { +func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator { return &entryBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b, format), + bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer), pipeline: pipeline, } } @@ -1481,9 +1599,9 @@ func (e *entryBufferedIterator) Next() bool { return false } -func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor) iter.SampleIterator { +func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator { it := &sampleBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b, format), + bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer), extractor: extractor, } return it diff --git a/pkg/chunkenc/memchunk_test.go 
b/pkg/chunkenc/memchunk_test.go index d48d2753d0118..4418d97ee9c35 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -171,6 +171,8 @@ func TestBlock(t *testing.T) { } } + var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) @@ -191,6 +193,14 @@ func TestBlock(t *testing.T) { require.NoError(t, it.Close()) require.Equal(t, len(cases), idx) + countExtractor = func() log.StreamSampleExtractor { + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + if err != nil { + panic(err) + } + return ex.ForStream(labels.Labels{}) + }() + sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) idx = 0 for sampleIt.Next() { @@ -384,50 +394,83 @@ func TestSerialization(t *testing.T) { for _, testData := range allPossibleFormats { for _, enc := range testEncoding { enc := enc - t.Run(testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { - t.Parallel() - - chk := NewMemChunk(enc, testData.headBlockFmt, testBlockSize, testTargetSize) - chk.format = testData.chunkFormat - numSamples := 50000 - - for i := 0; i < numSamples; i++ { - require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i)))) + // run tests with and without non-indexed labels set since it is optional + for _, appendWithNonIndexedLabels := range []bool{false, true} { + appendWithNonIndexedLabels := appendWithNonIndexedLabels + testName := testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt) + if appendWithNonIndexedLabels { + testName = fmt.Sprintf("%s - append non-indexed labels", testName) + } else { + testName = fmt.Sprintf("%s - without non-indexed labels", testName) } - require.NoError(t, chk.Close()) - - byt, err := chk.Bytes() - require.NoError(t, err) + t.Run(testName, func(t *testing.T) { + t.Parallel() + + chk := NewMemChunk(enc, testData.headBlockFmt, testBlockSize, testTargetSize) + chk.format = testData.chunkFormat + numSamples := 50000 + var entry *logproto.Entry + + for i := 0; i < numSamples; i++ { + entry = logprotoEntry(int64(i), strconv.Itoa(i)) + if appendWithNonIndexedLabels { + entry.NonIndexedLabels = []logproto.LabelAdapter{{Name: "foo", Value: strconv.Itoa(i)}} + } + require.NoError(t, chk.Append(entry)) + } + require.NoError(t, chk.Close()) - bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) - require.NoError(t, err) + byt, err := chk.Bytes() + require.NoError(t, err) - it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) - require.NoError(t, err) - for i := 0; i < numSamples; i++ { - require.True(t, it.Next()) + bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) + require.NoError(t, err) - e := it.Entry() - require.Equal(t, int64(i), e.Timestamp.UnixNano()) - require.Equal(t, strconv.Itoa(i), e.Line) - } - require.NoError(t, it.Error()) + it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + for i := 0; i < numSamples; i++ { + require.True(t, it.Next()) - sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) - for i := 0; i < numSamples; i++ { - require.True(t, sampleIt.Next(), 
i) + e := it.Entry() + require.Equal(t, int64(i), e.Timestamp.UnixNano()) + require.Equal(t, strconv.Itoa(i), e.Line) + if appendWithNonIndexedLabels && testData.chunkFormat >= chunkFormatV4 { + require.Equal(t, push.LabelsAdapter{{Name: "foo", Value: strconv.Itoa(i)}}, e.NonIndexedLabels) + } else { + require.Nil(t, e.NonIndexedLabels) + } + } + require.NoError(t, it.Error()) - s := sampleIt.Sample() - require.Equal(t, int64(i), s.Timestamp) - require.Equal(t, 1., s.Value) - } - require.NoError(t, sampleIt.Error()) + countExtractor = func() log.StreamSampleExtractor { + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + if err != nil { + panic(err) + } + return ex.ForStream(labels.Labels{}) + }() + + sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) + for i := 0; i < numSamples; i++ { + require.True(t, sampleIt.Next(), i) + + s := sampleIt.Sample() + require.Equal(t, int64(i), s.Timestamp) + require.Equal(t, 1., s.Value) + if appendWithNonIndexedLabels && testData.chunkFormat >= chunkFormatV4 { + require.Equal(t, fmt.Sprintf(`{foo="%d"}`, i), sampleIt.Labels()) + } else { + require.Equal(t, "{}", sampleIt.Labels()) + } + } + require.NoError(t, sampleIt.Error()) - byt2, err := chk.Bytes() - require.NoError(t, err) + byt2, err := chk.Bytes() + require.NoError(t, err) - require.True(t, bytes.Equal(byt, byt2)) - }) + require.True(t, bytes.Equal(byt, byt2)) + }) + } } } } @@ -1043,15 +1086,19 @@ func TestCheckpointEncoding(t *testing.T) { t.Parallel() blockSize, targetSize := 256*1024, 1500*1024 - for _, f := range HeadBlockFmts { - t.Run(f.String(), func(t *testing.T) { - c := NewMemChunk(EncSnappy, f, blockSize, targetSize) + for _, f := range allPossibleFormats { + t.Run(testNameWithFormats(EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) { + c := newMemChunkWithFormat(f.chunkFormat, EncSnappy, f.headBlockFmt, blockSize, targetSize) // add a few entries for i := 0; i < 5; i++ { entry := &logproto.Entry{ Timestamp: time.Unix(int64(i), 0), Line: fmt.Sprintf("hi there - %d", i), + NonIndexedLabels: push.LabelsAdapter{{ + Name: fmt.Sprintf("name%d", i), + Value: fmt.Sprintf("val%d", i), + }}, } require.Equal(t, true, c.SpaceFor(entry)) require.Nil(t, c.Append(entry)) @@ -1077,9 +1124,15 @@ func TestCheckpointEncoding(t *testing.T) { err := c.SerializeForCheckpointTo(&chk, &head) require.Nil(t, err) - cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), f, blockSize, targetSize) + cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), f.headBlockFmt, blockSize, targetSize) require.Nil(t, err) + if f.chunkFormat <= chunkFormatV2 { + for i := range c.blocks { + c.blocks[i].uncompressedSize = 0 + } + } + require.Equal(t, c, cpy) }) } @@ -1574,14 +1627,14 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) { // The expected bytes is the sum of bytes decompressed and bytes read from the head chunk. // First we add the bytes read from the store (aka decompressed). That's - // nonIndexedLabelsBytes = n. lines * (n. labels + (2 * n. labels) * (label length + label)) + // nonIndexedLabelsBytes = n. lines * (n. labels + (2 * n. nonIndexedLabelsSymbols * symbol )) // lineBytes = n. 
lines * (ts + line length + line) - expectedNonIndexedLabelsBytes := 2 * (binary.MaxVarintLen64 + (binary.MaxVarintLen64 + len("traceID") + binary.MaxVarintLen64 + len("123") + binary.MaxVarintLen64 + len("user") + binary.MaxVarintLen64 + len("a"))) + expectedNonIndexedLabelsBytes := 2 * (binary.MaxVarintLen64 + (2 * 2 * binary.MaxVarintLen64)) lineBytes := 2 * (2*binary.MaxVarintLen64 + len("lineA")) // Now we add the bytes read from the head chunk. That's - // nonIndexedLabelsBytes = n. lines * (n. labels * (label name + label value)) + // nonIndexedLabelsBytes = n. lines * (2 * n. nonIndexedLabelsSymbols * symbol ) // lineBytes = n. lines * (line) - expectedNonIndexedLabelsBytes += 2 * (len("traceID") + len("789") + len("user") + len("c")) + expectedNonIndexedLabelsBytes += 2 * (2 * 2 * 4) lineBytes += 2 * (len("lineC")) // Finally, the expected total bytes is the line bytes + non-indexed labels bytes expectedBytes := lineBytes + expectedNonIndexedLabelsBytes diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index 1bf7887d6b770..a19ef28849191 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -54,6 +54,8 @@ var ( // So we will be able to store from 0 to 128 labels. LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) }) + SymbolsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([]symbol, 0, size) }) + // SamplesPool pooling array of samples [512,1024,...,16k] SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) }) diff --git a/pkg/chunkenc/symbols.go b/pkg/chunkenc/symbols.go new file mode 100644 index 0000000000000..94d04d863f9c8 --- /dev/null +++ b/pkg/chunkenc/symbols.go @@ -0,0 +1,397 @@ +package chunkenc + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash" + "io" + "sync" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/util" +) + +// symbol holds reference to a label name and value pair +type symbol struct { + Name, Value uint32 +} + +type symbols []symbol + +// symbolizer holds a collection of label names and values and assign symbols to them. +// symbols are actually index numbers assigned based on when the entry is seen for the first time. 
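+//
+// For example, adding {foo="bar"} followed by {foo="baz"} returns the symbols
+// [{Name:0 Value:1}] and [{Name:0 Value:2}]: "foo" is stored once and reused,
+// while "bar" and "baz" each get the next free index. Lookup reverses the
+// mapping, so Lookup(symbols{{Name: 0, Value: 1}}) returns {foo="bar"}.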
+type symbolizer struct { + mtx sync.RWMutex + symbolsMap map[string]uint32 + labels []string + size int + compressedSize int +} + +func newSymbolizer() *symbolizer { + return &symbolizer{ + symbolsMap: map[string]uint32{}, + } +} + +// Reset resets all the data and makes the symbolizer ready for reuse +func (s *symbolizer) Reset() { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.symbolsMap = map[string]uint32{} + s.labels = s.labels[:0] + s.size = 0 + s.compressedSize = 0 +} + +// Add adds new labels pairs to the collection and returns back a symbol for each existing and new label pair +func (s *symbolizer) Add(lbls labels.Labels) symbols { + if len(lbls) == 0 { + return nil + } + + syms := make([]symbol, len(lbls)) + + for i, label := range lbls { + syms[i].Name = s.add(label.Name) + syms[i].Value = s.add(label.Value) + } + + return syms +} + +func (s *symbolizer) add(lbl string) uint32 { + s.mtx.RLock() + idx, ok := s.symbolsMap[lbl] + s.mtx.RUnlock() + + if ok { + return idx + } + + s.mtx.Lock() + defer s.mtx.Unlock() + + idx, ok = s.symbolsMap[lbl] + if !ok { + idx = uint32(len(s.labels)) + s.symbolsMap[lbl] = idx + s.labels = append(s.labels, lbl) + s.size += len(lbl) + } + + return idx +} + +// Lookup coverts and returns labels pairs for the given symbols +func (s *symbolizer) Lookup(syms symbols) labels.Labels { + if len(syms) == 0 { + return nil + } + lbls := make([]labels.Label, len(syms)) + + for i, symbol := range syms { + lbls[i].Name = s.lookup(symbol.Name) + lbls[i].Value = s.lookup(symbol.Value) + } + + return lbls +} + +func (s *symbolizer) lookup(idx uint32) string { + // take a read lock only if we will be getting new entries + if s.symbolsMap != nil { + s.mtx.RLock() + defer s.mtx.RUnlock() + } + + if idx > uint32(len(s.labels)-1) { + return "" + } + + return s.labels[idx] +} + +// UncompressedSize returns the number of bytes taken up by deduped string labels +func (s *symbolizer) UncompressedSize() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.size +} + +// CompressedSize returns the number of bytes that were taken by serialized symbolizer which means it is set only when deserialized from serialized bytes +func (s *symbolizer) CompressedSize() int { + return s.compressedSize +} + +// DecompressedSize returns the number of bytes we would have gotten after decompressing serialized bytes. +// It returns non-zero value only if we actually loaded it from serialized bytes by looking at value returned by CompressedSize() because +// we only want to consider data being decompressed if it actually was done. +func (s *symbolizer) DecompressedSize() int { + if s.CompressedSize() == 0 { + return 0 + } + return s.CheckpointSize() +} + +// CheckpointSize returns the number of bytes it would take for writing labels as checkpoint +func (s *symbolizer) CheckpointSize() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + + size := binary.MaxVarintLen32 // number of labels + size += len(s.labels) * binary.MaxVarintLen32 // length of each label + size += s.size // number of bytes occupied by labels + + return size +} + +// SerializeTo serializes all the labels and writes to the writer in compressed format. +// It returns back the number of bytes written and a checksum of the data written. 
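+//
+// The serialized layout is: uvarint(number of labels), written uncompressed so the
+// count can be read without decompressing anything, followed by a single compressed
+// block holding uvarint(label length) + label bytes for each deduped label.
+// The returned checksum covers both the label count and the compressed block.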
+func (s *symbolizer) SerializeTo(w io.Writer, pool WriterPool) (int, []byte, error) { + crc32Hash := crc32HashPool.Get().(hash.Hash32) + defer crc32HashPool.Put(crc32Hash) + + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + writtenBytes := 0 + crc32Hash.Reset() + eb.reset() + + s.mtx.RLock() + defer s.mtx.RUnlock() + + // write the number of labels without compressing it to make it easier to read the number of labels + eb.putUvarint(len(s.labels)) + + _, err := crc32Hash.Write(eb.get()) + if err != nil { + return 0, nil, errors.Wrap(err, "write num non-indexed labels to crc32hash") + } + n, err := w.Write(eb.get()) + if err != nil { + return 0, nil, errors.Wrap(err, "write num non-indexed labels to writer") + } + writtenBytes += n + + inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) + defer func() { + inBuf.Reset() + serializeBytesBufferPool.Put(inBuf) + }() + outBuf := &bytes.Buffer{} + + encBuf := make([]byte, binary.MaxVarintLen64) + // write the labels + for _, label := range s.labels { + // write the length of the label + n := binary.PutUvarint(encBuf, uint64(len(label))) + inBuf.Write(encBuf[:n]) + + // write the label + inBuf.WriteString(label) + } + + // compress the labels block + compressedWriter := pool.GetWriter(outBuf) + defer pool.PutWriter(compressedWriter) + + if _, err := compressedWriter.Write(inBuf.Bytes()); err != nil { + return 0, nil, errors.Wrap(err, "appending entry") + } + if err := compressedWriter.Close(); err != nil { + return 0, nil, errors.Wrap(err, "flushing pending compress buffer") + } + + b := outBuf.Bytes() + + // hash the labels block + _, err = crc32Hash.Write(b) + if err != nil { + return writtenBytes, nil, errors.Wrap(err, "build non-indexed labels hash") + } + + // write the labels block to writer + n, err = w.Write(b) + if err != nil { + return writtenBytes, nil, errors.Wrap(err, "write non-indexed labels block") + } + writtenBytes += n + return writtenBytes, crc32Hash.Sum(nil), nil +} + +// CheckpointTo writes all the labels to the writer. +// It returns back the number of bytes written and a checksum of the data written. +func (s *symbolizer) CheckpointTo(w io.Writer) (int, []byte, error) { + crcHash := crc32HashPool.Get().(hash.Hash32) + defer crc32HashPool.Put(crcHash) + + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + writtenBytes := 0 + crcHash.Reset() + eb.reset() + + s.mtx.RLock() + defer s.mtx.RUnlock() + + // write the number of labels + eb.putUvarint(len(s.labels)) + n, err := w.Write(eb.get()) + if err != nil { + return 0, nil, err + } + + _, err = crcHash.Write(eb.get()) + if err != nil { + return writtenBytes, nil, err + } + + eb.reset() + writtenBytes += n + + for _, label := range s.labels { + // write label length + eb.putUvarint(len(label)) + n, err := w.Write(eb.get()) + if err != nil { + return writtenBytes, nil, err + } + + _, err = crcHash.Write(eb.get()) + if err != nil { + return writtenBytes, nil, err + } + + eb.reset() + writtenBytes += n + + // write the label + + _, err = crcHash.Write(util.YoloBuf(label)) + if err != nil { + return writtenBytes, nil, err + } + + n, err = io.WriteString(w, label) + if err != nil { + return writtenBytes, nil, err + } + writtenBytes += n + } + + return writtenBytes, crcHash.Sum(nil), nil +} + +// symbolizerFromCheckpoint builds symbolizer from the bytes generated during a checkpoint. 
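+//
+// The input mirrors what CheckpointTo writes: uvarint(number of labels) followed
+// by uvarint(label length) + raw label bytes for each label, all uncompressed.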
+func symbolizerFromCheckpoint(b []byte) *symbolizer { + if len(b) == 0 { + return newSymbolizer() + } + + db := decbuf{b: b} + numLabels := db.uvarint() + s := symbolizer{ + symbolsMap: make(map[string]uint32, numLabels), + labels: make([]string, 0, numLabels), + } + + for i := 0; i < numLabels; i++ { + label := string(db.bytes(db.uvarint())) + s.labels = append(s.labels, label) + s.symbolsMap[label] = uint32(i) + s.size += len(label) + } + + return &s +} + +// symbolizerFromEnc builds symbolizer from the bytes generated during serialization. +func symbolizerFromEnc(b []byte, pool ReaderPool) (*symbolizer, error) { + db := decbuf{b: b} + numLabels := db.uvarint() + + b = db.b + + reader, err := pool.GetReader(bytes.NewBuffer(b)) + if err != nil { + return nil, err + } + defer pool.PutReader(reader) + + s := symbolizer{ + labels: make([]string, 0, numLabels), + compressedSize: len(b), + } + + var ( + readBuf [10]byte // Enough bytes to store one varint. + readBufValid int // How many bytes are left in readBuf from previous read. + buf []byte + ) + + for i := 0; i < numLabels; i++ { + var lWidth, labelSize, lastAttempt int + for lWidth == 0 { // Read until both varints have enough bytes. + n, err := reader.Read(readBuf[readBufValid:]) + readBufValid += n + if err != nil { + if err != io.EOF { + return nil, err + } + if readBufValid == 0 { // Got EOF and no data in the buffer. + return nil, fmt.Errorf("got unexpected EOF") + } + if readBufValid == lastAttempt { // Got EOF and could not parse same data last time. + return nil, fmt.Errorf("invalid non-indexed labels block in chunk") + } + } + var l uint64 + l, lWidth = binary.Uvarint(readBuf[:readBufValid]) + labelSize = int(l) + lastAttempt = readBufValid + } + + // If the buffer is not yet initialize or too small, we get a new one. + if buf == nil || labelSize > cap(buf) { + // in case of a replacement we replace back the buffer in the pool + if buf != nil { + BytesBufferPool.Put(buf) + } + buf = BytesBufferPool.Get(labelSize).([]byte) + if labelSize > cap(buf) { + return nil, fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(buf)) + } + } + buf = buf[:labelSize] + // Take however many bytes are left in the read buffer. + n := copy(buf, readBuf[lWidth:readBufValid]) + // Shift down what is still left in the fixed-size read buffer, if any. + readBufValid = copy(readBuf[:], readBuf[lWidth+n:readBufValid]) + + // Then process reading the line. + for n < labelSize { + r, err := reader.Read(buf[n:labelSize]) + n += r + if err != nil { + // We might get EOF after reading enough bytes to fill the buffer, which is OK. + // EOF and zero bytes read when the buffer isn't full is an error. 
+ if err == io.EOF && r != 0 { + continue + } + return nil, err + } + } + s.labels = append(s.labels, string(buf)) + s.size += len(buf) + } + + return &s, nil +} diff --git a/pkg/chunkenc/symbols_test.go b/pkg/chunkenc/symbols_test.go new file mode 100644 index 0000000000000..661a042974b2b --- /dev/null +++ b/pkg/chunkenc/symbols_test.go @@ -0,0 +1,163 @@ +package chunkenc + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestSymbolizer(t *testing.T) { + for _, tc := range []struct { + name string + labelsToAdd []labels.Labels + expectedSymbols []symbols + + expectedNumLabels int + expectedCheckpointSize int + expectedUncompressedSize int + }{ + { + name: "no labels", + expectedCheckpointSize: binary.MaxVarintLen32, + }, + { + name: "no duplicate labels", + labelsToAdd: []labels.Labels{ + { + labels.Label{ + Name: "foo", + Value: "bar", + }, + }, + { + labels.Label{ + Name: "fizz", + Value: "buzz", + }, + labels.Label{ + Name: "ping", + Value: "pong", + }, + }, + }, + expectedSymbols: []symbols{ + { + symbol{ + Name: 0, + Value: 1, + }, + }, + { + symbol{ + Name: 2, + Value: 3, + }, + symbol{ + Name: 4, + Value: 5, + }, + }, + }, + expectedNumLabels: 6, + expectedCheckpointSize: binary.MaxVarintLen32 + 6*binary.MaxVarintLen32 + 22, + expectedUncompressedSize: 22, + }, + { + name: "with duplicate labels", + labelsToAdd: []labels.Labels{ + { + labels.Label{ + Name: "foo", + Value: "bar", + }, + { + Name: "bar", + Value: "foo", + }, + }, + { + labels.Label{ + Name: "foo", + Value: "bar", + }, + labels.Label{ + Name: "fizz", + Value: "buzz", + }, + labels.Label{ + Name: "ping", + Value: "pong", + }, + }, + }, + expectedSymbols: []symbols{ + { + symbol{ + Name: 0, + Value: 1, + }, + symbol{ + Name: 1, + Value: 0, + }, + }, + { + symbol{ + Name: 0, + Value: 1, + }, + symbol{ + Name: 2, + Value: 3, + }, + symbol{ + Name: 4, + Value: 5, + }, + }, + }, + expectedNumLabels: 6, + expectedCheckpointSize: binary.MaxVarintLen32 + 6*binary.MaxVarintLen32 + 22, + expectedUncompressedSize: 22, + }, + } { + for _, encoding := range testEncoding { + t.Run(fmt.Sprintf("%s - %s", tc.name, encoding), func(t *testing.T) { + s := newSymbolizer() + for i, labels := range tc.labelsToAdd { + symbols := s.Add(labels) + require.Equal(t, tc.expectedSymbols[i], symbols) + require.Equal(t, labels, s.Lookup(symbols)) + } + + require.Equal(t, tc.expectedNumLabels, len(s.labels)) + require.Equal(t, tc.expectedCheckpointSize, s.CheckpointSize()) + require.Equal(t, tc.expectedUncompressedSize, s.UncompressedSize()) + + buf := bytes.NewBuffer(nil) + numBytesWritten, _, err := s.CheckpointTo(buf) + require.NoError(t, err) + require.LessOrEqual(t, numBytesWritten, tc.expectedCheckpointSize) + + loaded := symbolizerFromCheckpoint(buf.Bytes()) + for i, symbols := range tc.expectedSymbols { + require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols)) + } + + buf.Reset() + _, _, err = s.SerializeTo(buf, getWriterPool(encoding)) + require.NoError(t, err) + + loaded, err = symbolizerFromEnc(buf.Bytes(), getReaderPool(encoding)) + require.NoError(t, err) + for i, symbols := range tc.expectedSymbols { + require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols)) + } + }) + } + } +} diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index da4887c891449..7e34a607b1dbe 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -33,7 +33,7 @@ type HeadBlock interface { Bounds() (mint, maxt int64) 
Entries() int UncompressedSize() int - Convert(HeadBlockFmt) (HeadBlock, error) + Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error) Append(int64, string, labels.Labels) error Iterator( ctx context.Context, @@ -58,15 +58,17 @@ type unorderedHeadBlock struct { // Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries rt rangetree.RangeTree + symbolizer *symbolizer lines int // number of entries size int // size of uncompressed bytes. mint, maxt int64 // upper and lower bounds } -func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt) *unorderedHeadBlock { +func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt, symbolizer *symbolizer) *unorderedHeadBlock { return &unorderedHeadBlock{ - format: headBlockFmt, - rt: rangetree.New(1), + format: headBlockFmt, + symbolizer: symbolizer, + rt: rangetree.New(1), } } @@ -89,13 +91,13 @@ func (hb *unorderedHeadBlock) UncompressedSize() int { } func (hb *unorderedHeadBlock) Reset() { - x := newUnorderedHeadBlock(hb.format) + x := newUnorderedHeadBlock(hb.format, hb.symbolizer) *hb = *x } type nsEntry struct { - line string - nonIndexedLabels labels.Labels + line string + nonIndexedLabelsSymbols symbols } // collection of entries belonging to the same nanosecond @@ -108,10 +110,10 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { return e.ts } -func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.Labels) error { +func (hb *unorderedHeadBlock) Append(ts int64, line string, nonIndexedLabels labels.Labels) error { if hb.format < UnorderedWithNonIndexedLabelsHeadBlockFmt { - // metaLabels must be ignored for the previous head block formats - metaLabels = nil + // nonIndexedLabels must be ignored for the previous head block formats + nonIndexedLabels = nil } // This is an allocation hack. 
The rangetree lib does not // support the ability to pass a "mutate" function during an insert @@ -136,9 +138,9 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.La return nil } } - e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, metaLabels}) + e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(nonIndexedLabels)}) } else { - e.entries = []nsEntry{{line, metaLabels}} + e.entries = []nsEntry{{line, hb.symbolizer.Add(nonIndexedLabels)}} } // Update hb metdata @@ -150,7 +152,8 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.La hb.maxt = ts } - hb.size += len(line) + metaLabelsLen(metaLabels) + hb.size += len(line) + hb.size += len(nonIndexedLabels) * 2 * 4 // 4 bytes per label and value pair as nonIndexedLabelsSymbols hb.lines++ return nil @@ -181,7 +184,7 @@ func (hb *unorderedHeadBlock) forEntries( direction logproto.Direction, mint, maxt int64, - entryFn func(*stats.Context, int64, string, labels.Labels) error, // returning an error exits early + entryFn func(*stats.Context, int64, string, symbols) error, // returning an error exits early ) (err error) { if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) { return @@ -211,16 +214,12 @@ func (hb *unorderedHeadBlock) forEntries( for ; i < len(es.entries) && i >= 0; next() { line := es.entries[i].line - nonIndexedLabels := es.entries[i].nonIndexedLabels - - var nonIndexedLabelsBytes int64 - for _, label := range nonIndexedLabels { - nonIndexedLabelsBytes += int64(len(label.Name) + len(label.Value)) - } + nonIndexedLabelsSymbols := es.entries[i].nonIndexedLabelsSymbols + nonIndexedLabelsBytes := int64(2 * len(nonIndexedLabelsSymbols) * 4) // 2 * num_symbols * 4 bytes(uint32) chunkStats.AddHeadChunkNonIndexedLabelsBytes(nonIndexedLabelsBytes) chunkStats.AddHeadChunkBytes(int64(len(line)) + nonIndexedLabelsBytes) - err = entryFn(chunkStats, es.ts, line, nonIndexedLabels) + err = entryFn(chunkStats, es.ts, line, nonIndexedLabelsSymbols) } } @@ -262,8 +261,8 @@ func (hb *unorderedHeadBlock) Iterator( direction, mint, maxt, - func(statsCtx *stats.Context, ts int64, line string, nonIndexedLabels labels.Labels) error { - newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, nonIndexedLabels...) + func(statsCtx *stats.Context, ts int64, line string, nonIndexedLabelsSymbols symbols) error { + newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, hb.symbolizer.Lookup(nonIndexedLabelsSymbols)...) if !matches { return nil } @@ -282,7 +281,7 @@ func (hb *unorderedHeadBlock) Iterator( stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Unix(0, ts), Line: newLine, - NonIndexedLabels: logproto.FromLabelsToLabelAdapters(nonIndexedLabels), + NonIndexedLabels: logproto.FromLabelsToLabelAdapters(hb.symbolizer.Lookup(nonIndexedLabelsSymbols)), }) return nil }, @@ -312,8 +311,8 @@ func (hb *unorderedHeadBlock) SampleIterator( logproto.FORWARD, mint, maxt, - func(statsCtx *stats.Context, ts int64, line string, metaLabels labels.Labels) error { - value, parsedLabels, ok := extractor.ProcessString(ts, line, metaLabels...) + func(statsCtx *stats.Context, ts int64, line string, nonIndexedLabelsSymbols symbols) error { + value, parsedLabels, ok := extractor.ProcessString(ts, line, hb.symbolizer.Lookup(nonIndexedLabelsSymbols)...) 
if !ok { return nil } @@ -364,6 +363,13 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf.Reset() serializeBytesBufferPool.Put(inBuf) }() + + symbolsSectionBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) + defer func() { + symbolsSectionBuf.Reset() + serializeBytesBufferPool.Put(symbolsSectionBuf) + }() + outBuf := &bytes.Buffer{} encBuf := make([]byte, binary.MaxVarintLen64) @@ -375,7 +381,7 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { logproto.FORWARD, 0, math.MaxInt64, - func(_ *stats.Context, ts int64, line string, metaLabels labels.Labels) error { + func(_ *stats.Context, ts int64, line string, nonIndexedLabelsSymbols symbols) error { n := binary.PutVarint(encBuf, ts) inBuf.Write(encBuf[:n]) @@ -385,18 +391,29 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf.WriteString(line) if hb.format >= UnorderedWithNonIndexedLabelsHeadBlockFmt { - // Serialize non-indexed labels - n = binary.PutUvarint(encBuf, uint64(len(metaLabels))) - inBuf.Write(encBuf[:n]) - for _, l := range metaLabels { - n = binary.PutUvarint(encBuf, uint64(len(l.Name))) - inBuf.Write(encBuf[:n]) - inBuf.WriteString(l.Name) - - n = binary.PutUvarint(encBuf, uint64(len(l.Value))) - inBuf.Write(encBuf[:n]) - inBuf.WriteString(l.Value) + symbolsSectionBuf.Reset() + // Serialize non-indexed labels symbols to symbolsSectionBuf so that we can find and write its length before + // writing symbols section to inbuf since we can't estimate its size beforehand due to variable length encoding. + + // write the number of symbol pairs + n = binary.PutUvarint(encBuf, uint64(len(nonIndexedLabelsSymbols))) + symbolsSectionBuf.Write(encBuf[:n]) + + // write the symbols + for _, l := range nonIndexedLabelsSymbols { + n = binary.PutUvarint(encBuf, uint64(l.Name)) + symbolsSectionBuf.Write(encBuf[:n]) + + n = binary.PutUvarint(encBuf, uint64(l.Value)) + symbolsSectionBuf.Write(encBuf[:n]) } + + // write the length of symbols section first + n = binary.PutUvarint(encBuf, uint64(symbolsSectionBuf.Len())) + inBuf.Write(encBuf[:n]) + + // copy the symbols section + inBuf.Write(symbolsSectionBuf.Bytes()) } return nil }, @@ -412,19 +429,19 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { return outBuf.Bytes(), nil } -func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { +func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error) { if hb.format == version { return hb, nil } - out := version.NewBlock() + out := version.NewBlock(symbolizer) err := hb.forEntries( context.Background(), logproto.FORWARD, 0, math.MaxInt64, - func(_ *stats.Context, ts int64, line string, metaLabels labels.Labels) error { - return out.Append(ts, line, metaLabels) + func(_ *stats.Context, ts int64, line string, nonIndexedLabelsSymbols symbols) error { + return out.Append(ts, line, hb.symbolizer.Lookup(nonIndexedLabelsSymbols)) }, ) return out, err @@ -437,19 +454,8 @@ func (hb *unorderedHeadBlock) CheckpointSize() int { size += binary.MaxVarintLen64 * 2 // mint,maxt size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line. 
if hb.format >= UnorderedWithNonIndexedLabelsHeadBlockFmt { - _ = hb.forEntries( - context.Background(), - logproto.FORWARD, - 0, - math.MaxInt64, - func(_ *stats.Context, ts int64, line string, metaLabels labels.Labels) error { - // len of meta labels - size += binary.MaxVarintLen32 - // len of name and value of each meta label, the size of values is already included into hb.size - size += (binary.MaxVarintLen32 * 2) * len(metaLabels) - return nil - }, - ) + // number of non-indexed labels stored for each log entry + size += binary.MaxVarintLen32 * hb.lines } size += hb.size // uncompressed bytes of lines return size @@ -491,7 +497,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { logproto.FORWARD, 0, math.MaxInt64, - func(_ *stats.Context, ts int64, line string, metaLabels labels.Labels) error { + func(_ *stats.Context, ts int64, line string, nonIndexedLabelsSymbols symbols) error { eb.putVarint64(ts) eb.putUvarint(len(line)) _, err = w.Write(eb.get()) @@ -507,29 +513,20 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { if hb.format >= UnorderedWithNonIndexedLabelsHeadBlockFmt { // non-indexed labels - eb.putUvarint(len(metaLabels)) + eb.putUvarint(len(nonIndexedLabelsSymbols)) _, err = w.Write(eb.get()) if err != nil { return errors.Wrap(err, "write headBlock entry meta labels length") } eb.reset() - for _, l := range metaLabels { - eb.putUvarint(len(l.Name)) - eb.putUvarint(len(l.Value)) + for _, l := range nonIndexedLabelsSymbols { + eb.putUvarint(int(l.Name)) + eb.putUvarint(int(l.Value)) _, err = w.Write(eb.get()) if err != nil { - return errors.Wrap(err, "write headBlock entry meta label name and value length") + return errors.Wrap(err, "write headBlock entry nonIndexedLabelsSymbols") } eb.reset() - - _, err = io.WriteString(w, l.Name) - if err != nil { - return errors.Wrap(err, "write headBlock entry meta label name") - } - _, err = io.WriteString(w, l.Value) - if err != nil { - return errors.Wrap(err, "write headBlock entry meta label value") - } } } @@ -542,7 +539,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { // ensure it's empty - *hb = *newUnorderedHeadBlock(hb.format) + *hb = *newUnorderedHeadBlock(hb.format, hb.symbolizer) if len(b) < 1 { return nil @@ -570,23 +567,21 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { lineLn := db.uvarint() line := string(db.bytes(lineLn)) - var metaLabels labels.Labels + var nonIndexedLabelsSymbols symbols if version >= UnorderedWithNonIndexedLabelsHeadBlockFmt.Byte() { metaLn := db.uvarint() if metaLn > 0 { - metaLabels = make(labels.Labels, metaLn) + nonIndexedLabelsSymbols = make([]symbol, metaLn) for j := 0; j < metaLn && db.err() == nil; j++ { - nameLn := db.uvarint() - valueLn := db.uvarint() - metaLabels[j] = labels.Label{ - Name: string(db.bytes(nameLn)), - Value: string(db.bytes(valueLn)), + nonIndexedLabelsSymbols[j] = symbol{ + Name: uint32(db.uvarint()), + Value: uint32(db.uvarint()), } } } } - if err := hb.Append(ts, line, metaLabels); err != nil { + if err := hb.Append(ts, line, hb.symbolizer.Lookup(nonIndexedLabelsSymbols)); err != nil { return err } } @@ -601,9 +596,9 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { // HeadFromCheckpoint handles reading any head block format and returning the desired form. // This is particularly helpful replaying WALs from different configurations // such as after enabling unordered writes. 
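The checkpoint hunks above use an uncompressed variant of the same idea: CheckpointTo writes, after each entry's timestamp and line, a uvarint count of label pairs followed by uvarint name and value references, and LoadBytes reads them back and resolves them through the symbolizer before re-appending the entry. A sketch of reading that per-entry trailer, where readCheckpointSymbols is an invented helper rather than part of the patch:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

type symbolRef struct{ Name, Value uint32 }

// readCheckpointSymbols reads a uvarint pair count followed by a uvarint name
// reference and a uvarint value reference per pair. The caller would resolve
// the references via the symbolizer before appending the entry.
func readCheckpointSymbols(r *bytes.Reader) ([]symbolRef, error) {
	count, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	refs := make([]symbolRef, count)
	for i := range refs {
		name, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		value, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		refs[i] = symbolRef{Name: uint32(name), Value: uint32(value)}
	}
	return refs, nil
}

func main() {
	// two pairs, (0, 1) and (2, 3), written as plain uvarints
	refs, err := readCheckpointSymbols(bytes.NewReader([]byte{2, 0, 1, 2, 3}))
	fmt.Println(refs, err)
}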
-func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) { +func HeadFromCheckpoint(b []byte, desiredIfNotUnordered HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error) { if len(b) == 0 { - return desired.NewBlock(), nil + return desiredIfNotUnordered.NewBlock(symbolizer), nil } db := decbuf{b: b} @@ -617,13 +612,13 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) { return nil, fmt.Errorf("unexpected head block version: %v", format) } - decodedBlock := format.NewBlock() + decodedBlock := format.NewBlock(symbolizer) if err := decodedBlock.LoadBytes(b); err != nil { return nil, err } - if decodedBlock.Format() != desired { - return decodedBlock.Convert(desired) + if decodedBlock.Format() < UnorderedHeadBlockFmt && decodedBlock.Format() != desiredIfNotUnordered { + return decodedBlock.Convert(desiredIfNotUnordered, nil) } return decodedBlock, nil } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 532039681bbf3..5a17b0aeb551a 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -32,7 +32,7 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { } func Test_forEntriesEarlyReturn(t *testing.T) { - hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt) + hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt, newSymbolizer()) for i := 0; i < 10; i++ { require.Nil(t, hb.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "i", Value: fmt.Sprint(i)}})) } @@ -45,7 +45,7 @@ func Test_forEntriesEarlyReturn(t *testing.T) { logproto.FORWARD, 0, math.MaxInt64, - func(_ *stats.Context, ts int64, _ string, _ labels.Labels) error { + func(_ *stats.Context, ts int64, _ string, _ symbols) error { forwardCt++ forwardStop = ts if ts == 5 { @@ -66,7 +66,7 @@ func Test_forEntriesEarlyReturn(t *testing.T) { logproto.BACKWARD, 0, math.MaxInt64, - func(_ *stats.Context, ts int64, _ string, _ labels.Labels) error { + func(_ *stats.Context, ts int64, _ string, _ symbols) error { backwardCt++ backwardStop = ts if ts == 5 { @@ -170,7 +170,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { UnorderedWithNonIndexedLabelsHeadBlockFmt, } { t.Run(format.String(), func(t *testing.T) { - hb := newUnorderedHeadBlock(format) + hb := newUnorderedHeadBlock(format, newSymbolizer()) for _, e := range tc.input { require.Nil(t, hb.Append(e.t, e.s, e.nonIndexedLabels)) } @@ -247,7 +247,7 @@ func Test_UnorderedBoundedIter(t *testing.T) { UnorderedWithNonIndexedLabelsHeadBlockFmt, } { t.Run(format.String(), func(t *testing.T) { - hb := newUnorderedHeadBlock(format) + hb := newUnorderedHeadBlock(format, newSymbolizer()) for _, e := range tc.input { require.Nil(t, hb.Append(e.t, e.s, e.nonIndexedLabels)) } @@ -276,8 +276,8 @@ func Test_UnorderedBoundedIter(t *testing.T) { } func TestHeadBlockInterop(t *testing.T) { - unordered, ordered := newUnorderedHeadBlock(UnorderedHeadBlockFmt), &headBlock{} - unorderedWithNonIndexedLabels := newUnorderedHeadBlock(UnorderedWithNonIndexedLabelsHeadBlockFmt) + unordered, ordered := newUnorderedHeadBlock(UnorderedHeadBlockFmt, nil), &headBlock{} + unorderedWithNonIndexedLabels := newUnorderedHeadBlock(UnorderedWithNonIndexedLabelsHeadBlockFmt, newSymbolizer()) for i := 0; i < 100; i++ { metaLabels := labels.Labels{{Name: "foo", Value: fmt.Sprint(99 - i)}} require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)) @@ -294,17 +294,17 @@ func TestHeadBlockInterop(t *testing.T) { require.Nil(t, err) // Ensure we can recover ordered checkpoint into ordered headblock - 
recovered, err := HeadFromCheckpoint(orderedCheckpointBytes, OrderedHeadBlockFmt)
+	recovered, err := HeadFromCheckpoint(orderedCheckpointBytes, OrderedHeadBlockFmt, nil)
 	require.Nil(t, err)
 	require.Equal(t, ordered, recovered)
 
 	// Ensure we can recover ordered checkpoint into unordered headblock
-	recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedHeadBlockFmt)
+	recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedHeadBlockFmt, nil)
 	require.Nil(t, err)
 	require.Equal(t, unordered, recovered)
 
 	// Ensure we can recover ordered checkpoint into unordered headblock with non-indexed labels
-	recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt)
+	recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt, nil)
 	require.NoError(t, err)
 	require.Equal(t, &unorderedHeadBlock{
 		format: UnorderedWithNonIndexedLabelsHeadBlockFmt,
@@ -315,40 +315,28 @@ func TestHeadBlockInterop(t *testing.T) {
 		maxt:   unordered.maxt,
 	}, recovered)
 
-	// Ensure we can recover unordered checkpoint into ordered headblock
-	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, OrderedHeadBlockFmt)
-	require.Nil(t, err)
-	require.Equal(t, ordered, recovered)
-
 	// Ensure we can recover unordered checkpoint into unordered headblock
-	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedHeadBlockFmt)
+	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedHeadBlockFmt, nil)
 	require.Nil(t, err)
 	require.Equal(t, unordered, recovered)
 
-	// Ensure we can recover unordered checkpoint into unordered with non-indexed labels
-	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt)
+	// Ensure trying to recover unordered checkpoint into unordered with non-indexed labels keeps it in unordered format
+	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt, nil)
 	require.NoError(t, err)
-	require.Equal(t, &unorderedHeadBlock{
-		format: UnorderedWithNonIndexedLabelsHeadBlockFmt,
-		rt:     unordered.rt,
-		lines:  unordered.lines,
-		size:   unordered.size,
-		mint:   unordered.mint,
-		maxt:   unordered.maxt,
-	}, recovered)
+	require.Equal(t, unordered, recovered)
 
-	// Ensure we can recover unordered with non-indexed labels checkpoint into ordered headblock
-	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, OrderedHeadBlockFmt)
+	// Ensure trying to recover unordered with non-indexed labels checkpoint into ordered headblock keeps it in unordered with non-indexed labels format
+	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, OrderedHeadBlockFmt, unorderedWithNonIndexedLabels.symbolizer)
 	require.Nil(t, err)
-	require.Equal(t, ordered, recovered) // we compare the data with unordered because unordered head block does not contain metaLabels.
+	require.Equal(t, unorderedWithNonIndexedLabels, recovered) // the data stays in the unordered with non-indexed labels format instead of being converted to the ordered format
 
-	// Ensure we can recover unordered with non-indexed labels checkpoint into unordered headblock
-	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, UnorderedHeadBlockFmt)
+	// Ensure trying to recover unordered with non-indexed labels checkpoint into unordered headblock keeps it in unordered with non-indexed labels format
+	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, UnorderedHeadBlockFmt, unorderedWithNonIndexedLabels.symbolizer)
 	require.Nil(t, err)
-	require.Equal(t, unordered, recovered) // we compare the data with unordered because unordered head block does not contain metaLabels.
+	require.Equal(t, unorderedWithNonIndexedLabels, recovered) // the data stays in the unordered with non-indexed labels format instead of being converted to the plain unordered format
 
-	// Ensure we can recover unordered with non-indexed labels checkpoint into unordered with non-indexed labels headblock
-	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt)
+	// Ensure we can recover unordered with non-indexed checkpoint into unordered with non-indexed headblock
+	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt, unorderedWithNonIndexedLabels.symbolizer)
 	require.Nil(t, err)
 	require.Equal(t, unorderedWithNonIndexedLabels, recovered)
 }
@@ -375,7 +363,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
 	}
 
 	unorderedHeadBlockFn := func() func(int64, string, labels.Labels) {
-		hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt)
+		hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt, nil)
 		return func(ts int64, line string, metaLabels labels.Labels) {
 			_ = hb.Append(ts, line, metaLabels)
 		}
@@ -723,8 +711,8 @@ func Test_HeadIteratorHash(t *testing.T) {
 	}
 
 	for name, b := range map[string]HeadBlock{
-		"unordered":                         newUnorderedHeadBlock(UnorderedHeadBlockFmt),
-		"unordered with non-indexed labels": newUnorderedHeadBlock(UnorderedWithNonIndexedLabelsHeadBlockFmt),
+		"unordered":                         newUnorderedHeadBlock(UnorderedHeadBlockFmt, nil),
+		"unordered with non-indexed labels": newUnorderedHeadBlock(UnorderedWithNonIndexedLabelsHeadBlockFmt, newSymbolizer()),
 		"ordered":                           &headBlock{},
 	} {
 		t.Run(name, func(t *testing.T) {