Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE Absent function #10523

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
38 changes: 29 additions & 9 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"

Expand All @@ -22,6 +23,7 @@ type InstantVectorFunctionOperatorFactory func(
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
timeRange types.QueryTimeRange,
argExpressions parser.Expressions,
) (types.InstantVectorOperator, error)

type ScalarFunctionOperatorFactory func(
Expand All @@ -39,7 +41,7 @@ type ScalarFunctionOperatorFactory func(
// - name: The name of the function
// - f: The function implementation
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand Down Expand Up @@ -76,6 +78,23 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
return SingleInputVectorFunctionOperatorFactory(name, f)
}

func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) {
functionName := "absent"
if len(args) != 1 && len(innerExpressions) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args))
}
inner, ok := args[0].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0])
}

var o types.InstantVectorOperator = functions.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker)

return o, nil
}

// InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions
// that have exactly 1 argument (v instant-vector), and need to manipulate the labels of
// each series without manipulating the returned samples.
Expand Down Expand Up @@ -103,7 +122,7 @@ func FunctionOverRangeVectorOperatorFactory(
name string,
f functions.FunctionOverRangeVectorDefinition,
) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand All @@ -125,7 +144,7 @@ func FunctionOverRangeVectorOperatorFactory(
}
}

func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args))
Expand All @@ -140,7 +159,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem
return scalars.NewScalarToInstantVector(inner, expressionPosition), nil
}

func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 5 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 5 arguments for label_replace, got %v", len(args))
Expand Down Expand Up @@ -189,7 +208,7 @@ func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptio
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 arguments for clamp, got %v", len(args))
Expand Down Expand Up @@ -222,7 +241,7 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
}

func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for %s, got %v", functionName, len(args))
Expand All @@ -249,7 +268,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
}
}

func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 && len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected 1 or 2 arguments for round, got %v", len(args))
Expand Down Expand Up @@ -280,7 +299,7 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}

func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for histogram_quantile, got %v", len(args))
Expand All @@ -302,7 +321,7 @@ func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsu
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 arguments for histogram_fraction, got %v", len(args))
Expand Down Expand Up @@ -338,6 +357,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
"abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs),
"absent": AbsentFunctionOperatorFactory,
"acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos),
"acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh),
"asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin),
Expand Down
127 changes: 127 additions & 0 deletions pkg/streamingpromql/operators/functions/absent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors.

package functions

import (
"context"
"errors"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// Absent is an operator that implements the absent() function.
type Absent struct {
timeRange types.QueryTimeRange
innerExpr parser.Expr
inner types.InstantVectorOperator
expressionPosition posrange.PositionRange
absentCount int
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
}

var _ types.InstantVectorOperator = &Absent{}

// NewAbsent creates a new Absent.
func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *Absent {
return &Absent{
timeRange: timeRange,
inner: inner,
innerExpr: innerExpr,
expressionPosition: expressionPosition,
memoryConsumptionTracker: memoryConsumptionTracker,
}
}

func (s *Absent) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
innerMetadata, err := s.inner.SeriesMetadata(ctx)
if err != nil {
return nil, err
}

defer types.PutSeriesMetadataSlice(innerMetadata)

if innerMetadata == nil {
s.absentCount++
}

metadata := types.GetSeriesMetadataSlice(s.absentCount)
for range s.absentCount {
metadata = append(metadata, types.SeriesMetadata{
Labels: createLabelsForAbsentFunction(s.innerExpr),
})
}
return metadata, nil
}

func (s *Absent) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
output := types.InstantVectorSeriesData{}
var err error
output.Floats, err = types.FPointSlicePool.Get(s.absentCount, s.memoryConsumptionTracker)
if err != nil {
return output, err
}

for range s.absentCount {
innerResult, err := s.inner.NextSeries(ctx)
types.PutInstantVectorSeriesData(innerResult, s.memoryConsumptionTracker)

if err != nil && errors.Is(err, types.EOS) {
output.Floats = append(output.Floats, promql.FPoint{T: s.timeRange.StartT + s.timeRange.IntervalMilliseconds, F: 1})
} else {
return types.InstantVectorSeriesData{}, nil
}
}
return output, nil
}

func (s *Absent) ExpressionPosition() posrange.PositionRange {
return s.expressionPosition
}

func (s *Absent) Close() {
s.inner.Close()
}

// createLabelsForAbsentFunction returns the labels that are uniquely and exactly matched
// in a given expression. It is used in the absent functions.
// This function is copied from Prometheus
func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels {
b := labels.NewBuilder(labels.EmptyLabels())

var lm []*labels.Matcher
switch n := expr.(type) {
case *parser.VectorSelector:
lm = n.LabelMatchers
case *parser.MatrixSelector:
lm = n.VectorSelector.(*parser.VectorSelector).LabelMatchers
default:
return labels.EmptyLabels()
}

// The 'has' map implements backwards-compatibility for historic behaviour:
// e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output.
// Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`.
has := make(map[string]bool, len(lm))
for _, ma := range lm {
if ma.Name == labels.MetricName {
continue
}
if ma.Type == labels.MatchEqual && !has[ma.Name] {
b.Set(ma.Name, ma.Value)
has[ma.Name] = true
} else {
b.Del(ma.Name)
}
}

return b.Labels()
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call, timeR
args[i] = a
}

return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange)
return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange, e.Args)
}

func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.RangeVectorOperator, error) {
Expand Down
67 changes: 26 additions & 41 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -1310,25 +1310,20 @@ eval instant at 1m count_over_time({__name__=~"data(_histogram)?"}[2m])
clear

# Test for absent()
# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent)
# {} 1
eval instant at 50m absent(nonexistent)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"})
# {instance="testinstance", job="testjob"} 1
eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"})
{instance="testinstance", job="testjob"} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"})
# {foo="bar"} 1
eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"})
{foo="bar"} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"})
# {foo="bar"} 1
eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"})
{foo="bar"} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"})
# {foo="bar"} 1
eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"})
{foo="bar"} 1

clear

Expand All @@ -1337,43 +1332,33 @@ load 5m
http_requests{job="api-server", instance="0", group="production"} 0+10x10
http_requests_histogram{job="api-server", instance="0", group="production"} {{schema:0 sum:1 count:1}}x11

# Unsupported by streaming engine.
# eval instant at 50m absent(http_requests)
eval instant at 50m absent(http_requests)

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(http_requests))
eval instant at 50m absent(sum(http_requests))

# Unsupported by streaming engine.
# eval instant at 50m absent(http_requests_histogram)
eval instant at 50m absent(http_requests_histogram)

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(http_requests_histogram))
eval instant at 50m absent(sum(http_requests_histogram))

clear

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"}))
# {} 1
eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"}))
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(max(nonexistant))
# {} 1
eval instant at 50m absent(max(nonexistant))
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistant > 1)
# {} 1
eval instant at 50m absent(nonexistant > 1)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(a + b)
# {} 1
eval instant at 50m absent(a + b)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(a and b)
# {} 1
eval instant at 50m absent(a and b)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(rate(nonexistant[5m]))
# {} 1
eval instant at 50m absent(rate(nonexistant[5m]))
{} 1

clear

Expand Down
Loading