From c85f8d8f4a4bb1d01f800a2a800c57f100ab8813 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Dec 2024 20:17:32 +1100 Subject: [PATCH 1/8] MQE: Fix panic in loading too many samples BucketedPool would create pools in powers of two up to a set `maxSize`. That is, if `maxSize` isn't a power of two itself, then the maximum bucket would be less than the maxSize. However, when requesting a slice from the pool we were only checking against the maxSize, and not whether a bucket existed for that size. Instead calculate the bucketIndex and check if that exists before using it. --- pkg/streamingpromql/testdata/ours/selectors.test | 9 +++++++++ pkg/util/pool/bucketed_pool.go | 4 ++-- pkg/util/pool/bucketed_pool_test.go | 12 ++++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/streamingpromql/testdata/ours/selectors.test b/pkg/streamingpromql/testdata/ours/selectors.test index 25fbdd13fa..bf12a62243 100644 --- a/pkg/streamingpromql/testdata/ours/selectors.test +++ b/pkg/streamingpromql/testdata/ours/selectors.test @@ -73,3 +73,12 @@ eval range from 0 to 7m step 1m some_metric some_metric{env="prod", cluster="eu"} _ _ _ 0 1 2 3 4 some_metric{env="prod", cluster="us"} _ _ _ 0 2 4 6 8 some_metric{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}} + +clear + +load 1s + metric_total 0+2x86400 + +# Test our bucket pool can get more than the max points +eval instant at 24h rate(metric_total[24h]) + {} 2 \ No newline at end of file diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index 2fa8d23cac..4b524bb953 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -51,11 +51,11 @@ func (p *BucketedPool[T, E]) Get(size int) T { return nil } - if uint(size) > p.maxSize { + bucketIndex := bits.Len(uint(size - 1)) + if bucketIndex >= len(p.buckets) { return p.make(size) } - bucketIndex := bits.Len(uint(size - 1)) s := p.buckets[bucketIndex].Get() if s == nil { diff --git a/pkg/util/pool/bucketed_pool_test.go b/pkg/util/pool/bucketed_pool_test.go index a7d2e754f5..29b1916dcc 100644 --- a/pkg/util/pool/bucketed_pool_test.go +++ b/pkg/util/pool/bucketed_pool_test.go @@ -122,3 +122,15 @@ func TestBucketedPool_PutSliceLargerThanMaximum(t *testing.T) { require.NotSame(t, &s1[0], &s2[0]) require.Equal(t, 101, cap(s2)) } + +func TestBucketedPool_GetSizeCloseToMax(t *testing.T) { + maxSize := 100000 + pool := NewBucketedPool(uint(maxSize), makeFunc) + + // Request a size that triggers the last bucket boundary. + s := pool.Get(86401) + + // Check that we still get a slice with the correct size. + require.Equal(t, 86401, cap(s)) + require.Len(t, s, 0) +} From 8484c31dca4c93e59bd8d973876036b38f7bf21b Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Dec 2024 20:19:03 +1100 Subject: [PATCH 2/8] Use a power of two size max bucket --- pkg/streamingpromql/types/limiting_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/types/limiting_pool.go b/pkg/streamingpromql/types/limiting_pool.go index 4866e25844..d7efe165a8 100644 --- a/pkg/streamingpromql/types/limiting_pool.go +++ b/pkg/streamingpromql/types/limiting_pool.go @@ -13,7 +13,7 @@ import ( ) const ( - MaxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days. + MaxExpectedPointsPerSeries = 131_072 // There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days. Then we use the next power of two. // Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit. // Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats. From 4c1aa8e42664878d70b184f827bd6f0747ebf89b Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Dec 2024 20:27:09 +1100 Subject: [PATCH 3/8] MQE: Ensure BucketedPool always returns slices of power two This is to guarantee they work with the ring buffers which expect slices to always be power of two. The limiting pool still protects us from requesting an unreasonable amount of points with the MemoryConsumptionTracker. --- .../testdata/ours/selectors.test | 4 ++ .../types/limiting_pool_test.go | 61 +++++++++++++++++++ pkg/streamingpromql/types/ring_buffer_test.go | 24 ++++++++ pkg/util/pool/bucketed_pool.go | 5 +- pkg/util/pool/bucketed_pool_test.go | 25 ++++++++ 5 files changed, 118 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/testdata/ours/selectors.test b/pkg/streamingpromql/testdata/ours/selectors.test index bf12a62243..1e5a6acdef 100644 --- a/pkg/streamingpromql/testdata/ours/selectors.test +++ b/pkg/streamingpromql/testdata/ours/selectors.test @@ -81,4 +81,8 @@ load 1s # Test our bucket pool can get more than the max points eval instant at 24h rate(metric_total[24h]) + {} 2 + +# Make sure the ring buffer Use and Append work with power of two pools +eval instant at 24h rate(metric_total[1d:1s]) {} 2 \ No newline at end of file diff --git a/pkg/streamingpromql/types/limiting_pool_test.go b/pkg/streamingpromql/types/limiting_pool_test.go index 2f9ff89fb0..b16ca5acb0 100644 --- a/pkg/streamingpromql/types/limiting_pool_test.go +++ b/pkg/streamingpromql/types/limiting_pool_test.go @@ -228,6 +228,67 @@ func TestLimitingPool_Mangling(t *testing.T) { require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled") } +func TestLimitingBucketedPool_PowerOfTwoCapacities(t *testing.T) { + memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) + + pool := NewLimitingBucketedPool( + pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }), + 1, + false, + nil, + ) + + cases := []struct { + requestedSize int + expectedCap int + }{ + {3, 4}, + {5, 8}, + {10, 16}, + {65_000, 65_536}, + {100_001, 131_072}, // Exceeds max, expect next power of two + } + + for _, c := range cases { + slice, err := pool.Get(c.requestedSize, memoryConsumptionTracker) + require.NoError(t, err, "Unexpected error when requesting size %d", c.requestedSize) + require.Equal(t, c.expectedCap, cap(slice), + "LimitingBucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap) + pool.Put(slice, memoryConsumptionTracker) + } +} + +func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) { + const maxMemoryLimit = 1_000_000 * FPointSize + + reg, metric := createRejectedMetric() + memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(uint64(maxMemoryLimit), metric) + + pool := NewLimitingBucketedPool( + pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }), + 1, + false, + nil, + ) + + // Request a reasonable size + slice, err := pool.Get(500_000, memoryConsumptionTracker) + require.NoError(t, err, "Expected to succeed for reasonable size request") + require.Equal(t, 524_288, cap(slice), "Capacity should be next power of two") + assertRejectedQueryCount(t, reg, 0) + + pool.Put(slice, memoryConsumptionTracker) + + // Request an unreasonable size + _, err = pool.Get(10_000_000, memoryConsumptionTracker) + require.Error(t, err, "Expected an error for unreasonably large size request") + require.Contains(t, err.Error(), "exceeded", "Error message should indicate memory consumption limit exceeded") + assertRejectedQueryCount(t, reg, 1) + + require.Equal(t, uint64(0), memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes, + "Current memory consumption should remain at 0 after rejected request") +} + func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) { expected := fmt.Sprintf(` # TYPE %s counter diff --git a/pkg/streamingpromql/types/ring_buffer_test.go b/pkg/streamingpromql/types/ring_buffer_test.go index 2af450806a..87b44f4487 100644 --- a/pkg/streamingpromql/types/ring_buffer_test.go +++ b/pkg/streamingpromql/types/ring_buffer_test.go @@ -502,3 +502,27 @@ func setupRingBufferTestingPools(t *testing.T) { putHPointSliceForRingBuffer = originalPutHPointSlice }) } + +func TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) { + memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) + buf := NewFPointRingBuffer(memoryConsumptionTracker) + + nonPowerOfTwoSlice := make([]promql.FPoint, 0, 15) + + err := buf.Use(nonPowerOfTwoSlice) + require.Error(t, err, "Use() should return an error for a non-power-of-two slice") + require.EqualError(t, err, "slice capacity must be a power of two, but is 15", + "Error message should indicate the invalid capacity") +} + +func TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) { + memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) + buf := NewHPointRingBuffer(memoryConsumptionTracker) + + nonPowerOfTwoSlice := make([]promql.HPoint, 0, 15) + + err := buf.Use(nonPowerOfTwoSlice) + require.Error(t, err, "Use() should return an error for a non-power-of-two slice") + require.EqualError(t, err, "slice capacity must be a power of two, but is 15", + "Error message should indicate the invalid capacity") +} diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index 4b524bb953..8cb410be01 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -42,6 +42,8 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *Buckete } // Get returns a new slice with capacity greater than or equal to size. +// If no bucket large enough exists, a slice larger than the requested size +// of the next power of two is returned. func (p *BucketedPool[T, E]) Get(size int) T { if size < 0 { panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size)) @@ -53,7 +55,8 @@ func (p *BucketedPool[T, E]) Get(size int) T { bucketIndex := bits.Len(uint(size - 1)) if bucketIndex >= len(p.buckets) { - return p.make(size) + nextPowerOfTwo := 1 << bucketIndex + return p.make(nextPowerOfTwo) } s := p.buckets[bucketIndex].Get() diff --git a/pkg/util/pool/bucketed_pool_test.go b/pkg/util/pool/bucketed_pool_test.go index 29b1916dcc..10e3ba6743 100644 --- a/pkg/util/pool/bucketed_pool_test.go +++ b/pkg/util/pool/bucketed_pool_test.go @@ -134,3 +134,28 @@ func TestBucketedPool_GetSizeCloseToMax(t *testing.T) { require.Equal(t, 86401, cap(s)) require.Len(t, s, 0) } + +func TestBucketedPool_AlwaysReturnsPowerOfTwoCapacities(t *testing.T) { + pool := NewBucketedPool(100_000, makeFunc) + + cases := []struct { + requestedSize int + expectedCap int + }{ + {3, 4}, + {5, 8}, + {10, 16}, + {20, 32}, + {65_000, 65_536}, + {100_001, 131_072}, // Exceeds max bucket: next power of two is 131,072 + } + + for _, c := range cases { + slice := pool.Get(c.requestedSize) + + require.Equal(t, c.expectedCap, cap(slice), + "BucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap) + + pool.Put(slice) + } +} From b8256beb118061fba94a10e3c42a3bd18a700bc5 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Dec 2024 21:23:29 +1100 Subject: [PATCH 4/8] Fix tests --- pkg/util/pool/bucketed_pool_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/util/pool/bucketed_pool_test.go b/pkg/util/pool/bucketed_pool_test.go index 10e3ba6743..ed744071f1 100644 --- a/pkg/util/pool/bucketed_pool_test.go +++ b/pkg/util/pool/bucketed_pool_test.go @@ -59,7 +59,7 @@ func TestBucketedPool_HappyPath(t *testing.T) { }, { size: 20, - expectedCap: 20, // Max size is 19, so we expect to get a slice with the size requested (20), not 32 (the next power of two). + expectedCap: 32, // Although max size is 19, we expect to get a slice with the next power of two back. This slice would not have come from a bucket. }, } @@ -120,7 +120,7 @@ func TestBucketedPool_PutSliceLargerThanMaximum(t *testing.T) { pool.Put(s1) s2 := pool.Get(101)[:101] require.NotSame(t, &s1[0], &s2[0]) - require.Equal(t, 101, cap(s2)) + require.Equal(t, 128, cap(s2)) } func TestBucketedPool_GetSizeCloseToMax(t *testing.T) { @@ -131,7 +131,7 @@ func TestBucketedPool_GetSizeCloseToMax(t *testing.T) { s := pool.Get(86401) // Check that we still get a slice with the correct size. - require.Equal(t, 86401, cap(s)) + require.Equal(t, 131072, cap(s)) require.Len(t, s, 0) } From 1e76f290b30fe5721e6a88001853d35e7f7ed877 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Dec 2024 21:26:15 +1100 Subject: [PATCH 5/8] Correctly return slices to their respective buckets or discard them --- pkg/util/pool/bucketed_pool.go | 8 +++++++- pkg/util/pool/bucketed_pool_test.go | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index 8cb410be01..efdaf1a647 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -79,8 +79,14 @@ func (p *BucketedPool[T, E]) Put(s T) { } bucketIndex := bits.Len(size - 1) + if bucketIndex >= len(p.buckets) { + return // Ignore slices larger than the largest bucket + } + + // Ignore slices that do not align to the current power of 2 + // (this will only happen where a slice did not originally come from the pool). if size != (1 << bucketIndex) { - bucketIndex-- + return } p.buckets[bucketIndex].Put(s[0:0]) diff --git a/pkg/util/pool/bucketed_pool_test.go b/pkg/util/pool/bucketed_pool_test.go index ed744071f1..a7183694d1 100644 --- a/pkg/util/pool/bucketed_pool_test.go +++ b/pkg/util/pool/bucketed_pool_test.go @@ -159,3 +159,21 @@ func TestBucketedPool_AlwaysReturnsPowerOfTwoCapacities(t *testing.T) { pool.Put(slice) } } + +func TestBucketedPool_PutSizeCloseToMax(t *testing.T) { + maxSize := 100000 + pool := NewBucketedPool(uint(maxSize), makeFunc) + + // Create a slice with capacity that triggers the upper edge case + s := make([]int, 0, 65_000) // 86401 is close to maxSize but not aligned to power of 2 + + // Ensure Put does not panic when adding this slice + require.NotPanics(t, func() { + pool.Put(s) + }, "Put should not panic for sizes close to maxSize") + + // Validate that a subsequent Get for a smaller size works fine + ret := pool.Get(1) + require.Equal(t, 1, cap(ret)) + require.Len(t, ret, 0) +} From 8510e9107e4e6752f50b57976609088d83f09cad Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Dec 2024 22:06:04 +1100 Subject: [PATCH 6/8] Extra tests --- .../types/limiting_pool_test.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/streamingpromql/types/limiting_pool_test.go b/pkg/streamingpromql/types/limiting_pool_test.go index b16ca5acb0..d8340b392b 100644 --- a/pkg/streamingpromql/types/limiting_pool_test.go +++ b/pkg/streamingpromql/types/limiting_pool_test.go @@ -289,6 +289,40 @@ func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) { "Current memory consumption should remain at 0 after rejected request") } +func TestLimitingBucketedPool_MaxExpectedPointsPerSeriesConstantIsPowerOfTwo(t *testing.T) { + // Although not strictly required (as the code should handle MaxExpectedPointsPerSeries not being a power of two correctly), + // it is best that we keep it as one for now. + require.True(t, isPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two") +} + +func TestIsPowerOfTwo(t *testing.T) { + cases := []struct { + input int + expected bool + }{ + {-2, false}, + {1, true}, + {2, true}, + {3, false}, + {4, true}, + {5, false}, + {6, false}, + {7, false}, + {8, true}, + {16, true}, + {32, true}, + {1023, false}, + {1024, true}, + {1<<12 - 1, false}, + {1 << 12, true}, + } + + for _, c := range cases { + result := isPowerOfTwo(c.input) + require.Equalf(t, c.expected, result, "isPowerOfTwo(%d) should return %v", c.input, c.expected) + } +} + func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) { expected := fmt.Sprintf(` # TYPE %s counter From 473f95ece4cb25575c0438f75a27e43cc69c7259 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 18 Dec 2024 12:58:03 +1100 Subject: [PATCH 7/8] Address review feedback --- pkg/util/pool/bucketed_pool.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index efdaf1a647..458139b4bd 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -44,6 +44,7 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *Buckete // Get returns a new slice with capacity greater than or equal to size. // If no bucket large enough exists, a slice larger than the requested size // of the next power of two is returned. +// Get guarantees the resulting slice always has a capacity in power of twos. func (p *BucketedPool[T, E]) Get(size int) T { if size < 0 { panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size)) @@ -53,8 +54,23 @@ func (p *BucketedPool[T, E]) Get(size int) T { return nil } + // Enforce Go's maximum slice size limitation. + // Go slices are limited to (1<<31)-1 elements due to runtime constraints. + const maxSliceSize = (1 << 31) - 1 + if size > maxSliceSize { + panic(fmt.Sprintf("Cannot allocate slice with size (%d) as it exceeds Go's maximum slice size (%d)", size, maxSliceSize)) + } + bucketIndex := bits.Len(uint(size - 1)) + + // If bucketIndex exceeds the number of available buckets, return a slice of the next power of two. if bucketIndex >= len(p.buckets) { + // Check for wrap around case when bucketIndex equals or exceeds the bit size limit. + if bucketIndex >= bits.UintSize { + // This should not occur due to the above slice size check. + panic(fmt.Sprintf("Cannot allocate slice with size (%d) as it exceeds the system limit", size)) + } + nextPowerOfTwo := 1 << bucketIndex return p.make(nextPowerOfTwo) } From 0cfe82bdb1605fda7718f35a6017c4183ca7ae63 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 18 Dec 2024 20:19:28 +1100 Subject: [PATCH 8/8] Remove unncessary slice length check --- pkg/util/pool/bucketed_pool.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index 458139b4bd..37a01f096b 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -54,23 +54,10 @@ func (p *BucketedPool[T, E]) Get(size int) T { return nil } - // Enforce Go's maximum slice size limitation. - // Go slices are limited to (1<<31)-1 elements due to runtime constraints. - const maxSliceSize = (1 << 31) - 1 - if size > maxSliceSize { - panic(fmt.Sprintf("Cannot allocate slice with size (%d) as it exceeds Go's maximum slice size (%d)", size, maxSliceSize)) - } - bucketIndex := bits.Len(uint(size - 1)) // If bucketIndex exceeds the number of available buckets, return a slice of the next power of two. if bucketIndex >= len(p.buckets) { - // Check for wrap around case when bucketIndex equals or exceeds the bit size limit. - if bucketIndex >= bits.UintSize { - // This should not occur due to the above slice size check. - panic(fmt.Sprintf("Cannot allocate slice with size (%d) as it exceeds the system limit", size)) - } - nextPowerOfTwo := 1 << bucketIndex return p.make(nextPowerOfTwo) }