From 448529a090bc879073a64c7bfffc83df299e3492 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Wed, 22 Jan 2025 23:53:32 +0800 Subject: [PATCH 01/14] Add UniqueSeriesLabel SeriesMetadataFunctionDefinition Signed-off-by: Jon Kartago Lamida --- .../operators/functions/common.go | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/streamingpromql/operators/functions/common.go b/pkg/streamingpromql/operators/functions/common.go index 730aff89f26..0ed315585c2 100644 --- a/pkg/streamingpromql/operators/functions/common.go +++ b/pkg/streamingpromql/operators/functions/common.go @@ -4,6 +4,7 @@ package functions import ( "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" @@ -24,6 +25,35 @@ var DropSeriesName = SeriesMetadataFunctionDefinition{ NeedsSeriesDeduplication: true, } +// UniqueSeriesLabel set series labels to be unique. +var UniqueSeriesLabel = SeriesMetadataFunctionDefinition{ + Func: func(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { + for i := range seriesMetadata { + b := labels.NewBuilder(labels.EmptyLabels()) + lm := seriesMetadata[i].Labels + + // The 'has' map implements backwards-compatibility for historic behaviour: + // e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output. + // Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`. + has := make(map[string]bool, len(lm)) + for _, ma := range lm { + if ma.Name == labels.MetricName { + continue + } + if !has[ma.Name] { + b.Set(ma.Name, ma.Value) + has[ma.Name] = true + } else { + b.Del(ma.Name) + } + } + seriesMetadata[i].Labels = b.Labels() + } + + return seriesMetadata, nil + }, +} + // InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) From 13b80f9419bd4f3f0ffdd7cb10df3b3849354c85 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Thu, 23 Jan 2025 17:18:17 +0800 Subject: [PATCH 02/14] Implement absent function Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 20 +++ pkg/streamingpromql/operators/absent.go | 120 ++++++++++++++++++ pkg/streamingpromql/query.go | 3 + .../testdata/upstream/functions.test | 5 +- 4 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 pkg/streamingpromql/operators/absent.go diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8a5d031fb9c..a3f75d277f9 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -6,6 +6,7 @@ import ( "fmt" "math" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -76,6 +77,24 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF return SingleInputVectorFunctionOperatorFactory(name, f) } +func AbsentFunctionOperatorFactory(name string, innerExpr parser.Expr) InstantVectorFunctionOperatorFactory { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + if len(args) != 1 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) + } + inner, ok := args[0].(types.InstantVectorOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0]) + } + + var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpr, timeRange, expressionPosition, memoryConsumptionTracker) + + return o, nil + } +} + // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions // that have exactly 1 argument (v instant-vector), and need to manipulate the labels of // each series without manipulating the returned samples. @@ -338,6 +357,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{ // Please keep this list sorted alphabetically. "abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs), + "absent": AbsentFunctionOperatorFactory("absent", nil), "acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos), "acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh), "asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin), diff --git a/pkg/streamingpromql/operators/absent.go b/pkg/streamingpromql/operators/absent.go new file mode 100644 index 00000000000..0a29bc9ede4 --- /dev/null +++ b/pkg/streamingpromql/operators/absent.go @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "context" + "errors" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +// AbsentOperator is an operator that implements the vector() function. +type AbsentOperator struct { + timeRange types.QueryTimeRange + innerExpr parser.Expr + inner types.InstantVectorOperator + expressionPosition posrange.PositionRange + absentCount int + memoryConsumptionTracker *limiting.MemoryConsumptionTracker +} + +var _ types.InstantVectorOperator = &AbsentOperator{} + +func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *AbsentOperator { + return &AbsentOperator{ + timeRange: timeRange, + inner: inner, + innerExpr: innerExpr, + expressionPosition: expressionPosition, + memoryConsumptionTracker: memoryConsumptionTracker, + } +} + +func (s *AbsentOperator) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { + innerMetadata, err := s.inner.SeriesMetadata(ctx) + if err != nil { + return nil, err + } + + defer types.PutSeriesMetadataSlice(innerMetadata) + + if innerMetadata == nil { + s.absentCount++ + } + + metadata := types.GetSeriesMetadataSlice(s.absentCount) + for range s.absentCount { + metadata = append(metadata, types.SeriesMetadata{ + Labels: createLabelsForAbsentFunction(s.innerExpr), + }) + } + return metadata, nil +} + +func (s *AbsentOperator) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { + output := types.InstantVectorSeriesData{} + var err error + output.Floats, err = types.FPointSlicePool.Get(s.absentCount, s.memoryConsumptionTracker) + if err != nil { + return output, err + } + + for range s.absentCount { + innerResult, err := s.inner.NextSeries(ctx) + types.PutInstantVectorSeriesData(innerResult, s.memoryConsumptionTracker) + + if err != nil && errors.Is(err, types.EOS) { + output.Floats = append(output.Floats, promql.FPoint{T: s.timeRange.StartT + s.timeRange.IntervalMilliseconds, F: 1}) + } else { + return types.InstantVectorSeriesData{}, nil + } + } + return output, nil +} + +func (s *AbsentOperator) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *AbsentOperator) Close() { + s.inner.Close() +} + +func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels { + b := labels.NewBuilder(labels.EmptyLabels()) + + var lm []*labels.Matcher + switch n := expr.(type) { + case *parser.VectorSelector: + lm = n.LabelMatchers + case *parser.MatrixSelector: + lm = n.VectorSelector.(*parser.VectorSelector).LabelMatchers + default: + return labels.EmptyLabels() + } + + // The 'has' map implements backwards-compatibility for historic behaviour: + // e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output. + // Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`. + has := make(map[string]bool, len(lm)) + for _, ma := range lm { + if ma.Name == labels.MetricName { + continue + } + if ma.Type == labels.MatchEqual && !has[ma.Name] { + b.Set(ma.Name, ma.Value) + has[ma.Name] = true + } else { + b.Del(ma.Name) + } + } + + return b.Labels() +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 46ca9f6eefa..d37e2625577 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -196,6 +196,9 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types e.PosRange, ) case *parser.Call: + if e.Func.Name == "absent" { + fmt.Println("here") + } return q.convertFunctionCallToInstantVectorOperator(e, timeRange) case *parser.BinaryExpr: // We only need to handle three combinations of types here: diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index baca7082bc3..94578ed1563 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1310,9 +1310,8 @@ eval instant at 1m count_over_time({__name__=~"data(_histogram)?"}[2m]) clear # Test for absent() -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent) -# {} 1 +eval instant at 50m absent(nonexistent) + {} 1 # Unsupported by streaming engine. # eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"}) From 770326e14793d7e76c069126f152a009e31af79f Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:35:19 +0800 Subject: [PATCH 03/14] Pass inner expr arg for absent Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 33 +++++----- pkg/streamingpromql/query.go | 2 +- .../testdata/upstream/functions.test | 62 +++++++------------ 3 files changed, 43 insertions(+), 54 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index a3f75d277f9..99f2045012f 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -23,6 +23,7 @@ type InstantVectorFunctionOperatorFactory func( annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, + innerExpressions parser.Expressions, ) (types.InstantVectorOperator, error) type ScalarFunctionOperatorFactory func( @@ -40,7 +41,7 @@ type ScalarFunctionOperatorFactory func( // - name: The name of the function // - f: The function implementation func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -77,19 +78,21 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF return SingleInputVectorFunctionOperatorFactory(name, f) } -func AbsentFunctionOperatorFactory(name string, innerExpr parser.Expr) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func AbsentFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { + functionName := "absent" + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. - return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) + return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args)) } inner, ok := args[0].(types.InstantVectorOperator) if !ok { // Should be caught by the PromQL parser, but we check here for safety. - return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0]) + return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) } - var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpr, timeRange, expressionPosition, memoryConsumptionTracker) + // TODO check innerExpressions length too + var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) return o, nil } @@ -122,7 +125,7 @@ func FunctionOverRangeVectorOperatorFactory( name string, f functions.FunctionOverRangeVectorDefinition, ) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -144,7 +147,7 @@ func FunctionOverRangeVectorOperatorFactory( } } -func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { +func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args)) @@ -159,7 +162,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem return scalars.NewScalarToInstantVector(inner, expressionPosition), nil } -func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 5 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 5 arguments for label_replace, got %v", len(args)) @@ -208,7 +211,7 @@ func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptio return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } -func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 arguments for clamp, got %v", len(args)) @@ -241,7 +244,7 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke } func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 2 arguments for %s, got %v", functionName, len(args)) @@ -268,7 +271,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant } } -func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 1 && len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected 1 or 2 arguments for round, got %v", len(args)) @@ -299,7 +302,7 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil } -func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 2 arguments for histogram_quantile, got %v", len(args)) @@ -321,7 +324,7 @@ func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsu return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } -func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { +func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 arguments for histogram_fraction, got %v", len(args)) @@ -357,7 +360,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{ // Please keep this list sorted alphabetically. "abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs), - "absent": AbsentFunctionOperatorFactory("absent", nil), + "absent": AbsentFunctionOperatorFactory(), "acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos), "acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh), "asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin), diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index d37e2625577..36ff0514fd9 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -336,7 +336,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call, timeR args[i] = a } - return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange) + return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange, e.Args) } func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.RangeVectorOperator, error) { diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 94578ed1563..8ac2b0a7236 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1313,21 +1313,17 @@ clear eval instant at 50m absent(nonexistent) {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"}) -# {instance="testinstance", job="testjob"} 1 +eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"}) + {instance="testinstance", job="testjob"} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"}) -# {foo="bar"} 1 +eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"}) + {foo="bar"} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"}) -# {foo="bar"} 1 +eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"}) + {foo="bar"} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"}) -# {foo="bar"} 1 +eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"}) + {foo="bar"} 1 clear @@ -1336,43 +1332,33 @@ load 5m http_requests{job="api-server", instance="0", group="production"} 0+10x10 http_requests_histogram{job="api-server", instance="0", group="production"} {{schema:0 sum:1 count:1}}x11 -# Unsupported by streaming engine. -# eval instant at 50m absent(http_requests) +eval instant at 50m absent(http_requests) -# Unsupported by streaming engine. -# eval instant at 50m absent(sum(http_requests)) +eval instant at 50m absent(sum(http_requests)) -# Unsupported by streaming engine. -# eval instant at 50m absent(http_requests_histogram) +eval instant at 50m absent(http_requests_histogram) -# Unsupported by streaming engine. -# eval instant at 50m absent(sum(http_requests_histogram)) +eval instant at 50m absent(sum(http_requests_histogram)) clear -# Unsupported by streaming engine. -# eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"})) -# {} 1 +eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"})) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(max(nonexistant)) -# {} 1 +eval instant at 50m absent(max(nonexistant)) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(nonexistant > 1) -# {} 1 +eval instant at 50m absent(nonexistant > 1) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(a + b) -# {} 1 +eval instant at 50m absent(a + b) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(a and b) -# {} 1 +eval instant at 50m absent(a and b) + {} 1 -# Unsupported by streaming engine. -# eval instant at 50m absent(rate(nonexistant[5m])) -# {} 1 +eval instant at 50m absent(rate(nonexistant[5m])) + {} 1 clear From 67a19eda13cc2cf61db191dd1f0b72585bcf7f06 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:37:53 +0800 Subject: [PATCH 04/14] Remove println Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/query.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 36ff0514fd9..9b48c51b9dc 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -196,9 +196,6 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types e.PosRange, ) case *parser.Call: - if e.Func.Name == "absent" { - fmt.Println("here") - } return q.convertFunctionCallToInstantVectorOperator(e, timeRange) case *parser.BinaryExpr: // We only need to handle three combinations of types here: From bce8f0d8d784e95a7658cbf894f74826e4fe76a1 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:42:19 +0800 Subject: [PATCH 05/14] Simplify factory function Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 99f2045012f..8c277898958 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -78,24 +78,22 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF return SingleInputVectorFunctionOperatorFactory(name, f) } -func AbsentFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { +func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) { functionName := "absent" - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) { - if len(args) != 1 { - // Should be caught by the PromQL parser, but we check here for safety. - return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args)) - } - inner, ok := args[0].(types.InstantVectorOperator) - if !ok { - // Should be caught by the PromQL parser, but we check here for safety. - return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) - } + if len(args) != 1 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args)) + } + inner, ok := args[0].(types.InstantVectorOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) + } - // TODO check innerExpressions length too - var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) + // TODO check innerExpressions length too + var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) - return o, nil - } + return o, nil } // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions @@ -360,7 +358,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{ // Please keep this list sorted alphabetically. "abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs), - "absent": AbsentFunctionOperatorFactory(), + "absent": AbsentFunctionOperatorFactory, "acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos), "acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh), "asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin), From f6e9090b281a2fafbb79b20875b7caea1178917d Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:46:51 +0800 Subject: [PATCH 06/14] Add innerExpression length check too Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8c277898958..afc4baf42c9 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -80,7 +80,7 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, innerExpressions parser.Expressions) (types.InstantVectorOperator, error) { functionName := "absent" - if len(args) != 1 { + if len(args) != 1 && len(innerExpressions) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args)) } @@ -90,7 +90,6 @@ func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTrack return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) } - // TODO check innerExpressions length too var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) return o, nil From fcb20eb7ca43294ee0042322c882fd8dc783a6a0 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:50:21 +0800 Subject: [PATCH 07/14] Add comment on copied function Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/absent.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/streamingpromql/operators/absent.go b/pkg/streamingpromql/operators/absent.go index 0a29bc9ede4..195d76ffc80 100644 --- a/pkg/streamingpromql/operators/absent.go +++ b/pkg/streamingpromql/operators/absent.go @@ -1,4 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors. package operators @@ -87,6 +90,9 @@ func (s *AbsentOperator) Close() { s.inner.Close() } +// createLabelsForAbsentFunction returns the labels that are uniquely and exactly matched +// in a given expression. It is used in the absent functions. +// This function is copied from Prometheus func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels { b := labels.NewBuilder(labels.EmptyLabels()) From b0f680ad8bc95b4d81a32f24bd485ac30ed13d47 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:52:18 +0800 Subject: [PATCH 08/14] Remove unused function Signed-off-by: Jon Kartago Lamida --- .../operators/functions/common.go | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/pkg/streamingpromql/operators/functions/common.go b/pkg/streamingpromql/operators/functions/common.go index 0ed315585c2..730aff89f26 100644 --- a/pkg/streamingpromql/operators/functions/common.go +++ b/pkg/streamingpromql/operators/functions/common.go @@ -4,7 +4,6 @@ package functions import ( "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" @@ -25,35 +24,6 @@ var DropSeriesName = SeriesMetadataFunctionDefinition{ NeedsSeriesDeduplication: true, } -// UniqueSeriesLabel set series labels to be unique. -var UniqueSeriesLabel = SeriesMetadataFunctionDefinition{ - Func: func(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { - for i := range seriesMetadata { - b := labels.NewBuilder(labels.EmptyLabels()) - lm := seriesMetadata[i].Labels - - // The 'has' map implements backwards-compatibility for historic behaviour: - // e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output. - // Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`. - has := make(map[string]bool, len(lm)) - for _, ma := range lm { - if ma.Name == labels.MetricName { - continue - } - if !has[ma.Name] { - b.Set(ma.Name, ma.Value) - has[ma.Name] = true - } else { - b.Del(ma.Name) - } - } - seriesMetadata[i].Labels = b.Labels() - } - - return seriesMetadata, nil - }, -} - // InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) From 5455467b2ae725e049921db02ae9b9bb4b9b10d0 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:55:12 +0800 Subject: [PATCH 09/14] Fix godoc Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/absent.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operators/absent.go b/pkg/streamingpromql/operators/absent.go index 195d76ffc80..3da10cab162 100644 --- a/pkg/streamingpromql/operators/absent.go +++ b/pkg/streamingpromql/operators/absent.go @@ -18,7 +18,7 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/types" ) -// AbsentOperator is an operator that implements the vector() function. +// AbsentOperator is an operator that implements the absent() function. type AbsentOperator struct { timeRange types.QueryTimeRange innerExpr parser.Expr @@ -30,6 +30,7 @@ type AbsentOperator struct { var _ types.InstantVectorOperator = &AbsentOperator{} +// NewAbsent creates a new AbsentOperator. func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *AbsentOperator { return &AbsentOperator{ timeRange: timeRange, From 776e1477a28c9383cbcce99af5a8c5994d7d76eb Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 23:22:48 +0800 Subject: [PATCH 10/14] Fix functions.test whitespace Signed-off-by: Jon Kartago Lamida --- .../testdata/upstream/functions.test | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 8ac2b0a7236..c965dd8e929 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1311,7 +1311,7 @@ clear # Test for absent() eval instant at 50m absent(nonexistent) - {} 1 + {} 1 eval instant at 50m absent(nonexistent{job="testjob", instance="testinstance", method=~".x"}) {instance="testinstance", job="testjob"} 1 @@ -1320,10 +1320,10 @@ eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",foo="bar"}) {foo="bar"} 1 eval instant at 50m absent(nonexistent{job="testjob",job="testjob2",job="three",foo="bar"}) - {foo="bar"} 1 + {foo="bar"} 1 eval instant at 50m absent(nonexistent{job="testjob",job=~"testjob2",foo="bar"}) - {foo="bar"} 1 + {foo="bar"} 1 clear @@ -1343,22 +1343,22 @@ eval instant at 50m absent(sum(http_requests_histogram)) clear eval instant at 50m absent(sum(nonexistent{job="testjob", instance="testinstance"})) - {} 1 + {} 1 eval instant at 50m absent(max(nonexistant)) - {} 1 + {} 1 eval instant at 50m absent(nonexistant > 1) - {} 1 + {} 1 eval instant at 50m absent(a + b) - {} 1 + {} 1 eval instant at 50m absent(a and b) - {} 1 + {} 1 eval instant at 50m absent(rate(nonexistant[5m])) - {} 1 + {} 1 clear From 8c0f0c9dbc84ba81cd0d5e23e4bf9e43a5473c1a Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 01:27:13 +0800 Subject: [PATCH 11/14] Rename constructor for consistency Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 2 +- pkg/streamingpromql/operators/absent.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index afc4baf42c9..408e34f36e1 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -90,7 +90,7 @@ func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTrack return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) } - var o types.InstantVectorOperator = operators.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) + var o types.InstantVectorOperator = operators.NewAbsentOperator(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) return o, nil } diff --git a/pkg/streamingpromql/operators/absent.go b/pkg/streamingpromql/operators/absent.go index 3da10cab162..1c13a671f91 100644 --- a/pkg/streamingpromql/operators/absent.go +++ b/pkg/streamingpromql/operators/absent.go @@ -30,8 +30,8 @@ type AbsentOperator struct { var _ types.InstantVectorOperator = &AbsentOperator{} -// NewAbsent creates a new AbsentOperator. -func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *AbsentOperator { +// NewAbsentOperator creates a new AbsentOperator. +func NewAbsentOperator(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *AbsentOperator { return &AbsentOperator{ timeRange: timeRange, inner: inner, From e784b9331ced641ddc5644784fb309c56c5ed708 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:46:10 +0800 Subject: [PATCH 12/14] Move absent to function package Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 2 +- pkg/streamingpromql/operators/{ => functions}/absent.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename pkg/streamingpromql/operators/{ => functions}/absent.go (99%) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 408e34f36e1..537b2576bb9 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -90,7 +90,7 @@ func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTrack return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) } - var o types.InstantVectorOperator = operators.NewAbsentOperator(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) + var o types.InstantVectorOperator = functions.NewAbsentOperator(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) return o, nil } diff --git a/pkg/streamingpromql/operators/absent.go b/pkg/streamingpromql/operators/functions/absent.go similarity index 99% rename from pkg/streamingpromql/operators/absent.go rename to pkg/streamingpromql/operators/functions/absent.go index 1c13a671f91..962e5475d84 100644 --- a/pkg/streamingpromql/operators/absent.go +++ b/pkg/streamingpromql/operators/functions/absent.go @@ -3,7 +3,7 @@ // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Prometheus Authors. -package operators +package functions import ( "context" From 3b6cc486f9c67ecd58ee2199a7fe5c1df629f575 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:47:08 +0800 Subject: [PATCH 13/14] Rename AbsentOperator to just Absent Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 2 +- .../operators/functions/absent.go | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 537b2576bb9..1330c9ed4ae 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -90,7 +90,7 @@ func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTrack return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0]) } - var o types.InstantVectorOperator = functions.NewAbsentOperator(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) + var o types.InstantVectorOperator = functions.NewAbsent(inner, innerExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker) return o, nil } diff --git a/pkg/streamingpromql/operators/functions/absent.go b/pkg/streamingpromql/operators/functions/absent.go index 962e5475d84..7d5d9ba0b83 100644 --- a/pkg/streamingpromql/operators/functions/absent.go +++ b/pkg/streamingpromql/operators/functions/absent.go @@ -18,8 +18,8 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/types" ) -// AbsentOperator is an operator that implements the absent() function. -type AbsentOperator struct { +// Absent is an operator that implements the absent() function. +type Absent struct { timeRange types.QueryTimeRange innerExpr parser.Expr inner types.InstantVectorOperator @@ -28,11 +28,11 @@ type AbsentOperator struct { memoryConsumptionTracker *limiting.MemoryConsumptionTracker } -var _ types.InstantVectorOperator = &AbsentOperator{} +var _ types.InstantVectorOperator = &Absent{} -// NewAbsentOperator creates a new AbsentOperator. -func NewAbsentOperator(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *AbsentOperator { - return &AbsentOperator{ +// NewAbsent creates a new Absent. +func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *Absent { + return &Absent{ timeRange: timeRange, inner: inner, innerExpr: innerExpr, @@ -41,7 +41,7 @@ func NewAbsentOperator(inner types.InstantVectorOperator, innerExpr parser.Expr, } } -func (s *AbsentOperator) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { +func (s *Absent) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { innerMetadata, err := s.inner.SeriesMetadata(ctx) if err != nil { return nil, err @@ -62,7 +62,7 @@ func (s *AbsentOperator) SeriesMetadata(ctx context.Context) ([]types.SeriesMeta return metadata, nil } -func (s *AbsentOperator) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { +func (s *Absent) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { output := types.InstantVectorSeriesData{} var err error output.Floats, err = types.FPointSlicePool.Get(s.absentCount, s.memoryConsumptionTracker) @@ -83,11 +83,11 @@ func (s *AbsentOperator) NextSeries(ctx context.Context) (types.InstantVectorSer return output, nil } -func (s *AbsentOperator) ExpressionPosition() posrange.PositionRange { +func (s *Absent) ExpressionPosition() posrange.PositionRange { return s.expressionPosition } -func (s *AbsentOperator) Close() { +func (s *Absent) Close() { s.inner.Close() } From 8cf2fae77733088300f177cef21a730c013417ab Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:49:48 +0800 Subject: [PATCH 14/14] Rename to argExpressions Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 1330c9ed4ae..128236202b4 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -23,7 +23,7 @@ type InstantVectorFunctionOperatorFactory func( annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, - innerExpressions parser.Expressions, + argExpressions parser.Expressions, ) (types.InstantVectorOperator, error) type ScalarFunctionOperatorFactory func(