Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for resets function #9859

Merged
merged 10 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
* [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. The support is set to be removed in the next major release. #9453
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9371
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9371 #9859
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,7 +2309,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes"} {
for _, function := range []string{"rate", "increase", "changes", "resets"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime),
"rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad),
"rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate),
"resets": FunctionOverRangeVectorOperatorFactory("resets", functions.Resets),
"round": RoundFunctionOperatorFactory(),
"sgn": InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn),
"sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin),
Expand Down
52 changes: 52 additions & 0 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,55 @@ func changes(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationF

return changes, true, nil, nil
}

var Resets = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: resets,
}

func resets(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

haveFloats := len(fHead) > 0 || len(fTail) > 0
haveHistograms := len(hHead) > 0 || len(hTail) > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] The head slice will always be populated if there are any points, so we could drop the len(xTail) checks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from xOverTime function 😆 . Such as. So this means, we can drop tail check there too, right? Maybe we can do it in another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we can drop it there too, and I'm happy for you to do that in another PR.


if !haveFloats && !haveHistograms {
return 0, false, nil, nil
}

resets := 0.0

if haveFloats {
prev := fHead[0].F
accumulate := func(points []promql.FPoint) {
for _, sample := range points {
current := sample.F
if current < prev {
resets++
}
prev = current
}

}
accumulate(fHead[1:])
accumulate(fTail)
}

if haveHistograms {
prev := hHead[0].H
accumulate := func(points []promql.HPoint) {
for _, sample := range points {
current := sample.H
if current.DetectReset(prev) {
resets++
}
prev = current
}
}
accumulate(hHead[1:])
accumulate(hTail)
}

return resets, true, nil, nil
}
65 changes: 65 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,68 @@ load 6m

eval range from 0 to 96m step 6m round(series, scalar(toNearest))
{a="b"} -5 2.7 0.5 20 10 3.25 0 1000.01 0 -1000000 NaN NaN NaN NaN _ _ 10

clear

# Testing resets
load 1m
simple_metric_all_same_no_reset{num="0"} 0 0 0 0 0 0 0 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] It might be clearer to change these series to use labels like these:

metric{case="all the same, no resets, value 0"}
metric{case="all the same, no resets, value 3"}
metric{case="all floats, no resets"}
metric{case="all floats, some resets"}
...

This would make the cases clearer given we can use full English phrases. Then the evals below can be collapsed into one eval range from 0 to X step Y resets(metric) as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the test in 647feb3

simple_metric_all_same_no_reset{num="3"} 3 3 3 3 3 3 3 3
simple_metric_no_reset{foo="bar"} 0 1 2 3 _ 5 6 _
simple_metric_has_reset{foo="bar"} 0 1 2 3 2 1 0 _
some_inf_and_nan_metric{foo="bar"} Inf 1 2 3 NaN Inf Inf NaN 100 10 8 7 7 6
hist_metric_no_reset{foo="bar"} {{schema:3 sum:0 count:0 buckets:[1 2 1]}} {{schema:3 sum:0 count:0 buckets:[1 2 1]}}
hist_metric_has_reset_count_decrease{foo="bar"} {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 2 1]}}
hist_metric_has_reset_bucket_decrease{foo="bar"} {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 0 1]}}
hist_metric_no_reset_for_sum_decrease{foo="bar"} {{schema:3 sum:3 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:2 buckets:[1 2 1]}}
hist_metric_has_reset_with_schema_increase{foo="bar"} {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:4 sum:0 count:2 buckets:[1 2 1]}}
hist_metric_mix_float{foo="bar"} 9 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}
nhcb_metric_no_reset{foo="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}}
nhcb_metric_has_reset{foo="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}}
mixed_all_has_reset{foo="bar"} 0 1 2 3 2 1 0 _ {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 0 1]}} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}}

eval range from 0 to 8m step 1m resets(simple_metric_all_same_no_reset{num="0"}[3m])
{num="0"} 0 0 0 0 0 0 0 0 0

eval range from 0 to 8m step 1m resets(simple_metric_all_same_no_reset{num="3"}[3m])
{num="3"} 0 0 0 0 0 0 0 0 0

