chore: Clarify compression package #14257

Merged · 4 commits · Sep 30, 2024
Changes from 1 commit
chore: Remove Enc prefix from Codec items
Signed-off-by: Christian Haudum <[email protected]>
chaudum committed Sep 30, 2024
commit d9918fa5cf921f94f5149792cf6eb4a0f7fd57c8
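The commit is a mechanical rename: every exported Codec constant in pkg/compression loses its redundant Enc prefix (compression.EncGZIP becomes compression.GZIP, and so on), with no behavioral change. A minimal sketch of what call sites look like after the rename; the import path is an assumption inferred from the repository layout, and the constant list is taken verbatim from the testEncodings change below:

package main

import (
	"fmt"

	// Assumed import path, inferred from the pkg/compression directory.
	"github.com/grafana/loki/v3/pkg/compression"
)

func main() {
	// Previously: compression.EncNone, compression.EncGZIP, compression.EncLZ4_64k, ...
	// After this commit the Enc prefix is dropped from every Codec constant.
	codecs := []compression.Codec{
		compression.None,
		compression.GZIP,
		compression.LZ4_64k,
		compression.LZ4_256k,
		compression.LZ4_1M,
		compression.LZ4_4M,
		compression.Snappy,
		compression.Flate,
		compression.Zstd,
	}
	for _, c := range codecs {
		fmt.Printf("%v\n", c)
	}
}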
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder.go
@@ -34,7 +34,7 @@ import (
)

// TODO(chaudum): Make configurable via (per-tenant?) setting.
-var blockCompressionAlgo = compression.EncNone
+var blockCompressionAlgo = compression.None

type Builder struct {
	services.Service
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/spec_test.go
@@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser

func TestSimpleBloomGenerator(t *testing.T) {
	const maxBlockSize = 100 << 20 // 100MB
-	for _, enc := range []compression.Codec{compression.EncNone, compression.EncGZIP, compression.EncSnappy} {
+	for _, enc := range []compression.Codec{compression.None, compression.GZIP, compression.Snappy} {
		for _, tc := range []struct {
			desc                 string
			fromSchema, toSchema v1.BlockOptions
2 changes: 1 addition & 1 deletion pkg/bloombuild/common/tsdb.go
@@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB(
	}
	defer data.Close()

-	decompressorPool := compression.GetReaderPool(compression.EncGZIP)
+	decompressorPool := compression.GetReaderPool(compression.GZIP)
	decompressor, err := decompressorPool.GetReader(data)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get decompressor")
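For context, GetReaderPool hands out pooled decompressors keyed by codec, which is why the hunk above only needs the renamed constant. A hedged, self-contained sketch of the read path: the import path is an assumption, GetReaderPool and GetReader are used exactly as in LoadTSDB above, and the gzip payload is built with the standard library so the example runs on its own.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/grafana/loki/v3/pkg/compression" // assumed import path
)

func main() {
	// Build a gzip-compressed payload with the standard library.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("hello, codec rename")); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Decompress through the pooled reader, mirroring LoadTSDB above.
	pool := compression.GetReaderPool(compression.GZIP)
	decompressor, err := pool.GetReader(&buf)
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(decompressor)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}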
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
	writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
	reader := v1.NewByteReader(indexBuf, bloomsBuf)

-	blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0)
+	blockOpts := v1.NewBlockOptions(compression.None, 0, 0)

	builder, err := v1.NewBlockBuilder(blockOpts, writer)
	if err != nil {
2 changes: 1 addition & 1 deletion pkg/chunkenc/dumb_chunk.go
@@ -70,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 {
	return float64(len(c.entries)) / float64(tmpNumEntries)
}

-func (c *dumbChunk) Encoding() compression.Codec { return compression.EncNone }
+func (c *dumbChunk) Encoding() compression.Codec { return compression.None }

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
2 changes: 1 addition & 1 deletion pkg/chunkenc/memchunk.go
@@ -414,7 +414,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
	bc.format = version
	switch version {
	case ChunkFormatV1:
-		bc.encoding = compression.EncGZIP
+		bc.encoding = compression.GZIP
	case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
		// format v2+ has a byte for block encoding.
		enc := compression.Codec(db.byte())
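Worth noting for this hunk: from chunk format v2 onward the codec is persisted as a single byte, so the rename touches identifiers only; the numeric values behind the constants (and therefore the on-disk format) presumably stay unchanged. A tiny sketch of that round trip, assuming compression.Codec is the small integer type implied by the compression.Codec(db.byte()) conversion above:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/compression" // assumed import path
)

func main() {
	// Write side: v2+ chunk headers store the codec as a single byte.
	headerByte := byte(compression.Snappy)

	// Read side: the byte converts straight back to a Codec,
	// exactly like compression.Codec(db.byte()) in newByteChunk above.
	codec := compression.Codec(headerByte)
	fmt.Println(codec == compression.Snappy) // true
}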
52 changes: 26 additions & 26 deletions pkg/chunkenc/memchunk_test.go
@@ -33,15 +33,15 @@ import (
)

var testEncodings = []compression.Codec{
-	compression.EncNone,
-	compression.EncGZIP,
-	compression.EncLZ4_64k,
-	compression.EncLZ4_256k,
-	compression.EncLZ4_1M,
-	compression.EncLZ4_4M,
-	compression.EncSnappy,
-	compression.EncFlate,
-	compression.EncZstd,
+	compression.None,
+	compression.GZIP,
+	compression.LZ4_64k,
+	compression.LZ4_256k,
+	compression.LZ4_1M,
+	compression.LZ4_4M,
+	compression.Snappy,
+	compression.Flate,
+	compression.Zstd,
}

var (
@@ -299,7 +299,7 @@ func TestCorruptChunk(t *testing.T) {
func TestReadFormatV1(t *testing.T) {
	t.Parallel()

-	c := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
	fillChunk(c)
	// overrides to v1 for testing that specific version.
	c.format = ChunkFormatV1
@@ -558,7 +558,7 @@ func TestChunkFilling(t *testing.T) {
func TestGZIPChunkTargetSize(t *testing.T) {
	t.Parallel()

-	chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
+	chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)

	lineSize := 512
	entry := &logproto.Entry{
@@ -681,7 +681,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
		t.Run(testName, func(t *testing.T) {
			t.Parallel()

-			tester(t, NewMemChunk(ChunkFormatV3, compression.EncGZIP, f, testBlockSize, testTargetSize))
+			tester(t, NewMemChunk(ChunkFormatV3, compression.GZIP, f, testBlockSize, testTargetSize))
		})
	}
}
@@ -726,7 +726,7 @@ func TestChunkSize(t *testing.T) {
}

func TestChunkStats(t *testing.T) {
-	c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
+	c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
	first := time.Now()
	entry := &logproto.Entry{
		Timestamp: first,
@@ -968,7 +968,7 @@ func BenchmarkBackwardIterator(b *testing.B) {
	for _, bs := range testBlockSizes {
		b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) {
			b.ReportAllocs()
-			c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
+			c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
			_ = fillChunk(c)
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
@@ -1082,7 +1082,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
func TestMemChunk_IteratorBounds(t *testing.T) {
	createChunk := func() *MemChunk {
		t.Helper()
-		c := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6)
+		c := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, 1e6, 1e6)

		if _, err := c.Append(&logproto.Entry{
			Timestamp: time.Unix(0, 1),
@@ -1168,9 +1168,9 @@ func TestMemchunkLongLine(t *testing.T) {
func TestBytesWith(t *testing.T) {
	t.Parallel()

-	exp, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
+	exp, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
	require.Nil(t, err)
-	out, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
+	out, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
	require.Nil(t, err)

	require.Equal(t, exp, out)
@@ -1181,8 +1181,8 @@ func TestCheckpointEncoding(t *testing.T) {

	blockSize, targetSize := 256*1024, 1500*1024
	for _, f := range allPossibleFormats {
-		t.Run(testNameWithFormats(compression.EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
-			c := newMemChunkWithFormat(f.chunkFormat, compression.EncSnappy, f.headBlockFmt, blockSize, targetSize)
+		t.Run(testNameWithFormats(compression.Snappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
+			c := newMemChunkWithFormat(f.chunkFormat, compression.Snappy, f.headBlockFmt, blockSize, targetSize)

			// add a few entries
			for i := 0; i < 5; i++ {
@@ -1267,7 +1267,7 @@ var (
func BenchmarkBufferedIteratorLabels(b *testing.B) {
	for _, f := range HeadBlockFmts {
		b.Run(f.String(), func(b *testing.B) {
-			c := NewMemChunk(ChunkFormatV3, compression.EncSnappy, f, testBlockSize, testTargetSize)
+			c := NewMemChunk(ChunkFormatV3, compression.Snappy, f, testBlockSize, testTargetSize)
			_ = fillChunk(c)

			labelsSet := []labels.Labels{
@@ -1367,8 +1367,8 @@

func Test_HeadIteratorReverse(t *testing.T) {
	for _, testData := range allPossibleFormats {
-		t.Run(testNameWithFormats(compression.EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
-			c := newMemChunkWithFormat(testData.chunkFormat, compression.EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize)
+		t.Run(testNameWithFormats(compression.Snappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
+			c := newMemChunkWithFormat(testData.chunkFormat, compression.Snappy, testData.headBlockFmt, testBlockSize, testTargetSize)
			genEntry := func(i int64) *logproto.Entry {
				return &logproto.Entry{
					Timestamp: time.Unix(0, i),
@@ -1483,7 +1483,7 @@ func TestMemChunk_Rebound(t *testing.T) {
}

func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {
-	chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
+	chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
	for ; from.Before(through); from = from.Add(time.Second) {
		_, err := chk.Append(&logproto.Entry{
			Line: from.String(),
@@ -1604,7 +1604,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
}

func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withStructuredMetadata bool) *MemChunk {
-	chk := NewMemChunk(ChunkFormatV4, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
+	chk := NewMemChunk(ChunkFormatV4, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
	t.Logf("from : %v", from.String())
	t.Logf("through: %v", through.String())
	var structuredMetadata push.LabelsAdapter
@@ -1753,7 +1753,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
		t.Run(tc.desc, func(t *testing.T) {
			for _, format := range allPossibleFormats {
				t.Run(fmt.Sprintf("chunk_v%d_head_%s", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
-					chk := newMemChunkWithFormat(format.chunkFormat, compression.EncNone, format.headBlockFmt, 1024, tc.targetSize)
+					chk := newMemChunkWithFormat(format.chunkFormat, compression.None, format.headBlockFmt, 1024, tc.targetSize)

					chk.blocks = make([]block, tc.nBlocks)
					chk.cutBlockSize = tc.cutBlockSize
@@ -2055,7 +2055,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) {
	t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
		for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ {
			t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) {
-				chk := NewMemChunk(format.chunkFormat, compression.EncNone, format.headBlockFmt, blockSize, testTargetSize)
+				chk := NewMemChunk(format.chunkFormat, compression.None, format.headBlockFmt, blockSize, testTargetSize)
				ts := time.Now().Unix()
				for i := 0; i < 3; i++ {
					dup, err := chk.Append(&logproto.Entry{
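The test churn above is the same substitution applied at every NewMemChunk call site. For orientation, a sketch of constructing and filling a chunk with the renamed constants: the chunkenc, compression, and logproto import paths are assumptions, the size arguments are illustrative rather than Loki defaults, and the identifiers themselves (NewMemChunk, ChunkFormatV4, UnorderedWithStructuredMetadataHeadBlockFmt, Append) appear verbatim in the diffs in this PR.

package main

import (
	"time"

	"github.com/grafana/loki/v3/pkg/chunkenc"    // assumed import path
	"github.com/grafana/loki/v3/pkg/compression" // assumed import path
	"github.com/grafana/loki/v3/pkg/logproto"    // assumed import path
)

func main() {
	// A v4 chunk with Snappy-compressed blocks, as in the tests above.
	chk := chunkenc.NewMemChunk(
		chunkenc.ChunkFormatV4,
		compression.Snappy, // was compression.EncSnappy before this commit
		chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt,
		256<<10,  // block size: illustrative, not a Loki default
		1500<<10, // target size: illustrative, not a Loki default
	)

	// Append returns (duplicate, error), matching the dup, err := pattern above.
	if _, err := chk.Append(&logproto.Entry{
		Timestamp: time.Now(),
		Line:      "example log line",
	}); err != nil {
		panic(err)
	}
}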
16 changes: 8 additions & 8 deletions pkg/chunkenc/unordered_test.go
@@ -451,7 +451,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
}

func TestUnorderedChunkIterators(t *testing.T) {
-	c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
	for i := 0; i < 100; i++ {
		// push in reverse order
		dup, err := c.Append(&logproto.Entry{
@@ -497,11 +497,11 @@ func TestUnorderedChunkIterators(t *testing.T) {
}

func BenchmarkUnorderedRead(b *testing.B) {
-	legacy := NewMemChunk(ChunkFormatV3, compression.EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
+	legacy := NewMemChunk(ChunkFormatV3, compression.Snappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
	fillChunkClose(legacy, false)
-	ordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+	ordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
	fillChunkClose(ordered, false)
-	unordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+	unordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
	fillChunkRandomOrder(unordered, false)

	tcs := []struct {
@@ -559,7 +559,7 @@ func BenchmarkUnorderedRead(b *testing.B) {
}

func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
-	c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
	fillChunkRandomOrder(c, false)

	ct := 0
@@ -596,7 +596,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
}

func chunkFrom(xs []logproto.Entry) ([]byte, error) {
-	c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
	for _, x := range xs {
		if _, err := c.Append(&x); err != nil {
			return nil, err
@@ -656,7 +656,7 @@ func TestReorder(t *testing.T) {
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
-			c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+			c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
			for _, x := range tc.input {
				dup, err := c.Append(&x)
				require.False(t, dup)
@@ -675,7 +675,7 @@ }
	}
}

func TestReorderAcrossBlocks(t *testing.T) {
-	c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
	for _, batch := range [][]int{
		// ensure our blocks have overlapping bounds and must be reordered
		// before closing.
2 changes: 1 addition & 1 deletion pkg/compactor/deletion/delete_requests_table.go
@@ -117,7 +117,7 @@ func (t *deleteRequestsTable) uploadFile() error {
	}()

	err = t.db.View(func(tx *bbolt.Tx) (err error) {
-		gzipPool := compression.GetWriterPool(compression.EncGZIP)
+		gzipPool := compression.GetWriterPool(compression.GZIP)
		compressedWriter := gzipPool.GetWriter(f)
		defer gzipPool.PutWriter(compressedWriter)
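This is the borrow/compress/return pattern for pooled writers; the indexSet.upload hunk below is identical. A self-contained sketch under the same assumptions as before: the import path is inferred, GetWriterPool, GetWriter, and PutWriter are used exactly as above, and the pooled writer is assumed to be an io.WriteCloser that must be closed to flush the compressed stream.

package main

import (
	"bytes"
	"fmt"

	"github.com/grafana/loki/v3/pkg/compression" // assumed import path
)

func main() {
	var buf bytes.Buffer

	gzipPool := compression.GetWriterPool(compression.GZIP)
	compressedWriter := gzipPool.GetWriter(&buf)
	defer gzipPool.PutWriter(compressedWriter) // return the writer to the pool

	if _, err := compressedWriter.Write([]byte("payload to upload")); err != nil {
		panic(err)
	}
	// Close flushes the gzip stream; WriteCloser semantics are assumed here.
	if err := compressedWriter.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("wrote %d compressed bytes\n", buf.Len())
}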
2 changes: 1 addition & 1 deletion pkg/compactor/index_set.go
@@ -229,7 +229,7 @@ func (is *indexSet) upload() error {
		}
	}()

-	gzipPool := compression.GetWriterPool(compression.EncGZIP)
+	gzipPool := compression.GetWriterPool(compression.GZIP)
	compressedWriter := gzipPool.GetWriter(f)
	defer gzipPool.PutWriter(compressedWriter)
2 changes: 1 addition & 1 deletion pkg/compactor/retention/retention_test.go
@@ -279,7 +279,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time
	labelsBuilder.Set(labels.MetricName, "logs")
	metric := labelsBuilder.Labels()
	fp := ingesterclient.Fingerprint(lbs)
-	chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)
+	chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)

	for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
		dup, err := chunkEnc.Append(&logproto.Entry{