MQE: Fix native histogram bugs #9145

Merged: 8 commits, Sep 2, 2024
Changes from 4 commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -29,7 +29,7 @@
* [CHANGE] Ingester: increase the default inactivity timeout of active series (`-ingester.active-series-metrics-idle-timeout`) from `10m` to `20m`. #8975
* [CHANGE] Distributor: Remove `-distributor.enable-otlp-metadata-storage` flag, which was deprecated in version 2.12. #9069
* [CHANGE] Ruler: Removed `-ruler.drain-notification-queue-on-shutdown` option, which is now enabled by default. #9115
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9145
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read paths are decoupled through the Kafka-compatible backend: the write-path and Kafka load are a function of the incoming write traffic, while the read-path load is a function of the received queries. Whatever the load on the read path, it doesn't affect the write path.
3 changes: 1 addition & 2 deletions pkg/streamingpromql/aggregations/sum.go
@@ -170,8 +170,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, m
for i, h := range g.histogramSums {
if h != nil && h != invalidCombinationOfHistograms {
t := start + int64(i)*interval
-// TODO(jhesketh): histograms should be compacted here
-histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: g.histogramSums[i]})
+histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
}
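For background on the `Compact(0)` call added above: summing native histograms bucket by bucket can leave empty buckets and redundant spans, and the PromQL test framework compares against the compacted form, so the aggregation compacts each result before emitting it. Below is a minimal, hand-rolled sketch of what compaction does, using the Prometheus `model/histogram` package; the histogram contents are invented for illustration.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

func main() {
	// A float histogram whose positive buckets end in zeros, similar to what can
	// be left behind after summing histograms with different bucket layouts.
	h := &histogram.FloatHistogram{
		Schema: 1,
		Count:  10,
		Sum:    10,
		PositiveSpans: []histogram.Span{
			{Offset: 0, Length: 3},
			{Offset: 2, Length: 3},
		},
		PositiveBuckets: []float64{2, 6, 2, 4, 0, 0},
	}

	// Compact(0) drops empty buckets at the edges of spans and merges spans where
	// possible, yielding the canonical representation the tests compare against.
	h = h.Compact(0)

	fmt.Println(h.PositiveSpans, h.PositiveBuckets)
}
```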
21 changes: 20 additions & 1 deletion pkg/streamingpromql/operators/instant_vector_selector.go
@@ -99,7 +99,26 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
// if they are going to mutate it, so this is safe to do.
t, h = atT, lastHistogram
} else {
-t, h = v.memoizedIterator.AtFloatHistogram()
+// v.memoizedIterator.AtFloatHistogram() does not allow us to supply an existing histogram; it always creates one and returns it.
+// The problem is that histograms with the same schema returned from vendor/github.com/prometheus/prometheus/tsdb/chunkenc/float_histogram.go
+// share the same underlying Span slices.
+// This causes a problem when an NH's schema is modified (for example, during a Sum): the
+// other NHs with the same spans are also modified. As such, we need to create new Spans
+// for each NH.
+// We can guarantee new spans by providing an NH to chunkIterator.AtFloatHistogram(), because the
+// chunkIterator copies the values into the supplied NH.
+// This adds no overhead for us, because memoizedIterator.AtFloatHistogram creates an NH anyway when none is supplied.
+
+// The original line:
+// t, h = v.memoizedIterator.AtFloatHistogram()
+// may be restored if upstream accepts creating new Span slices for each native histogram.
+// Doing so would add overhead upstream, since they don't currently experience any problems from sharing spans,
+// because they copy NHs before performing any operations.
+// TODO(jhesketh): change this back if https://github.com/prometheus/prometheus/pull/14771 merges
+
+t = v.memoizedIterator.AtT()
+h = &histogram.FloatHistogram{}
+v.chunkIterator.AtFloatHistogram(h)
lastHistogramT = t
lastHistogram = h
}
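To make the span-sharing hazard described in the comment above concrete: two `FloatHistogram`s whose `PositiveSpans` fields alias the same backing slice are silently coupled, so an in-place operation on one changes the other. The sketch below hand-builds that situation and shows the copy-based escape hatch; the hand-built histograms stand in for what the chunk iterator would return and are not the actual iterator code.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

func main() {
	// Two decoded histograms that alias the same Span slice, mimicking the
	// sharing described for chunkenc's float_histogram decoding above.
	sharedSpans := []histogram.Span{{Offset: 0, Length: 3}}
	a := &histogram.FloatHistogram{Schema: 0, PositiveSpans: sharedSpans, PositiveBuckets: []float64{1, 0, 1}}
	b := &histogram.FloatHistogram{Schema: 0, PositiveSpans: sharedSpans, PositiveBuckets: []float64{1, 0, 1}}

	// An in-place change to a's spans...
	a.PositiveSpans[0].Length = 2

	// ...is visible through b as well, which is the bug the selector change avoids.
	fmt.Println(b.PositiveSpans[0].Length) // 2, not the 3 it was built with

	// The fix in the diff passes a fresh FloatHistogram to chunkIterator.AtFloatHistogram
	// so the spans are copied. Shown here with a plain slice copy instead of the iterator:
	c := &histogram.FloatHistogram{
		Schema:          b.Schema,
		PositiveSpans:   append([]histogram.Span(nil), b.PositiveSpans...),
		PositiveBuckets: append([]float64(nil), b.PositiveBuckets...),
	}
	c.PositiveSpans[0].Length = 3
	fmt.Println(b.PositiveSpans[0].Length, c.PositiveSpans[0].Length) // 2 3: now independent
}
```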
14 changes: 14 additions & 0 deletions pkg/streamingpromql/operators/instant_vector_selector_test.go
@@ -151,6 +151,20 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
requireNotSame(t, points[0].H, points[2].H)
},
},
"different histograms have different spans": {
data: `
load 1m
my_metric {{schema:0 sum:1 count:1 buckets:[1 0 1]}} {{schema:0 sum:3 count:2 buckets:[1 0 1]}}
`,
stepCount: 2,
check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) {
require.Len(t, points, 2)
requireNotSame(t, points[0].H, points[1].H)
// requireNotSame only checks that the slice headers are different. It does not check that the slices do not point to the same underlying arrays,
// so specifically check that the first elements do not share the same address.
require.NotSame(t, &points[0].H.PositiveSpans[0], &points[1].H.PositiveSpans[0], "must not point to the same underlying array")
},
},
// FIXME: this test currently fails due to https://github.com/prometheus/prometheus/issues/14172
//
//"point has same value as a previous point, but there is a float value in between": {
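The comment in the new test case hinges on a Go detail worth spelling out: two distinct slice headers can still share a backing array, so only comparing the addresses of their first elements detects sharing. A tiny standalone illustration (not part of the test file):

```go
package main

import "fmt"

func main() {
	backing := []int{1, 2, 3}

	s1 := backing[:2]
	s2 := backing[:3]

	// Different slice headers (different lengths), same backing array:
	// the first elements live at the same address.
	fmt.Println(&s1[0] == &s2[0]) // true: shared storage

	s3 := append([]int(nil), backing...) // a real copy
	fmt.Println(&s1[0] == &s3[0]) // false: independent storage
}
```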
25 changes: 24 additions & 1 deletion pkg/streamingpromql/testdata/ours/aggregators.test
@@ -182,4 +182,27 @@ eval range from 0 to 5m step 1m max(series)
eval range from 0 to 5m step 1m min(series)
{} -20 -10 -10 0 -10 0

clear

# Test native histogram compaction
load 1m
single_histogram{label="value"} {{schema:1 sum:5 count:5 buckets:[1 3 1 4 0 2]}}
single_histogram{label="value2"} {{schema:1 sum:5 count:5 buckets:[1 3 1]}}

# Compaction happens as part of this sum; the evaluation below fails without it.
eval instant at 1m sum(single_histogram)
{} {{schema:1 count:10 sum:10 buckets:[2 6 2 4 0 2]}}

clear

# This test guards against NHs sharing a PositiveSpan instead of each getting its own copy.
# It works because the sum modifies a span that a subsequent value would otherwise still be using.
load 5m
native_histogram{instance="1"} {{schema:5 sum:10 count:7 buckets:[1 2 3 1]}} {{schema:5 sum:8 count:7 buckets:[1 5 1]}} {{schema:5 sum:18 count:17 buckets:[1 5 1]}}
native_histogram{instance="2"} {{schema:3 sum:4 count:4 buckets:[4]}}

# Test range query with native histograms
eval range from 0m to 10m step 5m sum (native_histogram)
{} {{schema:3 count:11 sum:14 buckets:[5 6]}} {{schema:3 count:11 sum:12 buckets:[5 6]}} {{schema:5 count:17 sum:18 buckets:[1 5 1]}}

clear