eval range from 0 to 8m step 1m resets(simple_metric_all_same_no_reset[3m])
{num="0"} 0 0 0 0 0 0 0 0 0
{num="3"} 0 0 0 0 0 0 0 0 0

eval range from 0 to 8m step 1m resets(simple_metric_no_reset[3m])
{foo="bar"} 0 0 0 0 0 0 0 0 0

eval range from 0 to 8m step 1m resets(simple_metric_has_reset[3m])
{foo="bar"} 0 0 0 0 1 2 3 2 1

eval range from 0 to 15m step 1m resets(some_inf_and_nan_metric[3m])
{foo="bar"} 0 1 1 1 0 0 0 0 0 1 2 3 2 2 1 1

eval range from 0 to 8m step 1m resets(hist_metric_no_reset[3m])
{foo="bar"} 0 0 0 0 0

eval range from 0 to 8m step 1m resets(hist_metric_has_reset_count_decrease[3m])
{foo="bar"} 0 1 1 1 0

eval range from 0 to 8m step 1m resets(hist_metric_has_reset_bucket_decrease[3m])
{foo="bar"} 0 1 2 2 1 0

eval range from 0 to 8m step 1m resets(hist_metric_no_reset_for_sum_decrease[3m])
{foo="bar"} 0 0 0 0 0

eval range from 0 to 8m step 1m resets(hist_metric_has_reset_with_schema_increase[3m])
{foo="bar"} 0 1 1 1 0

eval range from 0 to 8m step 1m resets(hist_metric_mix_float[3m])
{foo="bar"} 0 1 1 1 0 0 0 1 1

eval range from 0 to 8m step 1m resets(nhcb_metric_no_reset[3m])
{foo="bar"} 0 0 0 0 0 0

eval range from 0 to 8m step 1m resets(nhcb_metric_has_reset[3m])
{foo="bar"} 0 1 1 2 1 1 0

eval range from 0 to 20m step 1m resets(mixed_all_has_reset[3m])
{foo="bar"} 0 0 0 0 1 2 3 2 1 1 2 3 3 2 2 1 1 0
48 changes: 21 additions & 27 deletions pkg/streamingpromql/testdata/upstream/functions.test
lamida marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,32 @@ load 5m
http_requests{path="/biz"} 0 0 0 0 0 1 1 1 1 1

# Tests for resets().
# Unsupported by streaming engine.
# eval instant at 50m resets(http_requests[5m])
# {path="/foo"} 0
# {path="/bar"} 0
# {path="/biz"} 0
eval instant at 50m resets(http_requests[5m])
{path="/foo"} 0
{path="/bar"} 0
{path="/biz"} 0

# Unsupported by streaming engine.
# eval instant at 50m resets(http_requests[300])
# {path="/foo"} 0
# {path="/bar"} 0
# {path="/biz"} 0
eval instant at 50m resets(http_requests[300])
{path="/foo"} 0
{path="/bar"} 0
{path="/biz"} 0

# Unsupported by streaming engine.
# eval instant at 50m resets(http_requests[20m])
# {path="/foo"} 1
# {path="/bar"} 0
# {path="/biz"} 0
eval instant at 50m resets(http_requests[20m])
{path="/foo"} 1
{path="/bar"} 0
{path="/biz"} 0

# Unsupported by streaming engine.
# eval instant at 50m resets(http_requests[30m])
# {path="/foo"} 2
# {path="/bar"} 1
# {path="/biz"} 0
eval instant at 50m resets(http_requests[30m])
{path="/foo"} 2
{path="/bar"} 1
{path="/biz"} 0

# Unsupported by streaming engine.
# eval instant at 50m resets(http_requests[50m])
# {path="/foo"} 3
# {path="/bar"} 1
# {path="/biz"} 0
eval instant at 50m resets(http_requests[50m])
{path="/foo"} 3
{path="/bar"} 1
{path="/biz"} 0

# Unsupported by streaming engine.
# eval instant at 50m resets(nonexistent_metric[50m])
eval instant at 50m resets(nonexistent_metric[50m])

# Tests for changes().
eval instant at 50m changes(http_requests[5m])
Expand Down