From 37d5e00fa4aa4d82f0087511538aee93a6874232 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 29 Nov 2024 17:20:37 +1100 Subject: [PATCH] MQE debt: use time range to determine argument value (#10050) * MQE debt: use time range to determine argument value * Update CHANGELOG --- CHANGELOG.md | 2 +- pkg/streamingpromql/functions.go | 26 +++++----- .../operators/functions/common.go | 10 ++-- .../operators/functions/common_test.go | 4 +- .../functions/function_over_instant_vector.go | 5 +- .../function_over_instant_vector_test.go | 4 +- .../operators/functions/math.go | 52 +++++-------------- .../operators/functions/native_histograms.go | 23 +++----- pkg/streamingpromql/query.go | 2 +- 9 files changed, 50 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffd90932e37..498f1efd813 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ * [CHANGE] Querier: The `.` pattern in regular expressions in PromQL matches newline characters. With this change regular expressions like `.*` match strings that include `\n`. To maintain the old behaviour, you will have to change regular expressions by replacing all `.` patterns with `[^\n]`, e.g. `foo[^\n]*`. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844 * [CHANGE] Querier: Lookback and range selectors are left open and right closed (previously left closed and right closed). This change affects queries when the evaluation time perfectly aligns with the sample timestamps. For example assume querying a timeseries with evenly spaced samples exactly 1 minute apart. Previously, a range query with `5m` would usually return 5 samples, or 6 samples if the query evaluation aligns perfectly with a scrape. Now, queries like this will always return 5 samples. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844 * [CHANGE] Querier: promql(native histograms): Introduce exponential interpolation. #9844 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047 #10048 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047 #10048 #10050 * [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] Query-frontend: add middleware to control access to specific PromQL experimental functions on a per-tenant basis. #9798 diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 382781bacd9..95267bbf9fd 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -39,7 +39,7 @@ type ScalarFunctionOperatorFactory func( // - name: The name of the function // - f: The function implementation func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -51,7 +51,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0]) } - var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition) + var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) if f.SeriesMetadataFunction.NeedsSeriesDeduplication { o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) @@ -141,7 +141,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem } func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 5 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 5 argument for label_replace, got %v", len(args)) @@ -185,14 +185,14 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory }, } - o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition) + o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } } func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 argument for clamp, got %v", len(args)) @@ -221,12 +221,12 @@ func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition), nil + return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil } } func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 2 argument for %s, got %v", functionName, len(args)) @@ -249,7 +249,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition), nil + return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil } } @@ -282,7 +282,7 @@ func RoundFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition), nil + return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil } } @@ -311,7 +311,7 @@ func HistogramQuantileFunctionOperatorFactory() InstantVectorFunctionOperatorFac } func HistogramFractionFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 argument for histogram_fraction, got %v", len(args)) @@ -340,7 +340,7 @@ func HistogramFractionFunctionOperatorFactory() InstantVectorFunctionOperatorFac SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition), nil + return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil } } @@ -444,12 +444,12 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti return scalars.NewInstantVectorToScalar(inner, timeRange, memoryConsumptionTracker, expressionPosition), nil } -func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator { +func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) types.InstantVectorOperator { f := functions.FunctionOverInstantVectorDefinition{ SeriesDataFunc: functions.UnaryNegation, SeriesMetadataFunction: functions.DropSeriesName, } - o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition) + o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) } diff --git a/pkg/streamingpromql/operators/functions/common.go b/pkg/streamingpromql/operators/functions/common.go index 302da1ea053..730aff89f26 100644 --- a/pkg/streamingpromql/operators/functions/common.go +++ b/pkg/streamingpromql/operators/functions/common.go @@ -25,11 +25,11 @@ var DropSeriesName = SeriesMetadataFunctionDefinition{ } // InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. -type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) +type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) // floatTransformationFunc is not needed elsewhere, so it is not exported yet func floatTransformationFunc(transform func(f float64) float64) InstantVectorSeriesFunction { - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { for i := range seriesData.Floats { seriesData.Floats[i].F = transform(seriesData.Floats[i].F) } @@ -39,16 +39,16 @@ func floatTransformationFunc(transform func(f float64) float64) InstantVectorSer func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) InstantVectorSeriesFunction { ft := floatTransformationFunc(transform) - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { // Functions that do not explicitly mention native histograms in their documentation will ignore histogram samples. // https://prometheus.io/docs/prometheus/latest/querying/functions types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) seriesData.Histograms = nil - return ft(seriesData, nil, memoryConsumptionTracker) + return ft(seriesData, nil, timeRange, memoryConsumptionTracker) } } -func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { return seriesData, nil } diff --git a/pkg/streamingpromql/operators/functions/common_test.go b/pkg/streamingpromql/operators/functions/common_test.go index 64e51e11a5d..a50c073b186 100644 --- a/pkg/streamingpromql/operators/functions/common_test.go +++ b/pkg/streamingpromql/operators/functions/common_test.go @@ -57,7 +57,7 @@ func TestFloatTransformationFunc(t *testing.T) { }, } - modifiedSeriesData, err := transformFunc(seriesData, nil, memoryConsumptionTracker) + modifiedSeriesData, err := transformFunc(seriesData, nil, types.QueryTimeRange{}, memoryConsumptionTracker) require.NoError(t, err) require.Equal(t, expected, modifiedSeriesData) require.Equal(t, types.FPointSize*2+types.HPointSize*1, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes) @@ -88,7 +88,7 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) { Histograms: nil, // Histograms should be dropped } - modifiedSeriesData, err := transformFunc(seriesData, nil, memoryConsumptionTracker) + modifiedSeriesData, err := transformFunc(seriesData, nil, types.QueryTimeRange{}, memoryConsumptionTracker) require.NoError(t, err) require.Equal(t, expected, modifiedSeriesData) // We expect the dropped histogram to be returned to the pool diff --git a/pkg/streamingpromql/operators/functions/function_over_instant_vector.go b/pkg/streamingpromql/operators/functions/function_over_instant_vector.go index c3aade6a62b..02a87d84468 100644 --- a/pkg/streamingpromql/operators/functions/function_over_instant_vector.go +++ b/pkg/streamingpromql/operators/functions/function_over_instant_vector.go @@ -32,6 +32,7 @@ type FunctionOverInstantVector struct { scalarArgsData []types.ScalarData expressionPosition posrange.PositionRange + timeRange types.QueryTimeRange } var _ types.InstantVectorOperator = &FunctionOverInstantVector{} @@ -42,6 +43,7 @@ func NewFunctionOverInstantVector( memoryConsumptionTracker *limiting.MemoryConsumptionTracker, f FunctionOverInstantVectorDefinition, expressionPosition posrange.PositionRange, + timeRange types.QueryTimeRange, ) *FunctionOverInstantVector { return &FunctionOverInstantVector{ Inner: inner, @@ -50,6 +52,7 @@ func NewFunctionOverInstantVector( Func: f, expressionPosition: expressionPosition, + timeRange: timeRange, } } @@ -99,7 +102,7 @@ func (m *FunctionOverInstantVector) NextSeries(ctx context.Context) (types.Insta return types.InstantVectorSeriesData{}, err } - return m.Func.SeriesDataFunc(series, m.scalarArgsData, m.MemoryConsumptionTracker) + return m.Func.SeriesDataFunc(series, m.scalarArgsData, m.timeRange, m.MemoryConsumptionTracker) } func (m *FunctionOverInstantVector) Close() { diff --git a/pkg/streamingpromql/operators/functions/function_over_instant_vector_test.go b/pkg/streamingpromql/operators/functions/function_over_instant_vector_test.go index ccf75397b0a..004e0e2e1fa 100644 --- a/pkg/streamingpromql/operators/functions/function_over_instant_vector_test.go +++ b/pkg/streamingpromql/operators/functions/function_over_instant_vector_test.go @@ -39,7 +39,7 @@ func TestFunctionOverInstantVector(t *testing.T) { } seriesDataFuncCalledTimes := 0 - mustBeCalledSeriesData := func(types.InstantVectorSeriesData, []types.ScalarData, *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + mustBeCalledSeriesData := func(types.InstantVectorSeriesData, []types.ScalarData, types.QueryTimeRange, *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { seriesDataFuncCalledTimes++ return types.InstantVectorSeriesData{}, nil } @@ -85,7 +85,7 @@ func TestFunctionOverInstantVectorWithScalarArgs(t *testing.T) { } seriesDataFuncCalledTimes := 0 - mustBeCalledSeriesData := func(_ types.InstantVectorSeriesData, scalarArgs []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + mustBeCalledSeriesData := func(_ types.InstantVectorSeriesData, scalarArgs []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { seriesDataFuncCalledTimes++ // Verify that the scalar arguments are correctly passed and in the order we expect require.Equal(t, 2, len(scalarArgs)) diff --git a/pkg/streamingpromql/operators/functions/math.go b/pkg/streamingpromql/operators/functions/math.go index 571ad769992..fe08663e78d 100644 --- a/pkg/streamingpromql/operators/functions/math.go +++ b/pkg/streamingpromql/operators/functions/math.go @@ -52,7 +52,7 @@ var Sgn = FloatTransformationDropHistogramsFunc(func(f float64) float64 { return f }) -var UnaryNegation InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +var UnaryNegation InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { for i := range seriesData.Floats { seriesData.Floats[i].F = -seriesData.Floats[i].F } @@ -64,24 +64,16 @@ var UnaryNegation InstantVectorSeriesFunction = func(seriesData types.InstantVec return seriesData, nil } -var Clamp InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +var Clamp InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { outputIdx := 0 minArg := scalarArgsData[0] maxArg := scalarArgsData[1] - // There will always be a scalar at every step of the query. - // However, there may not be a sample at a step. So we need to - // keep track of where we are up to step-wise with the scalars, - // incrementing through the scalars until their timestamp matches - // the samples. - argIdx := 0 - for _, data := range seriesData.Floats { - for data.T > minArg.Samples[argIdx].T { - argIdx++ - } - minVal := minArg.Samples[argIdx].F - maxVal := maxArg.Samples[argIdx].F + // Scalars are guaranteed to have a point for each step in the query. + idx := timeRange.PointIndex(data.T) + minVal := minArg.Samples[idx].F + maxVal := maxArg.Samples[idx].F if maxVal < minVal { // Drop this point as there is no valid answer @@ -109,22 +101,14 @@ func ClampMinMaxFactory(isMin bool) InstantVectorSeriesFunction { } } - return func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + return func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { clampTo := scalarArgsData[0] - // There will always be a scalar at every step of the query. - // However, there may not be a sample at a step. So we need to - // keep track of where we are up to step-wise with the scalars, - // incrementing through the scalars until their timestamp matches - // the samples. - argIdx := 0 - for step, data := range seriesData.Floats { - for data.T > clampTo.Samples[argIdx].T { - argIdx++ - } + // Scalars are guaranteed to have a point for each step in the query. + idx := timeRange.PointIndex(data.T) + val := clampTo.Samples[idx].F - val := clampTo.Samples[argIdx].F // We reuse the existing FPoint slice in place seriesData.Floats[step].F = clampFunc(val, data.F) } @@ -137,23 +121,15 @@ func ClampMinMaxFactory(isMin bool) InstantVectorSeriesFunction { // round returns a number rounded to toNearest. // Ties are solved by rounding up. -var Round InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +var Round InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { toNearest := scalarArgsData[0] - // There will always be a scalar at every step of the query. - // However, there may not be a sample at a step. So we need to - // keep track of where we are up to step-wise with the scalars, - // incrementing through the scalars until their timestamp matches - // the samples. - argIdx := 0 - for step, data := range seriesData.Floats { - for data.T > toNearest.Samples[argIdx].T { - argIdx++ - } + // Scalars are guaranteed to have a point for each step in the query. + idx := timeRange.PointIndex(data.T) // Invert as it seems to cause fewer floating point accuracy issues. - toNearestInverse := 1.0 / toNearest.Samples[argIdx].F + toNearestInverse := 1.0 / toNearest.Samples[idx].F // We reuse the existing FPoint slice in place seriesData.Floats[step].F = math.Floor(data.F*toNearestInverse+0.5) / toNearestInverse diff --git a/pkg/streamingpromql/operators/functions/native_histograms.go b/pkg/streamingpromql/operators/functions/native_histograms.go index 1f5345113e7..46c1795e6ac 100644 --- a/pkg/streamingpromql/operators/functions/native_histograms.go +++ b/pkg/streamingpromql/operators/functions/native_histograms.go @@ -13,7 +13,7 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/types" ) -func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err @@ -35,7 +35,7 @@ func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData return data, nil } -func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err @@ -57,7 +57,7 @@ func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarDa return data, nil } -func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err @@ -69,19 +69,12 @@ func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData lower := scalarArgsData[0] upper := scalarArgsData[1] - // There will always be a scalar at every step of the query. - // However, there may not be a sample at a step. So we need to - // keep track of where we are up to step-wise with the scalars, - // incrementing through the scalars until their timestamp matches - // the samples. - argIdx := 0 for _, histogram := range seriesData.Histograms { - for histogram.T > lower.Samples[argIdx].T { - argIdx++ - } - lowerVal := lower.Samples[argIdx].F - upperVal := upper.Samples[argIdx].F + // Scalars are guaranteed to have a point for each step in the query. + idx := timeRange.PointIndex(histogram.T) + lowerVal := lower.Samples[idx].F + upperVal := upper.Samples[idx].F data.Floats = append(data.Floats, promql.FPoint{ T: histogram.T, @@ -94,7 +87,7 @@ func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData return data, nil } -func HistogramSum(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +func HistogramSum(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index d62238975c7..2f840f2aff6 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -287,7 +287,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types return nil, err } - return unaryNegationOfInstantVectorOperatorFactory(inner, q.memoryConsumptionTracker, e.PositionRange()), nil + return unaryNegationOfInstantVectorOperatorFactory(inner, q.memoryConsumptionTracker, e.PositionRange(), timeRange), nil case *parser.StepInvariantExpr: // One day, we'll do something smarter here.