Mimir query engine: binary operations between two scalars #9277

Merged · 4 commits · Sep 12, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -33,7 +33,7 @@
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9278
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read paths are decoupled through a Kafka-compatible backend. The write path and Kafka load are a function of the incoming write traffic; the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
14 changes: 11 additions & 3 deletions pkg/streamingpromql/engine_test.go
@@ -41,9 +41,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"1 + 2": "binary expression between two scalars",
"metric{} < other_metric{}": "binary expression with '<'",
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} < other_metric{}": "binary expression with '<'",
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"topk(5, metric{})": "'topk' aggregation with parameter",
@@ -88,6 +87,15 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {

requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions")

requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + 1", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + 1", "binary expressions")

requireRangeQueryIsUnsupported(t, featureToggles, "1 + metric{}", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "1 + metric{}", "binary expressions")

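// Binary operations between two scalars are gated behind the same feature toggle as other binary expressions.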
requireRangeQueryIsUnsupported(t, featureToggles, "2 + 1", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "2 + 1", "binary expressions")
})

t.Run("..._over_time functions", func(t *testing.T) {
102 changes: 102 additions & 0 deletions pkg/streamingpromql/operators/scalar_scalar_binary_operation.go
@@ -0,0 +1,102 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operators

import (
"context"
"fmt"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type ScalarScalarBinaryOperation struct {
Left types.ScalarOperator
Right types.ScalarOperator
Op parser.ItemType
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

opFunc binaryOperationFunc
expressionPosition posrange.PositionRange
}

var _ types.ScalarOperator = &ScalarScalarBinaryOperation{}

func NewScalarScalarBinaryOperation(
left, right types.ScalarOperator,
op parser.ItemType,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
expressionPosition posrange.PositionRange,
) (*ScalarScalarBinaryOperation, error) {
f := arithmeticOperationFuncs[op]
if f == nil {
return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
}

return &ScalarScalarBinaryOperation{
Left: left,
Right: right,
Op: op,
MemoryConsumptionTracker: memoryConsumptionTracker,

opFunc: f,
expressionPosition: expressionPosition,
}, nil
}

func (s *ScalarScalarBinaryOperation) GetValues(ctx context.Context) (types.ScalarData, error) {
leftValues, err := s.Left.GetValues(ctx)
if err != nil {
return types.ScalarData{}, err
}

rightValues, err := s.Right.GetValues(ctx)
if err != nil {
return types.ScalarData{}, err
}

// Binary operations between two scalars always produce a float value, as only arithmetic operators or comparison
// operators (with the bool keyword) are supported between two scalars.
//
// Furthermore, scalar values always have a value at each step.
//
// So we can just compute the result of each pairwise operation without examining the timestamps of each sample.
//
// We store the result in the slice from the left operator, and return the right operator's slice once we're done.
Review thread:

Contributor: I think for safety we should have a check such as `len(leftValues) != len(rightValues)` and raise an error.

Contributor Author: We rely on this fact in other places, and don't have checks there either (e.g. scalar/vector binary operations). If the lengths are different, the code will panic, which seems like a reasonable thing to do for a bug where an invariant has been broken.

Contributor: It only panics if the right side is shorter than the left. I don't think the overhead here is very high — what do you think about adding it to the other scalar operators too?

Contributor Author: I'm not sure this makes sense as a precedent to set: we don't do similar validation-type checks in other situations (e.g. checking that the data points from an instant vector operator are within the query time range, or align to the requested step), and if those situations were to occur, we'd have similarly undefined behaviour.

Contributor: Hmm, okay. I still think it's reasonable, but I don't hold it strongly. LGTM :-)

for i, left := range leftValues.Samples {
right := rightValues.Samples[i]

f, h, ok, err := s.opFunc(left.F, right.F, nil, nil)

if err != nil {
return types.ScalarData{}, err
}

if !ok {
panic(fmt.Sprintf("%v binary operation between two scalars (%v and %v) did not produce a result, this should never happen", s.Op.String(), left.F, right.F))
}

if h != nil {
panic(fmt.Sprintf("%v binary operation between two scalars (%v and %v) produced a histogram result, this should never happen", s.Op.String(), left.F, right.F))
}

leftValues.Samples[i].F = f
}

types.FPointSlicePool.Put(rightValues.Samples, s.MemoryConsumptionTracker)

return leftValues, nil
}

func (s *ScalarScalarBinaryOperation) ExpressionPosition() posrange.PositionRange {
return s.expressionPosition
}

func (s *ScalarScalarBinaryOperation) Close() {
s.Left.Close()
s.Right.Close()
}
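For reference, here is a minimal, self-contained sketch (not part of this change) of the length guard discussed in the review thread above, together with the in-place computation pattern `GetValues` uses; the function name and error wording are illustrative assumptions.

```go
package main

import "fmt"

// addScalars adds two scalar series pairwise, reusing the left slice for the result
// (mirroring how GetValues writes results back into leftValues.Samples). The guard
// returns an error if the "one sample per step on both sides" invariant is broken,
// instead of panicking (right shorter than left) or silently ignoring the extra
// samples (right longer than left).
func addScalars(left, right []float64) ([]float64, error) {
	if len(left) != len(right) {
		return nil, fmt.Errorf("scalar operands produced %d and %d samples", len(left), len(right))
	}

	for i := range left {
		left[i] += right[i]
	}

	return left, nil
}

func main() {
	result, err := addScalars([]float64{1, 2, 3}, []float64{2, 2, 2})
	fmt.Println(result, err) // [3 4 5] <nil>
}
```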
17 changes: 16 additions & 1 deletion pkg/streamingpromql/query.go
@@ -376,7 +376,22 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator,
case *parser.ParenExpr:
return q.convertToScalarOperator(e.Expr)
case *parser.BinaryExpr:
-return nil, compat.NewNotSupportedError("binary expression between two scalars")
if !q.engine.featureToggles.EnableBinaryOperations {
return nil, compat.NewNotSupportedError("binary expressions")
}

lhs, err := q.convertToScalarOperator(e.LHS)
if err != nil {
return nil, err
}

rhs, err := q.convertToScalarOperator(e.RHS)
if err != nil {
return nil, err
}

return operators.NewScalarScalarBinaryOperation(lhs, rhs, e.Op, q.memoryConsumptionTracker, e.PositionRange())

default:
return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T for scalars", e))
}
28 changes: 28 additions & 0 deletions pkg/streamingpromql/testdata/ours/binary_operators.test
@@ -359,4 +359,32 @@ eval range from 0m to 18m step 6m 2 * {job="bar"}
eval_fail range from 0m to 24m step 6m 2 * {job="bar"}
expected_fail_message vector cannot contain metrics with the same labelset

clear

load 6m
metric 1 2 _ 3 stale 4 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} 5

# Scalars on both sides.
eval range from 0m to 3m step 1m 1 + 2
{} 3 3 3 3

eval range from 0m to 3m step 1m 1 - 2
{} -1 -1 -1 -1

eval range from 0m to 3m step 1m 2 * 3
{} 6 6 6 6

eval range from 0m to 3m step 1m 1 / 2
{} 0.5 0.5 0.5 0.5

eval range from 0m to 3m step 1m 5 % 2
{} 1 1 1 1

eval range from 0m to 3m step 1m 2 ^ 3
{} 8 8 8 8

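# atan2(2, 3) = arctan(2 / 3) ≈ 0.5880026035475675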
eval range from 0m to 3m step 1m 2 atan2 3
{} 0.5880026035475675 0.5880026035475675 0.5880026035475675 0.5880026035475675

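# scalar(metric) is NaN at steps with no float sample: the gap at 12m, the stale marker at 24m and the native histogram at 36m.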
eval range from 0m to 42m step 6m scalar(metric) + 2
{} 3 4 NaN 5 NaN 6 NaN 7
40 changes: 16 additions & 24 deletions pkg/streamingpromql/testdata/upstream/literals.test
@@ -12,17 +12,14 @@ eval instant at 50m 12.34e+6
eval instant at 50m 12.34e-6
0.00001234

# Unsupported by streaming engine.
# eval instant at 50m 1+1
# 2
eval instant at 50m 1+1
2

# Unsupported by streaming engine.
# eval instant at 50m 1-1
# 0
eval instant at 50m 1-1
0

# Unsupported by streaming engine.
# eval instant at 50m 1 - -1
# 2
eval instant at 50m 1 - -1
2

eval instant at 50m .2
0.2
Expand Down Expand Up @@ -51,22 +48,17 @@ eval instant at 50m nan
eval instant at 50m 2.
2

# Unsupported by streaming engine.
# eval instant at 50m 1 / 0
# +Inf
eval instant at 50m 1 / 0
+Inf

# Unsupported by streaming engine.
# eval instant at 50m ((1) / (0))
# +Inf
eval instant at 50m ((1) / (0))
+Inf

# Unsupported by streaming engine.
# eval instant at 50m -1 / 0
# -Inf
eval instant at 50m -1 / 0
-Inf

# Unsupported by streaming engine.
# eval instant at 50m 0 / 0
# NaN
eval instant at 50m 0 / 0
NaN

# Unsupported by streaming engine.
# eval instant at 50m 1 % 0
# NaN
eval instant at 50m 1 % 0
NaN
55 changes: 23 additions & 32 deletions pkg/streamingpromql/testdata/upstream/operators.test
@@ -42,18 +42,15 @@ eval instant at 50m - - - SUM(http_requests) BY (job)
eval instant at 50m - - - 1
-1

# Unsupported by streaming engine.
# eval instant at 50m -2^---1*3
# -1.5
eval instant at 50m -2^---1*3
-1.5

# Unsupported by streaming engine.
# eval instant at 50m 2/-2^---1*3+2
# -10
eval instant at 50m 2/-2^---1*3+2
-10

# Unsupported by streaming engine.
# eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1
# {job="api-server"} 1
# {job="app-server"} 0.38461538461538464
eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1
{job="api-server"} 1
{job="app-server"} 0.38461538461538464

eval instant at 50m 1000 / SUM(http_requests) BY (job)
{job="api-server"} 1
@@ -75,25 +75,21 @@ eval instant at 50m SUM(http_requests) BY (job) ^ 2
{job="api-server"} 1000000
{job="app-server"} 6760000

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2
# {job="api-server"} 1
# {job="app-server"} 8
eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2
{job="api-server"} 1
{job="app-server"} 8

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2)
# {job="api-server"} 488
# {job="app-server"} 40
eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2)
{job="api-server"} 488
{job="app-server"} 40

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2
# {job="api-server"} 488
# {job="app-server"} 40
eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2
{job="api-server"} 488
{job="app-server"} 40

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2
# {job="api-server"} 1000
# {job="app-server"} 2600
eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2
{job="api-server"} 1000
{job="app-server"} 2600

# Unsupported by streaming engine.
# eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job)
@@ -545,10 +545,8 @@ eval instant at 5m trigy atan2 trigx
eval instant at 5m trigy atan2 trigNaN
{} NaN

# Unsupported by streaming engine.
# eval instant at 5m 10 atan2 20
# 0.4636476090008061
eval instant at 5m 10 atan2 20
0.4636476090008061

# Unsupported by streaming engine.
# eval instant at 5m 10 atan2 NaN
# NaN
eval instant at 5m 10 atan2 NaN
NaN