Streaming engine: Native histogram aggregations #8360

Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -6,7 +6,7 @@

* [CHANGE] Store-gateway / querier: enable streaming chunks from store-gateways to queriers by default. #6646
* [CHANGE] Querier: honor the start/end time range specified in the read hints when executing a remote read request. #8431
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360
* [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371
* [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378
* [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425
58 changes: 54 additions & 4 deletions pkg/streamingpromql/engine_test.go
@@ -470,6 +470,8 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
some_metric{idx="3"} 0+1x5
some_metric{idx="4"} 0+1x5
some_metric{idx="5"} 0+1x5
some_histogram{idx="1"} {{schema:1 sum:10 count:9 buckets:[3 3 3]}}x5
some_histogram{idx="2"} {{schema:1 sum:10 count:9 buckets:[3 3 3]}}x5
`)
t.Cleanup(func() { require.NoError(t, storage.Close()) })

@@ -524,12 +526,17 @@
shouldSucceed: true,

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory: the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2), and the next series from the selector.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
rangeQueryExpectedPeak: 8*(pooling.Float64Size+pooling.BoolSize) + 8*pooling.FPointSize,
rangeQueryLimit: 8*(pooling.Float64Size+pooling.BoolSize) + 8*pooling.FPointSize,

// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory: the running total for the sum() (a float and a bool), the next series from the selector, and the output sample.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool),
// - the next series from the selector,
// - and the output sample.
instantQueryExpectedPeak: pooling.Float64Size + pooling.BoolSize + pooling.FPointSize + pooling.VectorSampleSize,
instantQueryLimit: pooling.Float64Size + pooling.BoolSize + pooling.FPointSize + pooling.VectorSampleSize,
},
@@ -538,17 +545,60 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
shouldSucceed: false,

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory: the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2), and the next series from the selector.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
rangeQueryExpectedPeak: 8*pooling.Float64Size + 8*pooling.FPointSize,
rangeQueryLimit: 8*(pooling.Float64Size+pooling.BoolSize) + 8*pooling.FPointSize - 1,

// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory: the running total for the sum() (a float and a bool), the next series from the selector, and the output sample.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool),
// - the next series from the selector,
// - and the output sample.
// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
instantQueryExpectedPeak: pooling.Float64Size + pooling.FPointSize + pooling.VectorSampleSize,
instantQueryLimit: pooling.Float64Size + pooling.BoolSize + pooling.FPointSize + pooling.VectorSampleSize - 1,
},
"histogram: limit enabled, but query does not exceed limit": {
expr: "sum(some_histogram)",
shouldSucceed: true,

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory:
// - the running total for the sum() (a histogram pointer at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
rangeQueryExpectedPeak: 8*pooling.HistogramPointerSize + 8*pooling.HPointSize,
rangeQueryLimit: 8*pooling.HistogramPointerSize + 8*pooling.HPointSize,
// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory:
// - the running total for the sum() (a histogram pointer),
// - the next series from the selector,
// - and the output sample.
instantQueryExpectedPeak: pooling.HistogramPointerSize + pooling.HPointSize + pooling.VectorSampleSize,
instantQueryLimit: pooling.HistogramPointerSize + pooling.HPointSize + pooling.VectorSampleSize,
},
"histogram: limit enabled, and query exceeds limit": {
expr: "sum(some_histogram)",
shouldSucceed: false,

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory:
// - the running total for the sum() (a histogram pointer at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
// The last thing to be allocated is the histogram pointer slice for the running total, so that won't contribute to the peak before the query is aborted.
rangeQueryExpectedPeak: 8 * pooling.HPointSize,
rangeQueryLimit: 8*pooling.HistogramPointerSize + 8*pooling.HPointSize - 1,
// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory:
// - the running total for the sum() (a histogram pointer),
// - the next series from the selector,
// - and the output sample.
// The last thing to be allocated is the histogram pointer slice for the running total, so that won't contribute to the peak before the query is aborted.
instantQueryExpectedPeak: pooling.HPointSize + pooling.VectorSampleSize,
instantQueryLimit: pooling.HistogramPointerSize + pooling.HPointSize + pooling.VectorSampleSize - 1,
},
}

createEngine := func(t *testing.T, limit uint64) (promql.QueryEngine, *prometheus.Registry, opentracing.Span, context.Context) {
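For reference, the expected-peak expressions above translate into concrete byte counts once the size constants from pkg/streamingpromql/pooling/limiting_pool.go are plugged in. The following standalone sketch is illustrative only (it is not part of this PR) and assumes a typical 64-bit platform, where a pointer is 8 bytes and promql.FPoint is 16 bytes:

package main

import (
	"fmt"
	"unsafe"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/promql"
)

func main() {
	// These mirror the constants defined in pkg/streamingpromql/pooling/limiting_pool.go.
	fPointSize := uint64(unsafe.Sizeof(promql.FPoint{}))                            // 16 bytes on 64-bit platforms
	hPointSize := fPointSize * 5                                                    // HPoints are costed at 5x an FPoint
	histogramPointerSize := uint64(unsafe.Sizeof((*histogram.FloatHistogram)(nil))) // 8 bytes on 64-bit platforms

	// "histogram: limit enabled, but query does not exceed limit", range query:
	// 5 samples per series are rounded up to 8 by the bucketed pool, and at peak we hold
	// one histogram pointer per step (the running sum) plus one HPoint per step (the next series).
	rangeQueryExpectedPeak := 8*histogramPointerSize + 8*hPointSize

	fmt.Println(rangeQueryExpectedPeak) // 704 bytes on a typical 64-bit platform
}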
109 changes: 82 additions & 27 deletions pkg/streamingpromql/operators/aggregation.go
@@ -12,6 +12,7 @@ import (
"sort"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
@@ -46,9 +47,11 @@ type group struct {
// Used to sort groups in the order that they'll be completed in.
lastSeriesIndex int

// Sum and presence for each step.
sums []float64
present []bool
// Sum, presence, and histograms for each step.
floatSums []float64
floatPresent []bool
histogramSums []*histogram.FloatHistogram
histogramPointCount int
}

var _ types.InstantVectorOperator = &Aggregation{}
@@ -152,30 +155,60 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries
thisSeriesGroup := a.remainingInnerSeriesToGroup[0]
a.remainingInnerSeriesToGroup = a.remainingInnerSeriesToGroup[1:]

if thisSeriesGroup.sums == nil {
if len(s.Floats) > 0 && thisSeriesGroup.floatSums == nil {
// First series for this group, populate it.

thisSeriesGroup.sums, err = a.Pool.GetFloatSlice(steps)
thisSeriesGroup.floatSums, err = a.Pool.GetFloatSlice(steps)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

thisSeriesGroup.present, err = a.Pool.GetBoolSlice(steps)
thisSeriesGroup.floatPresent, err = a.Pool.GetBoolSlice(steps)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
thisSeriesGroup.floatSums = thisSeriesGroup.floatSums[:steps]
thisSeriesGroup.floatPresent = thisSeriesGroup.floatPresent[:steps]
}

thisSeriesGroup.sums = thisSeriesGroup.sums[:steps]
thisSeriesGroup.present = thisSeriesGroup.present[:steps]
if len(s.Histograms) > 0 && thisSeriesGroup.histogramSums == nil {
// First series for this group, populate it.
thisSeriesGroup.histogramSums, err = a.Pool.GetHistogramPointerSlice(steps)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
thisSeriesGroup.histogramSums = thisSeriesGroup.histogramSums[:steps]
}

for _, p := range s.Floats {
idx := (p.T - start) / interval
thisSeriesGroup.sums[idx] += p.F
thisSeriesGroup.present[idx] = true
thisSeriesGroup.floatSums[idx] += p.F
thisSeriesGroup.floatPresent[idx] = true
}

a.Pool.PutFPointSlice(s.Floats)
for _, p := range s.Histograms {
idx := (p.T - start) / interval

// If a point has a mix of histogram samples and float samples, the corresponding vector element is removed from the output vector entirely.
if thisSeriesGroup.floatPresent != nil && thisSeriesGroup.floatPresent[idx] {
thisSeriesGroup.floatPresent[idx] = false
continue
}

if thisSeriesGroup.histogramSums[idx] == nil {
// We copy here because we modify the histogram through Add later on.
// It is necessary to preserve the original histogram in case any range queries use lookback.
thisSeriesGroup.histogramSums[idx] = p.H.Copy()
// We already have to check whether a histogram exists at this idx, so we can
// count the histogram points present now instead of needing to loop again
// later like we do for floats.
thisSeriesGroup.histogramPointCount++
} else {
thisSeriesGroup.histogramSums[idx] = thisSeriesGroup.histogramSums[idx].Add(p.H)
}
}

a.Pool.PutInstantVectorSeriesData(s)
thisSeriesGroup.remainingSeriesCount--
}

Expand All @@ -185,33 +218,55 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries
// series which is more costly than looping again here and just checking each
// point of the already grouped series.
// See: https://github.com/grafana/mimir/pull/8442
pointCount := 0
for _, p := range thisGroup.present {
floatPointCount := 0
for _, p := range thisGroup.floatPresent {
if p {
pointCount++
floatPointCount++
}
}

points, err := a.Pool.GetFPointSlice(pointCount)
if err != nil {
return types.InstantVectorSeriesData{}, err
var floatPoints []promql.FPoint
var err error
if floatPointCount > 0 {
floatPoints, err = a.Pool.GetFPointSlice(floatPointCount)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

for i, havePoint := range thisGroup.floatPresent {
if havePoint {
t := start + int64(i)*interval
floatPoints = append(floatPoints, promql.FPoint{T: t, F: thisGroup.floatSums[i]})
}
}
}

for i, havePoint := range thisGroup.present {
if havePoint {
t := start + int64(i)*interval
points = append(points, promql.FPoint{T: t, F: thisGroup.sums[i]})
a.Pool.PutFloatSlice(thisGroup.floatSums)
a.Pool.PutBoolSlice(thisGroup.floatPresent)
thisGroup.floatSums = nil
thisGroup.floatPresent = nil

var histogramPoints []promql.HPoint
if thisGroup.histogramPointCount > 0 {
histogramPoints, err = a.Pool.GetHPointSlice(thisGroup.histogramPointCount)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

for i, h := range thisGroup.histogramSums {
if h != nil {
t := start + int64(i)*interval
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: thisGroup.histogramSums[i]})
}
}
}

a.Pool.PutFloatSlice(thisGroup.sums)
a.Pool.PutBoolSlice(thisGroup.present)
a.Pool.PutHistogramPointerSlice(thisGroup.histogramSums)
thisGroup.histogramSums = nil
thisGroup.histogramPointCount = 0

thisGroup.sums = nil
thisGroup.present = nil
groupPool.Put(thisGroup)

return types.InstantVectorSeriesData{Floats: points}, nil
return types.InstantVectorSeriesData{Floats: floatPoints, Histograms: histogramPoints}, nil
}

func (a *Aggregation) Close() {
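To make the new per-step behaviour easier to follow in isolation, here is a minimal standalone sketch (not part of this PR): the first histogram seen at a step is copied so the selector's original sample is never mutated, later histograms are folded in with Add, and a step that also carries a float sample is dropped from the output entirely. It assumes the single-return FloatHistogram.Add signature used by the Prometheus version vendored in this change:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

// sumAtStep mirrors the per-step rule above: copy the first histogram, Add the rest,
// and return nil when the step mixes float and histogram samples so the vector element
// is removed from the output.
func sumAtStep(running, next *histogram.FloatHistogram, sawFloat bool) *histogram.FloatHistogram {
	if sawFloat {
		return nil
	}
	if running == nil {
		// Copy so that lookback in range queries can still reuse the original histogram.
		return next.Copy()
	}
	return running.Add(next)
}

func main() {
	a := &histogram.FloatHistogram{Schema: 1, Count: 9, Sum: 10}
	b := &histogram.FloatHistogram{Schema: 1, Count: 9, Sum: 10}

	sum := sumAtStep(nil, a, false) // first series at this step: copied
	sum = sumAtStep(sum, b, false)  // second series: added into the running total

	fmt.Println(sum.Sum, sum.Count)             // 20 18
	fmt.Println(sumAtStep(sum, a, true) == nil) // true: a mixed float/histogram step is dropped
}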
41 changes: 36 additions & 5 deletions pkg/streamingpromql/pooling/limiting_pool.go
@@ -6,6 +6,7 @@ import (
"unsafe"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/types"
@@ -21,11 +22,12 @@
// Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats.
nativeHistogramSampleSizeFactor = 5

FPointSize = uint64(unsafe.Sizeof(promql.FPoint{}))
HPointSize = uint64(FPointSize * nativeHistogramSampleSizeFactor)
VectorSampleSize = uint64(unsafe.Sizeof(promql.Sample{})) // This assumes each sample is a float sample, not a histogram.
Float64Size = uint64(unsafe.Sizeof(float64(0)))
BoolSize = uint64(unsafe.Sizeof(false))
FPointSize = uint64(unsafe.Sizeof(promql.FPoint{}))
HPointSize = uint64(FPointSize * nativeHistogramSampleSizeFactor)
VectorSampleSize = uint64(unsafe.Sizeof(promql.Sample{})) // This assumes each sample is a float sample, not a histogram.
Float64Size = uint64(unsafe.Sizeof(float64(0)))
BoolSize = uint64(unsafe.Sizeof(false))
HistogramPointerSize = uint64(unsafe.Sizeof((*histogram.FloatHistogram)(nil)))
)

var (
@@ -48,6 +50,10 @@
boolSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(size int) []bool {
return make([]bool, 0, size)
})

histogramSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(size int) []*histogram.FloatHistogram {
return make([]*histogram.FloatHistogram, 0, size)
})
)

// LimitingPool manages sample slices for a single query evaluation, and applies any max in-memory bytes limit.
@@ -206,6 +212,31 @@ func (p *LimitingPool) PutBoolSlice(s []bool) {
putWithElementSize(p, boolSlicePool, BoolSize, s)
}

// GetHistogramPointerSlice returns a slice of FloatHistogram pointers of length 0 and capacity greater than or equal to size.
//
// If the capacity of the returned slice would cause the max memory consumption limit to be exceeded, then an error is returned.
//
// Every element of the returned slice up to the requested size will be nil.
//
// Note that the capacity of the returned slice may be significantly larger than size, depending on the configuration of the underlying bucketed pool.
func (p *LimitingPool) GetHistogramPointerSlice(size int) ([]*histogram.FloatHistogram, error) {
s, err := getWithElementSize(p, histogramSlicePool, size, HistogramPointerSize)
if err != nil {
return nil, err
}

// This is not necessary if we've just created a new slice, it'll already have all elements reset.
// But we do it unconditionally for simplicity.
clear(s[:size])

return s, nil
}

// PutHistogramPointerSlice returns a slice of FloatHistogram pointers to the pool and updates the current number of in-memory samples.
func (p *LimitingPool) PutHistogramPointerSlice(s []*histogram.FloatHistogram) {
putWithElementSize(p, histogramSlicePool, HistogramPointerSize, s)
}

// PutInstantVectorSeriesData is equivalent to calling PutFPointSlice(d.Floats) and PutHPointSlice(d.Histograms).
func (p *LimitingPool) PutInstantVectorSeriesData(d types.InstantVectorSeriesData) {
p.PutFPointSlice(d.Floats)
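For context, a minimal usage sketch of the new histogram pointer pool methods (not part of this PR); the limit value is arbitrary, and the example assumes it is compiled inside the Mimir module, since the pooling package lives under pkg/streamingpromql:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
)

func main() {
	// The counter is incremented whenever a query is rejected for exceeding the limit.
	rejected := prometheus.NewCounter(prometheus.CounterOpts{Name: "rejected_queries_total"})

	// Arbitrary limit: enough for 8 histogram pointers plus 8 HPoints.
	limit := 8*pooling.HistogramPointerSize + 8*pooling.HPointSize
	pool := pooling.NewLimitingPool(limit, rejected)

	hists, err := pool.GetHistogramPointerSlice(5) // size is rounded up to 8 by the bucketed pool
	if err != nil {
		fmt.Println("limit exceeded:", err)
		return
	}

	fmt.Println(len(hists), cap(hists) >= 5) // 0 true

	// ... fill and use hists ...

	// Returning the slice releases the accounted bytes so later allocations can reuse them.
	pool.PutHistogramPointerSlice(hists)
}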
28 changes: 28 additions & 0 deletions pkg/streamingpromql/pooling/limiting_pool_test.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/histogram"
"github.com/stretchr/testify/require"
)

@@ -45,6 +46,12 @@ func TestLimitingPool_Unlimited(t *testing.T) {
pool := NewLimitingPool(0, metric)
testUnlimitedPool(t, pool.GetBoolSlice, pool.PutBoolSlice, pool, BoolSize, reg)
})

t.Run("[]*histogram.FloatHistogram", func(t *testing.T) {
reg, metric := createRejectedMetric()
pool := NewLimitingPool(0, metric)
testUnlimitedPool(t, pool.GetHistogramPointerSlice, pool.PutHistogramPointerSlice, pool, HistogramPointerSize, reg)
})
}

func testUnlimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put func(S), pool *LimitingPool, elementSize uint64, reg *prometheus.Registry) {
@@ -119,6 +126,12 @@ func TestLimitingPool_Limited(t *testing.T) {
pool := NewLimitingPool(11*BoolSize, metric)
testLimitedPool(t, pool.GetBoolSlice, pool.PutBoolSlice, pool, BoolSize, reg)
})

t.Run("[]*histogram.FloatHistogram", func(t *testing.T) {
reg, metric := createRejectedMetric()
pool := NewLimitingPool(11*HistogramPointerSize, metric)
testLimitedPool(t, pool.GetHistogramPointerSlice, pool.PutHistogramPointerSlice, pool, HistogramPointerSize, reg)
})
}

func testLimitedPool[E any, S ~[]E](t *testing.T, get func(int) (S, error), put func(S), pool *LimitingPool, elementSize uint64, reg *prometheus.Registry) {
@@ -223,6 +236,21 @@ func TestLimitingPool_ClearsReturnedSlices(t *testing.T) {
boolSlice = boolSlice[:2]
require.Equal(t, []bool{false, false}, boolSlice)
})

t.Run("[]*histogram.FloatHistogram", func(t *testing.T) {
HistogramPointerSlice, err := pool.GetHistogramPointerSlice(2)
require.NoError(t, err)
HistogramPointerSlice = HistogramPointerSlice[:2]
HistogramPointerSlice[0] = &histogram.FloatHistogram{Count: 1}
HistogramPointerSlice[1] = &histogram.FloatHistogram{Count: 2}

pool.PutHistogramPointerSlice(HistogramPointerSlice)

HistogramPointerSlice, err = pool.GetHistogramPointerSlice(2)
require.NoError(t, err)
HistogramPointerSlice = HistogramPointerSlice[:2]
require.Equal(t, []*histogram.FloatHistogram{nil, nil}, HistogramPointerSlice)
})
}

func createRejectedMetric() (*prometheus.Registry, prometheus.Counter) {
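As a footnote to TestLimitingPool_ClearsReturnedSlices above, a minimal sketch (not part of this PR) of why the clearing guarantee matters: the aggregation relies on nil entries in histogramSums to decide whether to Copy the first histogram at a step or Add to a running total, so a stale pointer left over from a previous query would silently corrupt results.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/histogram"

	"github.com/grafana/mimir/pkg/streamingpromql/pooling"
)

func main() {
	rejected := prometheus.NewCounter(prometheus.CounterOpts{Name: "rejected_queries_total"})
	pool := pooling.NewLimitingPool(0, rejected) // 0 disables the memory limit

	s, _ := pool.GetHistogramPointerSlice(2) // errors ignored for brevity
	s = s[:2]
	s[0] = &histogram.FloatHistogram{Count: 1}
	pool.PutHistogramPointerSlice(s)

	// Whether or not the next call reuses the same backing array, the first two elements
	// are guaranteed to be nil, so they can safely signal "no histogram at this step yet".
	s, _ = pool.GetHistogramPointerSlice(2)
	fmt.Println(s[:2][0] == nil, s[:2][1] == nil) // true true
}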