@@ -28,12 +28,10 @@ import (
 
 const (
 	_ byte = iota
-	chunkFormatV1
-	chunkFormatV2
-	chunkFormatV3
-	chunkFormatV4
-
-	DefaultChunkFormat = chunkFormatV4 // the currently used chunk format
+	ChunkFormatV1
+	ChunkFormatV2
+	ChunkFormatV3
+	ChunkFormatV4
 
 	blocksPerChunk = 10
 	maxLineLength  = 1024 * 1024 * 1024
@@ -84,10 +82,22 @@ const (
 	OrderedHeadBlockFmt
 	UnorderedHeadBlockFmt
 	UnorderedWithNonIndexedLabelsHeadBlockFmt
-
-	DefaultHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt
 )
 
+// ChunkHeadFormatFor returns the corresponding head block format for the given `chunkfmt`.
+func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt {
+	if chunkfmt < ChunkFormatV3 {
+		return OrderedHeadBlockFmt
+	}
+
+	if chunkfmt == ChunkFormatV3 {
+		return UnorderedHeadBlockFmt
+	}
+
+	// return the latest head format for all chunk formats > v3
+	return UnorderedWithNonIndexedLabelsHeadBlockFmt
+}
+
 var magicNumber = uint32(0x12EE56A)
 
 // The table gets initialized with sync.Once but may still cause a race
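A minimal sketch, not part of the patch, of the mapping the new `ChunkHeadFormatFor` helper encodes; the wrapper function name below is hypothetical and exists only for illustration:

```go
// Sketch (same package as the patch): head block format implied by each chunk format.
// Formats below V3 keep the ordered head block, V3 uses the unordered head block,
// and V4 (and anything newer) uses the unordered head block with non-indexed labels.
func headFormatsByChunkFormat() map[byte]HeadBlockFmt {
	return map[byte]HeadBlockFmt{
		ChunkFormatV1: ChunkHeadFormatFor(ChunkFormatV1), // OrderedHeadBlockFmt
		ChunkFormatV2: ChunkHeadFormatFor(ChunkFormatV2), // OrderedHeadBlockFmt
		ChunkFormatV3: ChunkHeadFormatFor(ChunkFormatV3), // UnorderedHeadBlockFmt
		ChunkFormatV4: ChunkHeadFormatFor(ChunkFormatV4), // UnorderedWithNonIndexedLabelsHeadBlockFmt
	}
}
```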
@@ -293,7 +303,7 @@ func (hb *headBlock) LoadBytes(b []byte) error {
 		return errors.Wrap(db.err(), "verifying headblock header")
 	}
 	switch version {
-	case chunkFormatV1, chunkFormatV2, chunkFormatV3, chunkFormatV4:
+	case ChunkFormatV1, ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
 	default:
 		return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version)
 	}
@@ -344,15 +354,16 @@ type entry struct {
 }
 
 // NewMemChunk returns a new in-mem chunk.
-func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
-	return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize)
+func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+	return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
 }
 
 func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
-	if chunkFmt == chunkFormatV2 && head != OrderedHeadBlockFmt {
+	if chunkFmt == ChunkFormatV2 && head != OrderedHeadBlockFmt {
 		panic("only OrderedHeadBlockFmt is supported for V2 chunks")
 	}
-	if chunkFmt == chunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
+	if chunkFmt == ChunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
+		fmt.Println("received head fmt", head.String())
 		panic("only UnorderedWithNonIndexedLabelsHeadBlockFmt is supported for V4 chunks")
 	}
 }
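Since NewMemChunk now requires the chunk format up front and no longer falls back to the removed DefaultChunkFormat/DefaultHeadBlockFmt constants, callers are expected to pass both the format and a compatible head block format explicitly. A minimal sketch, not part of the patch; the helper name and the block/target sizes are illustrative only:

```go
// Sketch (same package as the patch): build a V4 chunk under the new signature.
func newV4ChunkExample() *MemChunk {
	format := ChunkFormatV4
	// Derive the matching head block format so panicIfInvalidFormat is satisfied:
	// V4 chunks must use UnorderedWithNonIndexedLabelsHeadBlockFmt.
	head := ChunkHeadFormatFor(format)
	// 256 KiB blocks and a ~1.5 MiB target size are illustrative values only.
	return NewMemChunk(format, EncGZIP, head, 256*1024, 1536*1024)
}
```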
@@ -401,9 +412,9 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 	}
 	bc.format = version
 	switch version {
-	case chunkFormatV1:
+	case ChunkFormatV1:
 		bc.encoding = EncGZIP
-	case chunkFormatV2, chunkFormatV3, chunkFormatV4:
+	case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
 		// format v2+ has a byte for block encoding.
 		enc := Encoding(db.byte())
 		if db.err() != nil {
@@ -414,6 +425,9 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 		return nil, errors.Errorf("invalid version %d", version)
 	}
 
+	// Set the correct headblock format based on chunk format
+	bc.headFmt = ChunkHeadFormatFor(version)
+
 	// readSectionLenAndOffset reads len and offset for different sections within the chunk.
 	// Starting from chunk version 4, we have started writing offset and length of various sections within the chunk.
 	// These len and offset pairs would be stored together at the end of the chunk.
@@ -427,7 +441,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 
 	metasOffset := uint64(0)
 	metasLen := uint64(0)
-	if version >= chunkFormatV4 {
+	if version >= ChunkFormatV4 {
 		// version >= 4 starts writing length of sections after their offsets
 		metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx)
 	} else {
@@ -458,7 +472,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 
 		// Read offset and length.
 		blk.offset = db.uvarint()
-		if version >= chunkFormatV3 {
+		if version >= ChunkFormatV3 {
 			blk.uncompressedSize = db.uvarint()
 		}
 		l := db.uvarint()
@@ -481,7 +495,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 		}
 	}
 
-	if version >= chunkFormatV4 {
+	if version >= ChunkFormatV4 {
 		nonIndexedLabelsLen, nonIndexedLabelsOffset := readSectionLenAndOffset(chunkNonIndexedLabelsSectionIdx)
 		lb := b[nonIndexedLabelsOffset : nonIndexedLabelsOffset+nonIndexedLabelsLen] // non-indexed labels Offset + checksum
 		db = decbuf{b: lb}
@@ -526,7 +540,7 @@ func (c *MemChunk) Bytes() ([]byte, error) {
 func (c *MemChunk) BytesSize() int {
 	size := 4 // magic number
 	size++    // format
-	if c.format > chunkFormatV1 {
+	if c.format > ChunkFormatV1 {
 		size++ // chunk format v2+ has a byte for encoding.
 	}
 
@@ -538,7 +552,7 @@ func (c *MemChunk) BytesSize() int {
 		size += binary.MaxVarintLen64 // mint
 		size += binary.MaxVarintLen64 // maxt
 		size += binary.MaxVarintLen32 // offset
-		if c.format >= chunkFormatV3 {
+		if c.format >= ChunkFormatV3 {
 			size += binary.MaxVarintLen32 // uncompressed size
 		}
 		size += binary.MaxVarintLen32 // len(b)
@@ -550,7 +564,7 @@ func (c *MemChunk) BytesSize() int {
 	size += crc32.Size // metablock crc
 	size += 8          // metaoffset
 
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		size += 8 // metablock length
 
 		size += c.symbolizer.CheckpointSize() // non-indexed labels block
@@ -586,7 +600,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
 	// Write the header (magicNum + version).
 	eb.putBE32(magicNumber)
 	eb.putByte(c.format)
-	if c.format > chunkFormatV1 {
+	if c.format > ChunkFormatV1 {
 		// chunk format v2+ has a byte for encoding.
 		eb.putByte(byte(c.encoding))
 	}
@@ -599,7 +613,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
 	nonIndexedLabelsOffset := offset
 	nonIndexedLabelsLen := 0
 
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		var (
 			n       int
 			crcHash []byte
@@ -655,7 +669,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
 		eb.putVarint64(b.mint)
 		eb.putVarint64(b.maxt)
 		eb.putUvarint(b.offset)
-		if c.format >= chunkFormatV3 {
+		if c.format >= ChunkFormatV3 {
 			eb.putUvarint(b.uncompressedSize)
 		}
 		eb.putUvarint(len(b.b))
@@ -669,7 +683,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
 	}
 	offset += int64(n)
 
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		// Write non-indexed labels offset and length
 		eb.reset()
 		eb.putBE64int(nonIndexedLabelsLen)
@@ -683,7 +697,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
 
 	// Write the metasOffset.
 	eb.reset()
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		eb.putBE64int(metasLen)
 	}
 	eb.putBE64int(int(metasOffset))
@@ -763,7 +777,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
 	// a great check, but it will guarantee we are always under the target size
 	newHBSize := c.head.UncompressedSize() + len(e.Line)
 	nonIndexedLabelsSize := 0
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		newHBSize += metaLabelsLen(logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels))
 		// non-indexed labels are compressed while serializing the chunk so we don't know what their size would be after compression.
 		// As adoption increases, their overall size can be non-trivial so we can't ignore them while calculating chunk size.
@@ -786,7 +800,7 @@ func (c *MemChunk) UncompressedSize() int {
 		size += b.uncompressedSize
 	}
 
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		size += c.symbolizer.UncompressedSize()
 	}
 
@@ -802,7 +816,7 @@ func (c *MemChunk) CompressedSize() int {
 	size := 0
 	// Better to account for any uncompressed data than ignore it even though this isn't accurate.
 	size += c.head.UncompressedSize()
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		size += c.symbolizer.UncompressedSize() // length of each symbol
 	}
 
@@ -829,7 +843,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
 		return ErrOutOfOrder
 	}
 
-	if c.format < chunkFormatV4 {
+	if c.format < ChunkFormatV4 {
 		entry.NonIndexedLabels = nil
 	}
 	if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels)); err != nil {
@@ -940,7 +954,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
 	mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
 	blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)
 
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		stats := stats.FromContext(ctx)
 		stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize()))
 		decompressedSize := int64(c.symbolizer.DecompressedSize())
@@ -1025,7 +1039,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
 	mint, maxt := from.UnixNano(), through.UnixNano()
 	its := make([]iter.SampleIterator, 0, len(c.blocks)+1)
 
-	if c.format >= chunkFormatV4 {
+	if c.format >= ChunkFormatV4 {
 		stats := stats.FromContext(ctx)
 		stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize()))
 		decompressedSize := int64(c.symbolizer.DecompressedSize())
@@ -1095,12 +1109,12 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
 	// as close as possible, respect the block/target sizes specified. However,
 	// if the blockSize is not set, use reasonable defaults.
 	if c.blockSize > 0 {
-		newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, c.blockSize, c.targetSize)
+		newChunk = NewMemChunk(c.format, c.Encoding(), c.headFmt, c.blockSize, c.targetSize)
 	} else {
 		// Using defaultBlockSize for target block size.
 		// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
 		// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
-		newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, defaultBlockSize, c.CompressedSize())
+		newChunk = NewMemChunk(c.format, c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize())
 	}
 
 	for itr.Next() {
@@ -1423,7 +1437,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
 
 	decompressedBytes += int64(lineSize)
 
-	if si.format < chunkFormatV4 {
+	if si.format < ChunkFormatV4 {
 		si.stats.AddDecompressedBytes(decompressedBytes)
 		si.stats.AddDecompressedLines(1)
 		return ts, si.buf[:lineSize], nil, true