Skip to content

Commit

Permalink
MQE debt: use time range to determine argument value (#10050)
Browse files Browse the repository at this point in the history
* MQE debt: use time range to determine argument value

* Update CHANGELOG
  • Loading branch information
jhesketh authored Nov 29, 2024
1 parent 64324a7 commit 37d5e00
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 78 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* [CHANGE] Querier: The `.` pattern in regular expressions in PromQL matches newline characters. With this change regular expressions like `.*` match strings that include `\n`. To maintain the old behaviour, you will have to change regular expressions by replacing all `.` patterns with `[^\n]`, e.g. `foo[^\n]*`. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
* [CHANGE] Querier: Lookback and range selectors are left open and right closed (previously left closed and right closed). This change affects queries when the evaluation time perfectly aligns with the sample timestamps. For example assume querying a timeseries with evenly spaced samples exactly 1 minute apart. Previously, a range query with `5m` would usually return 5 samples, or 6 samples if the query evaluation aligns perfectly with a scrape. Now, queries like this will always return 5 samples. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
* [CHANGE] Querier: promql(native histograms): Introduce exponential interpolation. #9844
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047 #10048
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047 #10048 #10050
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] Query-frontend: add middleware to control access to specific PromQL experimental functions on a per-tenant basis. #9798
Expand Down
26 changes: 13 additions & 13 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ScalarFunctionOperatorFactory func(
// - name: The name of the function
// - f: The function implementation
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand All @@ -51,7 +51,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO
return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0])
}

var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange)

if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
Expand Down Expand Up @@ -141,7 +141,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem
}

func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 5 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 5 argument for label_replace, got %v", len(args))
Expand Down Expand Up @@ -185,14 +185,14 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory
},
}

o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange)

return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}
}

func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 argument for clamp, got %v", len(args))
Expand Down Expand Up @@ -221,12 +221,12 @@ func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}
}

func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 argument for %s, got %v", functionName, len(args))
Expand All @@ -249,7 +249,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}
}

Expand Down Expand Up @@ -282,7 +282,7 @@ func RoundFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func HistogramQuantileFunctionOperatorFactory() InstantVectorFunctionOperatorFac
}

func HistogramFractionFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 argument for histogram_fraction, got %v", len(args))
Expand Down Expand Up @@ -340,7 +340,7 @@ func HistogramFractionFunctionOperatorFactory() InstantVectorFunctionOperatorFac
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}
}

Expand Down Expand Up @@ -444,12 +444,12 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti
return scalars.NewInstantVectorToScalar(inner, timeRange, memoryConsumptionTracker, expressionPosition), nil
}

func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator {
func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) types.InstantVectorOperator {
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.UnaryNegation,
SeriesMetadataFunction: functions.DropSeriesName,
}

o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange)
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}
10 changes: 5 additions & 5 deletions pkg/streamingpromql/operators/functions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ var DropSeriesName = SeriesMetadataFunctionDefinition{
}

// InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector.
type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error)
type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error)

// floatTransformationFunc is not needed elsewhere, so it is not exported yet
func floatTransformationFunc(transform func(f float64) float64) InstantVectorSeriesFunction {
return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
for i := range seriesData.Floats {
seriesData.Floats[i].F = transform(seriesData.Floats[i].F)
}
Expand All @@ -39,16 +39,16 @@ func floatTransformationFunc(transform func(f float64) float64) InstantVectorSer

func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) InstantVectorSeriesFunction {
ft := floatTransformationFunc(transform)
return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
// Functions that do not explicitly mention native histograms in their documentation will ignore histogram samples.
// https://prometheus.io/docs/prometheus/latest/querying/functions
types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker)
seriesData.Histograms = nil
return ft(seriesData, nil, memoryConsumptionTracker)
return ft(seriesData, nil, timeRange, memoryConsumptionTracker)
}
}

func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
return seriesData, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/functions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestFloatTransformationFunc(t *testing.T) {
},
}

modifiedSeriesData, err := transformFunc(seriesData, nil, memoryConsumptionTracker)
modifiedSeriesData, err := transformFunc(seriesData, nil, types.QueryTimeRange{}, memoryConsumptionTracker)
require.NoError(t, err)
require.Equal(t, expected, modifiedSeriesData)
require.Equal(t, types.FPointSize*2+types.HPointSize*1, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
Histograms: nil, // Histograms should be dropped
}

modifiedSeriesData, err := transformFunc(seriesData, nil, memoryConsumptionTracker)
modifiedSeriesData, err := transformFunc(seriesData, nil, types.QueryTimeRange{}, memoryConsumptionTracker)
require.NoError(t, err)
require.Equal(t, expected, modifiedSeriesData)
// We expect the dropped histogram to be returned to the pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type FunctionOverInstantVector struct {
scalarArgsData []types.ScalarData

expressionPosition posrange.PositionRange
timeRange types.QueryTimeRange
}

var _ types.InstantVectorOperator = &FunctionOverInstantVector{}
Expand All @@ -42,6 +43,7 @@ func NewFunctionOverInstantVector(
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
f FunctionOverInstantVectorDefinition,
expressionPosition posrange.PositionRange,
timeRange types.QueryTimeRange,
) *FunctionOverInstantVector {
return &FunctionOverInstantVector{
Inner: inner,
Expand All @@ -50,6 +52,7 @@ func NewFunctionOverInstantVector(
Func: f,

expressionPosition: expressionPosition,
timeRange: timeRange,
}
}

Expand Down Expand Up @@ -99,7 +102,7 @@ func (m *FunctionOverInstantVector) NextSeries(ctx context.Context) (types.Insta
return types.InstantVectorSeriesData{}, err
}

return m.Func.SeriesDataFunc(series, m.scalarArgsData, m.MemoryConsumptionTracker)
return m.Func.SeriesDataFunc(series, m.scalarArgsData, m.timeRange, m.MemoryConsumptionTracker)
}

func (m *FunctionOverInstantVector) Close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFunctionOverInstantVector(t *testing.T) {
}

seriesDataFuncCalledTimes := 0
mustBeCalledSeriesData := func(types.InstantVectorSeriesData, []types.ScalarData, *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
mustBeCalledSeriesData := func(types.InstantVectorSeriesData, []types.ScalarData, types.QueryTimeRange, *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
seriesDataFuncCalledTimes++
return types.InstantVectorSeriesData{}, nil
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestFunctionOverInstantVectorWithScalarArgs(t *testing.T) {
}

seriesDataFuncCalledTimes := 0
mustBeCalledSeriesData := func(_ types.InstantVectorSeriesData, scalarArgs []types.ScalarData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
mustBeCalledSeriesData := func(_ types.InstantVectorSeriesData, scalarArgs []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
seriesDataFuncCalledTimes++
// Verify that the scalar arguments are correctly passed and in the order we expect
require.Equal(t, 2, len(scalarArgs))
Expand Down
Loading

0 comments on commit 37d5e00

Please sign in to comment.