diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8a5d031fb9..128236202b 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -6,6 +6,7 @@ import ( "fmt" "math" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -22,6 +23,7 @@ type InstantVectorFunctionOperatorFactory func( annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, + argExpressions parser.Expressions, ) (types.InstantVectorOperator, error) type ScalarFunctionOperatorFactory func( @@ -39,7 +41,7 @@ type ScalarFunctionOperatorFactory func( // - name: The name of the function // - f: The function implementation func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -76,6 +78,23 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF return SingleInputVectorFunctionOperatorFactory(name, f) } +func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) { + functionName := "absent" + if len(args) != 1 && len(innerExpressions) != 1 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args)) + } + inner, ok := args[0].(types.InstantVectorOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) + } + + var o types.InstantVectorOperator = functions.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) + + return o, nil +} + // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions // that have exactly 1 argument (v instant-vector), and need to manipulate the labels of // each series without manipulating the returned samples. @@ -103,7 +122,7 @@ func FunctionOverRangeVectorOperatorFactory( name string, f functions.FunctionOverRangeVectorDefinition, ) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -125,7 +144,7 @@ func FunctionOverRangeVectorOperatorFactory( } } -func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { +func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args)) @@ -140,7 +159,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem return scalars.NewScalarToInstantVector(inner, expressionPosition), nil } -func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 5 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 5 arguments for label_replace, got %v", len(args)) @@ -189,7 +208,7 @@ func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptio return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } -func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 arguments for clamp, got %v", len(args)) @@ -222,7 +241,7 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke } func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 2 arguments for %s, got %v", functionName, len(args)) @@ -249,7 +268,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant } } -func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 && len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected 1 or 2 arguments for round, got %v", len(args)) @@ -280,7 +299,7 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil } -func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 2 arguments for histogram_quantile, got %v", len(args)) @@ -302,7 +321,7 @@ func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsu return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } -func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 arguments for histogram_fraction, got %v", len(args)) @@ -338,6 +357,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{ // Please keep this list sorted alphabetically. "abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs), + "absent": AbsentFunctionOperatorFactory, "acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos), "acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh), "asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin), diff --git a/pkg/streamingpromql/operators/functions/absent.go b/pkg/streamingpromql/operators/functions/absent.go new file mode 100644 index 0000000000..7d5d9ba0b8 --- /dev/null +++ b/pkg/streamingpromql/operators/functions/absent.go @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors. + +package functions + +import ( + "context" + "errors" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +// Absent is an operator that implements the absent() function. +type Absent struct { + timeRange types.QueryTimeRange + innerExpr parser.Expr + inner types.InstantVectorOperator + expressionPosition posrange.PositionRange + absentCount int + memoryConsumptionTracker *limiting.MemoryConsumptionTracker +} + +var _ types.InstantVectorOperator = &Absent{} + +// NewAbsent creates a new Absent. +func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *Absent { + return &Absent{ + timeRange: timeRange, + inner: inner, + innerExpr: innerExpr, + expressionPosition: expressionPosition, + memoryConsumptionTracker: memoryConsumptionTracker, + } +} + +func (s *Absent) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { + innerMetadata, err := s.inner.SeriesMetadata(ctx) + if err != nil { + return nil, err + } + + defer types.PutSeriesMetadataSlice(innerMetadata) + + if innerMetadata == nil { + s.absentCount++ + } + + metadata := types.GetSeriesMetadataSlice(s.absentCount) + for range s.absentCount { + metadata = append(metadata, types.SeriesMetadata{ + Labels: createLabelsForAbsentFunction(s.innerExpr), + }) + } + return metadata, nil +} + +func (s *Absent) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { + output := types.InstantVectorSeriesData{} + var err error + output.Floats, err = types.FPointSlicePool.Get(s.absentCount, s.memoryConsumptionTracker) + if err != nil { + return output, err + } + + for range s.absentCount { + innerResult, err := s.inner.NextSeries(ctx) + types.PutInstantVectorSeriesData(innerResult, s.memoryConsumptionTracker) + + if err != nil && errors.Is(err, types.EOS) { + output.Floats = append(output.Floats, promql.FPoint{T: s.timeRange.StartT + s.timeRange.IntervalMilliseconds, F: 1}) + } else { + return types.InstantVectorSeriesData{}, nil + } + } + return output, nil +} + +func (s *Absent) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *Absent) Close() { + s.inner.Close() +} + +// createLabelsForAbsentFunction returns the labels that are uniquely and exactly matched +// in a given expression. It is used in the absent functions. +// This function is copied from Prometheus +func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels { + b := labels.NewBuilder(labels.EmptyLabels()) + + var lm []*labels.Matcher + switch n := expr.(type) { + case *parser.VectorSelector: + lm = n.LabelMatchers + case *parser.MatrixSelector: + lm = n.VectorSelector.(*parser.VectorSelector).LabelMatchers + default: + return labels.EmptyLabels() + } + + // The 'has' map implements backwards-compatibility for historic behaviour: + // e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output. + // Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`. + has := make(map[string]bool, len(lm)) + for _, ma := range lm { + if ma.Name == labels.MetricName { + continue + } + if ma.Type == labels.MatchEqual && !has[ma.Name] { + b.Set(ma.Name, ma.Value) + has[ma.Name] = true + } else { + b.Del(ma.Name) + } + } + + return b.Labels() +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 46ca9f6eef..9b48c51b9d 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -333,7 +333,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call, timeR args[i] = a } - return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange) + return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange, e.Args) } func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.RangeVectorOperator, error) { diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index baca7082bc..c965dd8e92 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1310,25 +1310,20 @@ eval instant at 1m count_over_time({__name__=~"data(_histogram)?"}[2m]) clear # Test for absent() -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent) -# {} 1 +eval instant at 50m absent(nonexistent) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"}) -# {instance="testinstance", job="testjob"} 1 +eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"}) + {instance="testinstance", job="testjob"} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"}) -# {foo="bar"} 1 +eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"}) + {foo="bar"} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"}) -# {foo="bar"} 1 +eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"}) + {foo="bar"} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"}) -# {foo="bar"} 1 +eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"}) + {foo="bar"} 1 clear @@ -1337,43 +1332,33 @@ load 5m http_requests{job="api-server", instance="0", group="production"} 0+10x10 http_requests_histogram{job="api-server", instance="0", group="production"} {{schema:0 sum:1 count:1}}x11 -# Unsupported by streaming engine. -# eval instant at 50m absent(http_requests) +eval instant at 50m absent(http_requests) -# Unsupported by streaming engine. -# eval instant at 50m absent(sum(http_requests)) +eval instant at 50m absent(sum(http_requests)) -# Unsupported by streaming engine. -# eval instant at 50m absent(http_requests_histogram) +eval instant at 50m absent(http_requests_histogram) -# Unsupported by streaming engine. -# eval instant at 50m absent(sum(http_requests_histogram)) +eval instant at 50m absent(sum(http_requests_histogram)) clear -# Unsupported by streaming engine. -# eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"})) -# {} 1 +eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"})) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(max(nonexistant)) -# {} 1 +eval instant at 50m absent(max(nonexistant)) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistant > 1) -# {} 1 +eval instant at 50m absent(nonexistant > 1) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(a + b) -# {} 1 +eval instant at 50m absent(a + b) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(a and b) -# {} 1 +eval instant at 50m absent(a and b) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(rate(nonexistant[5m])) -# {} 1 +eval instant at 50m absent(rate(nonexistant[5m])) + {} 1 clear