From eeb77f30b80398c61fc9f11db126d6f2ae3a2e9e Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 28 Aug 2024 15:24:19 +1000 Subject: [PATCH 1/8] Add test the checks for NH compaction on sum --- pkg/streamingpromql/aggregations/sum.go | 3 +-- pkg/streamingpromql/testdata/ours/aggregators.test | 13 ++++++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/streamingpromql/aggregations/sum.go b/pkg/streamingpromql/aggregations/sum.go index 135404361ec..46dbfe97f08 100644 --- a/pkg/streamingpromql/aggregations/sum.go +++ b/pkg/streamingpromql/aggregations/sum.go @@ -170,8 +170,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, m for i, h := range g.histogramSums { if h != nil && h != invalidCombinationOfHistograms { t := start + int64(i)*interval - // TODO(jhesketh): histograms should be compacted here - histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: g.histogramSums[i]}) + histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)}) } } } diff --git a/pkg/streamingpromql/testdata/ours/aggregators.test b/pkg/streamingpromql/testdata/ours/aggregators.test index 9ecc3ca4d02..f4c19bcecc6 100644 --- a/pkg/streamingpromql/testdata/ours/aggregators.test +++ b/pkg/streamingpromql/testdata/ours/aggregators.test @@ -182,4 +182,15 @@ eval range from 0 to 5m step 1m max(series) eval range from 0 to 5m step 1m min(series) {} -20 -10 -10 0 -10 0 -clear \ No newline at end of file +clear + +# Test native histogram compaction +load 1m + single_histogram{label="value"} {{schema:1 sum:5 count:5 buckets:[1 3 1 4 0 2]}} + single_histogram{label="value2"} {{schema:1 sum:5 count:5 buckets:[1 3 1]}} + +# Compaction will happen in this sum. It will fail without it. +eval instant at 1m sum(single_histogram) + {} {{schema:1 count:10 sum:10 buckets:[2 6 2 4 0 2]}} + +clear From fceb4d57c9e50958e80f5131ae6c992a0af2688a Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 30 Aug 2024 21:43:14 +1000 Subject: [PATCH 2/8] Use chunkIterator.AtFloatHistogram instead of memoizedIterator.AtFloatHistogram This copies the spans and buckets so that they are not reused between native histograms. --- .../operators/instant_vector_selector.go | 20 ++++++++++++++++++- .../operators/instant_vector_selector_test.go | 14 +++++++++++++ .../testdata/ours/aggregators.test | 12 +++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operators/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go index 443c8b0a25c..3af6843eeb3 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -99,7 +99,25 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe // if they are going to mutate it, so this is safe to do. t, h = atT, lastHistogram } else { - t, h = v.memoizedIterator.AtFloatHistogram() + // v.memoizedIterator.AtFloatHistogram() does not allow us to supply an existing histogram and instead creates one and returns it. + // The problem is that histograms with the same schema returned from vendor/github.com/prometheus/prometheus/tsdb/chunkenc/float_histogram.go + // share the same underlying Span slices. + // This causes a problem when a NH schema is modified (for example, during a Sum) then the + // other NH's with the same spans are also modified. As such, we need to create a new Span + // for each NH. 
+ // We can guarantee new spans by providing a NH to chunkIterator.AtFloatHistogram(). This is because the + // chunkIterator copies the values into the NH. + // This doesn't have an overhead for us because memoizedIterator.AtFloatHistogram creates a NH when none is supplied. + + // The original line: + // t, h = v.memoizedIterator.AtFloatHistogram() + // May be restorable if upstream accepts creating new Span slices for each native histogram. + // This creates an overhead for them since they don't currently experience any problems sharing spans + // because they copy NH's before performing any operations. + + t = v.memoizedIterator.AtT() + h = &histogram.FloatHistogram{} + v.chunkIterator.AtFloatHistogram(h) lastHistogramT = t lastHistogram = h } diff --git a/pkg/streamingpromql/operators/instant_vector_selector_test.go b/pkg/streamingpromql/operators/instant_vector_selector_test.go index 62ad258653d..1558c8219f6 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/instant_vector_selector_test.go @@ -151,6 +151,20 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { requireNotSame(t, points[0].H, points[2].H) }, }, + "different histograms have different spans": { + data: ` + load 1m + my_metric {{schema:0 sum:1 count:1 buckets:[1 0 1]}} {{schema:0 sum:3 count:2 buckets:[1 0 1]}} + `, + stepCount: 2, + check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { + require.Len(t, points, 2) + requireNotSame(t, points[0].H, points[1].H) + // requireNotSame only checks the slice headers are different. It does not check that the slices do not point the same underlying arrays + // So specifically check the if the first elements are different + require.NotSame(t, &points[0].H.PositiveSpans[0], &points[1].H.PositiveSpans[0], "must not point to the same underlying array") + }, + }, // FIXME: this test currently fails due to https://github.com/prometheus/prometheus/issues/14172 // //"point has same value as a previous point, but there is a float value in between": { diff --git a/pkg/streamingpromql/testdata/ours/aggregators.test b/pkg/streamingpromql/testdata/ours/aggregators.test index f4c19bcecc6..abbfeb32232 100644 --- a/pkg/streamingpromql/testdata/ours/aggregators.test +++ b/pkg/streamingpromql/testdata/ours/aggregators.test @@ -194,3 +194,15 @@ eval instant at 1m sum(single_histogram) {} {{schema:1 count:10 sum:10 buckets:[2 6 2 4 0 2]}} clear + +# This test is for checking NH's do not have a copied PositiveSpan +# It works because a sum will modify a span that a subsequent value is using. 
+load 5m + native_histogram{instance="1"} {{schema:5 sum:10 count:7 buckets:[1 2 3 1]}} {{schema:5 sum:8 count:7 buckets:[1 5 1]}} {{schema:5 sum:18 count:17 buckets:[1 5 1]}} + native_histogram{instance="2"} {{schema:3 sum:4 count:4 buckets:[4]}} + +# Test range query with native histograms +eval range from 0m to 10m step 5m sum (native_histogram) + {} {{schema:3 count:11 sum:14 buckets:[5 6]}} {{schema:3 count:11 sum:12 buckets:[5 6]}} {{schema:5 count:17 sum:18 buckets:[1 5 1]}} + +clear From 6b64584b70893c798719c7463a22eca0c31598a5 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 30 Aug 2024 23:36:17 +1000 Subject: [PATCH 3/8] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df6ad1d5d88..95faa3757f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ * [CHANGE] Ingester: increase the default inactivity timeout of active series (`-ingester.active-series-metrics-idle-timeout`) from `10m` to `20m`. #8975 * [CHANGE] Distributor: Remove `-distributor.enable-otlp-metadata-storage` flag, which was deprecated in version 2.12. #9069 * [CHANGE] Ruler: Removed `-ruler.drain-notification-queue-on-shutdown` option, which is now enabled by default. #9115 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9145 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. 
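
The workaround introduced in patch 2 above exists because float histograms decoded from the same chunk share their underlying Span slices, so mutating one histogram's spans (for example while summing) silently changes the others. Below is a minimal, hypothetical Go sketch of that aliasing hazard and of the clone-based fix used later in this series; it relies only on the Prometheus histogram model package and the standard library, and the names (sharedSpans, a, b) are illustrative rather than taken from the Mimir code.

package main

import (
	"fmt"
	"slices"

	"github.com/prometheus/prometheus/model/histogram"
)

func main() {
	// Two decoded histograms aliasing the same PositiveSpans backing array,
	// mimicking histograms returned from the chunk iterator that share span slices.
	sharedSpans := []histogram.Span{{Offset: 0, Length: 2}}

	a := &histogram.FloatHistogram{Schema: 0, Count: 2, Sum: 3, PositiveSpans: sharedSpans, PositiveBuckets: []float64{1, 1}}
	b := &histogram.FloatHistogram{Schema: 0, Count: 4, Sum: 6, PositiveSpans: sharedSpans, PositiveBuckets: []float64{2, 2}}

	// Mutating a's spans in place (as an aggregation that reshapes the bucket layout might)
	// also changes b, because both slices point at the same underlying array.
	a.PositiveSpans[0].Length = 1
	fmt.Println(b.PositiveSpans[0].Length) // prints 1, not the expected 2

	// The workaround: give each histogram its own copy of the span slices before mutating.
	b.PositiveSpans = slices.Clone(b.PositiveSpans)
	b.PositiveSpans[0].Length = 2
	fmt.Println(a.PositiveSpans[0].Length, b.PositiveSpans[0].Length) // prints 1 2
}

Patch 2 avoids the aliasing by passing a fresh FloatHistogram into chunkIterator.AtFloatHistogram so that spans and buckets are copied, and a later patch in this series clones the span slices directly to cover the lookback (PeekPrev) case.
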
From 430bb9050cefb2aa2aca75a81135c77d90e7cf6a Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 2 Sep 2024 11:34:30 +1000 Subject: [PATCH 4/8] Add TODO to remove workaround --- pkg/streamingpromql/operators/instant_vector_selector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/streamingpromql/operators/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go index 3af6843eeb3..f266c7d133e 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -114,6 +114,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe // May be restorable if upstream accepts creating new Span slices for each native histogram. // This creates an overhead for them since they don't currently experience any problems sharing spans // because they copy NH's before performing any operations. + // TODO(jhesketh): change this back if https://github.com/prometheus/prometheus/pull/14771 merges t = v.memoizedIterator.AtT() h = &histogram.FloatHistogram{} From 8bae026611b4f0dd325e3512e77b6d00e42fb8ad Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 2 Sep 2024 13:24:46 +1000 Subject: [PATCH 5/8] Add failing test case --- .../operators/instant_vector_selector_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/streamingpromql/operators/instant_vector_selector_test.go b/pkg/streamingpromql/operators/instant_vector_selector_test.go index 1558c8219f6..7c6d4cb2e6a 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/instant_vector_selector_test.go @@ -165,6 +165,21 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.NotSame(t, &points[0].H.PositiveSpans[0], &points[1].H.PositiveSpans[0], "must not point to the same underlying array") }, }, + "successive histograms returned due to lookback, but refer to different histograms": { + data: ` + load 30s + my_metric _ {{schema:5 sum:10 count:7 buckets:[1 2 3 1]}} _ {{schema:5 sum:12 count:8 buckets:[1 2 3 2]}} _ + # 0m 30s 1m 1m30s 2m (nothing) + `, + stepCount: 3, + check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { + require.Len(t, points, 2) + requireNotSame(t, points[0].H, points[1].H) + // requireNotSame only checks the slice headers are different. 
It does not check that the slices do not point the same underlying arrays + // So specifically check the if the first elements are different + require.NotSame(t, &points[0].H.PositiveSpans[0], &points[1].H.PositiveSpans[0], "must not point to the same underlying array") + }, + }, // FIXME: this test currently fails due to https://github.com/prometheus/prometheus/issues/14172 // //"point has same value as a previous point, but there is a float value in between": { From 52f9c1f0626537a40c565bd19b2d1b68f5763e6d Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 2 Sep 2024 13:38:52 +1000 Subject: [PATCH 6/8] Apply same assertions to all test cases --- .../operators/instant_vector_selector_test.go | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/streamingpromql/operators/instant_vector_selector_test.go b/pkg/streamingpromql/operators/instant_vector_selector_test.go index 7c6d4cb2e6a..46eb550e195 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/instant_vector_selector_test.go @@ -20,11 +20,12 @@ import ( func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { requireNotSame := func(t *testing.T, h1, h2 *histogram.FloatHistogram) { require.NotSame(t, h1, h2, "must not point to the same *FloatHistogram") - require.NotSame(t, h1.PositiveSpans, h2.PositiveSpans, "must not point to the same positive spans slice") - require.NotSame(t, h1.NegativeSpans, h2.NegativeSpans, "must not point to the same negative spans slice") - require.NotSame(t, h1.PositiveBuckets, h2.PositiveBuckets, "must not point to the same positive buckets slice") - require.NotSame(t, h1.NegativeBuckets, h2.NegativeBuckets, "must not point to the same negative buckets slice") - require.NotSame(t, h1.CustomValues, h2.CustomValues, "must not point to the same custom values slice") + + requireNotSameSlices(t, h1.PositiveSpans, h2.PositiveSpans, "positive spans") + requireNotSameSlices(t, h1.NegativeSpans, h2.NegativeSpans, "negative spans") + requireNotSameSlices(t, h1.PositiveBuckets, h2.PositiveBuckets, "positive buckets") + requireNotSameSlices(t, h1.NegativeBuckets, h2.NegativeBuckets, "negative buckets") + requireNotSameSlices(t, h1.CustomValues, h2.CustomValues, "custom values") } testCases := map[string]struct { @@ -160,9 +161,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { require.Len(t, points, 2) requireNotSame(t, points[0].H, points[1].H) - // requireNotSame only checks the slice headers are different. It does not check that the slices do not point the same underlying arrays - // So specifically check the if the first elements are different - require.NotSame(t, &points[0].H.PositiveSpans[0], &points[1].H.PositiveSpans[0], "must not point to the same underlying array") }, }, "successive histograms returned due to lookback, but refer to different histograms": { @@ -175,9 +173,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { require.Len(t, points, 2) requireNotSame(t, points[0].H, points[1].H) - // requireNotSame only checks the slice headers are different. 
It does not check that the slices do not point the same underlying arrays - // So specifically check the if the first elements are different - require.NotSame(t, &points[0].H.PositiveSpans[0], &points[1].H.PositiveSpans[0], "must not point to the same underlying array") }, }, // FIXME: this test currently fails due to https://github.com/prometheus/prometheus/issues/14172 @@ -231,3 +226,13 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { }) } } + +func requireNotSameSlices[T any](t *testing.T, s1, s2 []T, description string) { + require.NotSamef(t, s1, s2, "must not point to the same %v slice", description) + + // require.NotSame only checks the slice headers are different. It does not check that the slices do not point the same underlying arrays. + // So specifically check if the first elements are different. + if len(s1) > 0 && len(s2) > 0 { + require.NotSamef(t, &s1[0], &s2[0], "must not point to the same underlying %v array", description) + } +} From 1413668a5877eb9506f9b403c9c5be45c37d7523 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 2 Sep 2024 13:55:08 +1000 Subject: [PATCH 7/8] Apply workaround to lookback case as well --- .../operators/instant_vector_selector.go | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/streamingpromql/operators/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go index f266c7d133e..0107dd95f26 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "math" + "slices" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/value" @@ -99,21 +100,21 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe // if they are going to mutate it, so this is safe to do. t, h = atT, lastHistogram } else { - // v.memoizedIterator.AtFloatHistogram() does not allow us to supply an existing histogram and instead creates one and returns it. - // The problem is that histograms with the same schema returned from vendor/github.com/prometheus/prometheus/tsdb/chunkenc/float_histogram.go - // share the same underlying Span slices. - // This causes a problem when a NH schema is modified (for example, during a Sum) then the - // other NH's with the same spans are also modified. As such, we need to create a new Span - // for each NH. + // v.memoizedIterator.AtFloatHistogram() does not allow us to supply an existing histogram and instead creates one + // and returns it. + // The problem is that histograms with the same schema returned from chunkenc.floatHistogramIterator share the same + // underlying Span slices. + // This causes a problem when a NH schema is modified (for example, during a Sum) then the other NHs with the same + // spans are also modified. As such, we need to create a new Span for each NH. // We can guarantee new spans by providing a NH to chunkIterator.AtFloatHistogram(). This is because the // chunkIterator copies the values into the NH. // This doesn't have an overhead for us because memoizedIterator.AtFloatHistogram creates a NH when none is supplied. // The original line: - // t, h = v.memoizedIterator.AtFloatHistogram() - // May be restorable if upstream accepts creating new Span slices for each native histogram. 
- // This creates an overhead for them since they don't currently experience any problems sharing spans - // because they copy NH's before performing any operations. + // t, h = v.memoizedIterator.AtFloatHistogram() + // may be restorable if Prometheus accepts creating new Span slices for each native histogram. + // This creates an overhead for Prometheus since they don't currently experience any problems sharing spans + // because they copy NHs before performing any operations. // TODO(jhesketh): change this back if https://github.com/prometheus/prometheus/pull/14771 merges t = v.memoizedIterator.AtT() @@ -143,6 +144,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe // this, we can end up with incorrect query results. h = lastHistogram } else { + applyWorkaroundForSharedSpanSlices(h) lastHistogramT = t lastHistogram = h } @@ -188,3 +190,15 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe func (v *InstantVectorSelector) Close() { v.Selector.Close() } + +func applyWorkaroundForSharedSpanSlices(h *histogram.FloatHistogram) { + // v.memoizedIterator.PeekPrev() does not allow us to supply an existing histogram and instead creates one and returns it. + // The problem is that histograms with the same schema returned from chunkenc.floatHistogramIterator share the same + // underlying Span slices. + // This causes a problem when a NH schema is modified (for example, during a Sum) then the other NHs with the same + // spans are also modified. As such, we need to create new Span slices for each NH. + // TODO(jhesketh): remove this if https://github.com/prometheus/prometheus/pull/14771 merges + + h.PositiveSpans = slices.Clone(h.PositiveSpans) + h.NegativeSpans = slices.Clone(h.NegativeSpans) +} From f2331478c51a662c442e40ae8f0d5ab8e71a1634 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 2 Sep 2024 15:33:19 +1000 Subject: [PATCH 8/8] Address PR feedback --- .../operators/instant_vector_selector.go | 13 ++++++------- .../operators/instant_vector_selector_test.go | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/streamingpromql/operators/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go index 0107dd95f26..cefb774bf12 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -191,14 +191,13 @@ func (v *InstantVectorSelector) Close() { v.Selector.Close() } +// v.memoizedIterator.PeekPrev() does not allow us to supply an existing histogram and instead creates one and returns it. +// The problem is that histograms with the same schema returned from chunkenc.floatHistogramIterator share the same +// underlying Span slices. +// This causes a problem when a NH schema is modified (for example, during a Sum) then the other NHs with the same +// spans are also modified. As such, we need to create new Span slices for each NH. +// TODO(jhesketh): remove this if https://github.com/prometheus/prometheus/pull/14771 merges func applyWorkaroundForSharedSpanSlices(h *histogram.FloatHistogram) { - // v.memoizedIterator.PeekPrev() does not allow us to supply an existing histogram and instead creates one and returns it. - // The problem is that histograms with the same schema returned from chunkenc.floatHistogramIterator share the same - // underlying Span slices. - // This causes a problem when a NH schema is modified (for example, during a Sum) then the other NHs with the same - // spans are also modified. 
As such, we need to create new Span slices for each NH. - // TODO(jhesketh): remove this if https://github.com/prometheus/prometheus/pull/14771 merges - h.PositiveSpans = slices.Clone(h.PositiveSpans) h.NegativeSpans = slices.Clone(h.NegativeSpans) } diff --git a/pkg/streamingpromql/operators/instant_vector_selector_test.go b/pkg/streamingpromql/operators/instant_vector_selector_test.go index 46eb550e195..7263ec8e2fd 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/instant_vector_selector_test.go @@ -152,7 +152,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { requireNotSame(t, points[0].H, points[2].H) }, }, - "different histograms have different spans": { + "different histograms should have different spans": { data: ` load 1m my_metric {{schema:0 sum:1 count:1 buckets:[1 0 1]}} {{schema:0 sum:3 count:2 buckets:[1 0 1]}} @@ -163,7 +163,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { requireNotSame(t, points[0].H, points[1].H) }, }, - "successive histograms returned due to lookback, but refer to different histograms": { + "successive histograms returned due to lookback should create different histograms at each point": { data: ` load 30s my_metric _ {{schema:5 sum:10 count:7 buckets:[1 2 3 1]}} _ {{schema:5 sum:12 count:8 buckets:[1 2 3 2]}} _