Skip to content

Commit

Permalink
MQE: Tidy up pool size handling (#10372)
Browse files Browse the repository at this point in the history
* MQE: Tidy up pool size handling

Primarily from feedback on #10261

Rather than allowing an arbitrary maxSize in pools, require them to
be a power of two since that is what happens internally anyway.

We keep the checks and protections around requiring it to be a power of
two.

* Fix lint

* add comment
  • Loading branch information
jhesketh authored Jan 13, 2025
1 parent ed3160e commit ccb73ed
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 174 deletions.
9 changes: 3 additions & 6 deletions pkg/streamingpromql/types/fpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/util/pool"
)

// FPointRingBuffer and HPointRingBuffer are nearly identical, but exist for each
Expand Down Expand Up @@ -61,7 +62,7 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error {
return err
}

if !isPowerOfTwo(cap(newSlice)) {
if !pool.IsPowerOfTwo(cap(newSlice)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
// If we can guarantee that newSlice has a capacity that is a power of two in the future, then we can drop this check.
return fmt.Errorf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize)
Expand Down Expand Up @@ -148,7 +149,7 @@ func (b *FPointRingBuffer) Release() {
// should not return s to the pool themselves.
// s must have a capacity that is a power of two.
func (b *FPointRingBuffer) Use(s []promql.FPoint) error {
if !isPowerOfTwo(cap(s)) {
if !pool.IsPowerOfTwo(cap(s)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
return fmt.Errorf("slice capacity must be a power of two, but is %v", cap(s))
}
Expand Down Expand Up @@ -261,7 +262,3 @@ func (v FPointRingBufferView) Any() bool {
// These hooks exist so we can override them during unit tests.
var getFPointSliceForRingBuffer = FPointSlicePool.Get
var putFPointSliceForRingBuffer = FPointSlicePool.Put

// isPowerOfTwo reports whether n has at most one bit set, which is the case
// for every positive power of two. Note that 0 also passes this check.
func isPowerOfTwo(n int) bool {
	// Subtracting one flips the lowest set bit and every bit below it, so the
	// AND clears that bit; nothing is left only if it was the sole bit set.
	remaining := n & (n - 1)
	return remaining == 0
}
5 changes: 3 additions & 2 deletions pkg/streamingpromql/types/hpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/util/pool"
)

// FPointRingBuffer and HPointRingBuffer are nearly identical, but exist for each
Expand Down Expand Up @@ -121,7 +122,7 @@ func (b *HPointRingBuffer) NextPoint() (*promql.HPoint, error) {
return nil, err
}

if !isPowerOfTwo(cap(newSlice)) {
if !pool.IsPowerOfTwo(cap(newSlice)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
// If we can guarantee that newSlice has a capacity that is a power of two in the future, then we can drop this check.
return nil, fmt.Errorf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize)
Expand Down Expand Up @@ -185,7 +186,7 @@ func (b *HPointRingBuffer) Release() {
// should not return s to the pool themselves.
// s must have a capacity that is a power of two.
func (b *HPointRingBuffer) Use(s []promql.HPoint) error {
if !isPowerOfTwo(cap(s)) {
if !pool.IsPowerOfTwo(cap(s)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
return fmt.Errorf("slice capacity must be a power of two, but is %v", cap(s))
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/streamingpromql/types/limiting_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
)

const (
MaxExpectedPointsPerSeries = 131_072 // There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days. Then we use the next power of two.
// There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days.
// Then we use the next power of two, given the pools always return slices with capacity equal to a power of two.
MaxExpectedPointsPerSeries = 131_072

// Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit.
// Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats.
Expand Down
97 changes: 4 additions & 93 deletions pkg/streamingpromql/types/limiting_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestLimitingBucketedPool_Unlimited(t *testing.T) {
tracker := limiting.NewMemoryConsumptionTracker(0, metric)

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1000, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
FPointSize,
false,
nil,
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestLimitingPool_Limited(t *testing.T) {
tracker := limiting.NewMemoryConsumptionTracker(limit, metric)

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1000, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
FPointSize,
false,
nil,
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestLimitingPool_Mangling(t *testing.T) {
tracker := limiting.NewMemoryConsumptionTracker(0, metric)

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1000, func(size int) []int { return make([]int, 0, size) }),
pool.NewBucketedPool(1024, func(size int) []int { return make([]int, 0, size) }),
1,
false,
func(_ int) int { return 123 },
Expand All @@ -228,99 +228,10 @@ func TestLimitingPool_Mangling(t *testing.T) {
require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled")
}

// TestLimitingBucketedPool_PowerOfTwoCapacities requests slices of various sizes and
// checks that each returned slice has the expected power-of-two capacity, including
// for a request larger than the size the underlying pool was created with.
func TestLimitingBucketedPool_PowerOfTwoCapacities(t *testing.T) {
	type testCase struct {
		requestedSize int
		expectedCap   int
	}

	testCases := []testCase{
		{requestedSize: 3, expectedCap: 4},
		{requestedSize: 5, expectedCap: 8},
		{requestedSize: 10, expectedCap: 16},
		{requestedSize: 65_000, expectedCap: 65_536},
		{requestedSize: 100_001, expectedCap: 131_072}, // Exceeds max, expect next power of two
	}

	tracker := limiting.NewMemoryConsumptionTracker(0, nil)
	newIntSlice := func(size int) []int { return make([]int, 0, size) }
	p := NewLimitingBucketedPool(pool.NewBucketedPool(100_000, newIntSlice), 1, false, nil)

	for _, tc := range testCases {
		s, err := p.Get(tc.requestedSize, tracker)
		require.NoError(t, err, "Unexpected error when requesting size %d", tc.requestedSize)

		gotCap := cap(s)
		require.Equal(t, tc.expectedCap, gotCap,
			"LimitingBucketedPool.Get() returned slice with capacity %d; expected %d", gotCap, tc.expectedCap)

		// Hand the slice back before the next request.
		p.Put(s, tracker)
	}
}

// TestLimitingBucketedPool_UnreasonableSizeRequest checks that a large request which is
// expected to succeed is served with a power-of-two capacity, and that a much larger
// request is rejected with an error, counted as a rejection, and leaves the tracked
// memory consumption at zero.
func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) {
	const maxMemoryLimit = 1_000_000 * FPointSize

	reg, metric := createRejectedMetric()
	tracker := limiting.NewMemoryConsumptionTracker(uint64(maxMemoryLimit), metric)

	newIntSlice := func(size int) []int { return make([]int, 0, size) }
	p := NewLimitingBucketedPool(pool.NewBucketedPool(100_000, newIntSlice), 1, false, nil)

	// First, a request that is expected to be accepted.
	s, err := p.Get(500_000, tracker)
	require.NoError(t, err, "Expected to succeed for reasonable size request")
	require.Equal(t, 524_288, cap(s), "Capacity should be next power of two")
	assertRejectedQueryCount(t, reg, 0)

	p.Put(s, tracker)

	// Then, a request that is expected to be rejected.
	_, err = p.Get(10_000_000, tracker)
	require.Error(t, err, "Expected an error for unreasonably large size request")
	require.Contains(t, err.Error(), "exceeded", "Error message should indicate memory consumption limit exceeded")
	assertRejectedQueryCount(t, reg, 1)

	// The rejected request must not be left accounted for.
	require.Equal(t, uint64(0), tracker.CurrentEstimatedMemoryConsumptionBytes,
		"Current memory consumption should remain at 0 after rejected request")
}

func TestLimitingBucketedPool_MaxExpectedPointsPerSeriesConstantIsPowerOfTwo(t *testing.T) {
// Although not strictly required (as the code should handle MaxExpectedPointsPerSeries not being a power of two correctly),
// it is best that we keep it as one for now.
require.True(t, isPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two")
}

// TestIsPowerOfTwo runs isPowerOfTwo over a table of inputs and checks each
// result against the expected value recorded alongside it.
func TestIsPowerOfTwo(t *testing.T) {
// Each case pairs an input with the result isPowerOfTwo is expected to return for it.
cases := []struct {
input int
expected bool
}{
{-2, false},
{1, true},
{2, true},
{3, false},
{4, true},
{5, false},
{6, false},
{7, false},
{8, true},
{16, true},
{32, true},
{1023, false},
{1024, true},
{1<<12 - 1, false},
{1 << 12, true},
}

for _, c := range cases {
result := isPowerOfTwo(c.input)
require.Equalf(t, c.expected, result, "isPowerOfTwo(%d) should return %v", c.input, c.expected)
}
// NOTE(review): this assertion is about MaxExpectedPointsPerSeries rather than the
// table above — confirm it is meant to live in this test.
require.True(t, pool.IsPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two")
}

func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/streamingpromql/types/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
)

const (
maxExpectedSeriesPerResult = 10_000_000 // There's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs.
// There's not too much science behind this number: it is based on examining the largest queries seen at Grafana Labs.
// The number must also align with a power of two for our pools.
maxExpectedSeriesPerResult = 8_388_608
)

var (
Expand Down
17 changes: 17 additions & 0 deletions pkg/streamingpromql/types/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-License-Identifier: AGPL-3.0-only

package types

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/util/pool"
)

func TestMaxExpectedSeriesPerResultConstantIsPowerOfTwo(t *testing.T) {
// Although not strictly required (as the code should handle maxExpectedSeriesPerResult not being a power of two correctly),
// it is best that we keep it as one for now.
require.True(t, pool.IsPowerOfTwo(maxExpectedSeriesPerResult), "maxExpectedSeriesPerResult must be a power of two")
}
29 changes: 5 additions & 24 deletions pkg/streamingpromql/types/ring_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) {
err = buf.Use(subsliceWithPowerOfTwoCapacity)
require.NoError(t, err)
shouldHavePoints(t, buf, points[4:]...)

nonPowerOfTwoSlice := make([]T, 0, 15)
err = buf.Use(nonPowerOfTwoSlice)
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}

func TestRingBuffer_DiscardPointsBefore_ThroughWrapAround(t *testing.T) {
Expand Down Expand Up @@ -502,27 +507,3 @@ func setupRingBufferTestingPools(t *testing.T) {
putHPointSliceForRingBuffer = originalPutHPointSlice
})
}

// TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice checks that Use rejects a
// slice whose capacity (15) is not a power of two, and that the error names that capacity.
func TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
	const invalidCapacity = 15

	tracker := limiting.NewMemoryConsumptionTracker(0, nil)
	rb := NewFPointRingBuffer(tracker)

	err := rb.Use(make([]promql.FPoint, 0, invalidCapacity))

	require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
	require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
		"Error message should indicate the invalid capacity")
}

// TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice checks that Use rejects a
// slice whose capacity (15) is not a power of two, and that the error names that capacity.
func TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
	const invalidCapacity = 15

	tracker := limiting.NewMemoryConsumptionTracker(0, nil)
	rb := NewHPointRingBuffer(tracker)

	err := rb.Use(make([]promql.HPoint, 0, invalidCapacity))

	require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
	require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
		"Error message should indicate the invalid capacity")
}
16 changes: 11 additions & 5 deletions pkg/util/pool/bucketed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type BucketedPool[T ~[]E, E any] struct {
func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *BucketedPool[T, E] {
if maxSize <= 1 {
panic("invalid maximum pool size")
} else if !IsPowerOfTwo(int(maxSize)) {
panic("bucket maxSize is not a power of two")
}

bucketCount := bits.Len(maxSize)
Expand All @@ -42,9 +44,8 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *Buckete
}

// Get returns a new slice with capacity greater than or equal to size.
// If no bucket large enough exists, a slice larger than the requested size
// of the next power of two is returned.
// Get guarantees the resulting slice always has a capacity in power of twos.
// The resulting slice always has a capacity that is a power of two.
// If size is greater than maxSize, then a slice is still returned, however it may not be drawn from a pool.
func (p *BucketedPool[T, E]) Get(size int) T {
if size < 0 {
panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size))
Expand All @@ -56,7 +57,7 @@ func (p *BucketedPool[T, E]) Get(size int) T {

bucketIndex := bits.Len(uint(size - 1))

// If bucketIndex exceeds the number of available buckets, return a slice of the next power of two.
// If the requested size is larger than the size of the largest bucket, return a slice of the next power of two greater than or equal to size.
if bucketIndex >= len(p.buckets) {
nextPowerOfTwo := 1 << bucketIndex
return p.make(nextPowerOfTwo)
Expand All @@ -83,14 +84,19 @@ func (p *BucketedPool[T, E]) Put(s T) {

bucketIndex := bits.Len(size - 1)
if bucketIndex >= len(p.buckets) {
// This can happen for slices with a capacity larger than the largest bucket
// (for example, a slice returned by Get for a request greater than maxSize).
return // Ignore slices larger than the largest bucket
}

// Ignore slices that do not align to the current power of 2
// Ignore slices with capacity that is not a power of 2
// (this will only happen where a slice did not originally come from the pool).
if size != (1 << bucketIndex) {
return
}

p.buckets[bucketIndex].Put(s[0:0])
}

// IsPowerOfTwo reports whether n has at most one bit set, which is the case
// for every positive power of two. Note that 0 also passes this check.
func IsPowerOfTwo(n int) bool {
	// n & -n isolates the lowest set bit of n; it equals n only when no
	// other bit is set.
	lowestSetBit := n & -n
	return lowestSetBit == n
}
Loading

0 comments on commit ccb73ed

Please sign in to comment.