Skip to content

Commit

Permalink
Pass inner expr arg for absent
Browse files Browse the repository at this point in the history
Signed-off-by: Jon Kartago Lamida <[email protected]>
  • Loading branch information
lamida committed Jan 27, 2025
1 parent 13b80f9 commit 770326e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 54 deletions.
33 changes: 18 additions & 15 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type InstantVectorFunctionOperatorFactory func(
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
timeRange types.QueryTimeRange,
innerExpressions parser.Expressions,
) (types.InstantVectorOperator, error)

type ScalarFunctionOperatorFactory func(
Expand All @@ -40,7 +41,7 @@ type ScalarFunctionOperatorFactory func(
// - name: The name of the function
// - f: The function implementation
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand Down Expand Up @@ -77,19 +78,21 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
return SingleInputVectorFunctionOperatorFactory(name, f)
}

func AbsentFunctionOperatorFactory(name string, innerExpr parser.Expr) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func AbsentFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
functionName := "absent"
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args))
}
inner, ok := args[0].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0])
return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0])
}

var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpr, timeRange, expressionPosition, memoryConsumptionTracker)
// TODO check innerExpressions length too
var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker)

return o, nil
}
Expand Down Expand Up @@ -122,7 +125,7 @@ func FunctionOverRangeVectorOperatorFactory(
name string,
f functions.FunctionOverRangeVectorDefinition,
) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand All @@ -144,7 +147,7 @@ func FunctionOverRangeVectorOperatorFactory(
}
}

func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args))
Expand All @@ -159,7 +162,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem
return scalars.NewScalarToInstantVector(inner, expressionPosition), nil
}

func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 5 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 5 arguments for label_replace, got %v", len(args))
Expand Down Expand Up @@ -208,7 +211,7 @@ func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptio
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 arguments for clamp, got %v", len(args))
Expand Down Expand Up @@ -241,7 +244,7 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
}

func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for %s, got %v", functionName, len(args))
Expand All @@ -268,7 +271,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
}
}

func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 && len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected 1 or 2 arguments for round, got %v", len(args))
Expand Down Expand Up @@ -299,7 +302,7 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}

func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for histogram_quantile, got %v", len(args))
Expand All @@ -321,7 +324,7 @@ func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsu
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 arguments for histogram_fraction, got %v", len(args))
Expand Down Expand Up @@ -357,7 +360,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
"abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs),
"absent": AbsentFunctionOperatorFactory("absent", nil),
"absent": AbsentFunctionOperatorFactory(),
"acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos),
"acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh),
"asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin),
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call, timeR
args[i] = a
}

return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange)
return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange, e.Args)
}

func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.RangeVectorOperator, error) {
Expand Down
62 changes: 24 additions & 38 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -1313,21 +1313,17 @@ clear
eval instant at 50m absent(nonexistent)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"})
# {instance="testinstance", job="testjob"} 1
eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"})
{instance="testinstance", job="testjob"} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"})
# {foo="bar"} 1
eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"})
{foo="bar"} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"})
# {foo="bar"} 1
eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"})
{foo="bar"} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"})
# {foo="bar"} 1
eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"})
{foo="bar"} 1

clear

Expand All @@ -1336,43 +1332,33 @@ load 5m
http_requests{job="api-server", instance="0", group="production"} 0+10x10
http_requests_histogram{job="api-server", instance="0", group="production"} {{schema:0 sum:1 count:1}}x11

# Unsupported by streaming engine.
# eval instant at 50m absent(http_requests)
eval instant at 50m absent(http_requests)

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(http_requests))
eval instant at 50m absent(sum(http_requests))

# Unsupported by streaming engine.
# eval instant at 50m absent(http_requests_histogram)
eval instant at 50m absent(http_requests_histogram)

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(http_requests_histogram))
eval instant at 50m absent(sum(http_requests_histogram))

clear

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"}))
# {} 1
eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"}))
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(max(nonexistant))
# {} 1
eval instant at 50m absent(max(nonexistant))
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(nonexistant > 1)
# {} 1
eval instant at 50m absent(nonexistant > 1)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(a + b)
# {} 1
eval instant at 50m absent(a + b)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(a and b)
# {} 1
eval instant at 50m absent(a and b)
{} 1

# Unsupported by streaming engine.
# eval instant at 50m absent(rate(nonexistant[5m]))
# {} 1
eval instant at 50m absent(rate(nonexistant[5m]))
{} 1

clear

Expand Down

0 comments on commit 770326e

Please sign in to comment.