From ec0d6f58b74d0b81eeac575362bbd631089f8765 Mon Sep 17 00:00:00 2001
From: Charles Korn <charles.korn@grafana.com>
Date: Wed, 29 Jan 2025 17:05:09 +1100
Subject: [PATCH] Add failing test

---
 pkg/streamingpromql/functions_test.go | 99 +++++++++++++++++++++++++++
 1 file changed, 99 insertions(+)

diff --git a/pkg/streamingpromql/functions_test.go b/pkg/streamingpromql/functions_test.go
index 6db5b4385f2..761a15e591a 100644
--- a/pkg/streamingpromql/functions_test.go
+++ b/pkg/streamingpromql/functions_test.go
@@ -3,10 +3,18 @@
 package streamingpromql
 
 import (
+	"context"
+	"fmt"
+	"strings"
 	"testing"
+	"time"
 
+	"github.com/go-kit/log"
+	"github.com/prometheus/prometheus/model/timestamp"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/stretchr/testify/require"
 
+	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
 )
 
@@ -50,3 +58,94 @@ func TestRegisterScalarFunctionOperatorFactory(t *testing.T) {
 	// Cleanup changes to instantVectorFunctionOperatorFactories
 	delete(scalarFunctionOperatorFactories, "new_function")
 }
+
+// This test ensures that all functions correctly merge series after dropping the metric name.
+func TestFunctionDeduplicateAndMerge(t *testing.T) {
+	data := `
+		load 30s
+			float_a{env="prod"}      _   0 1                       _ _   _ _   _ _   _ _   _ _   _ _
+			float_b{env="prod"}      _   _ _                       _ _   _ _   _ _   _ _   _ _   8 9
+            histogram_a{env="prod"}  _   {{count:0}} {{count:1}}   _ _   _ _   _ _   _ _   _ _   _ _
+            histogram_b{env="prod"}  _   _ _                       _ _   _ _   _ _   _ _   _ _   {{count:8}} {{count:9}}
+	`
+
+	storage := promqltest.LoadedStorage(t, data)
+	opts := NewTestEngineOpts()
+	engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	start := timestamp.Time(0).Add(time.Minute)
+	end := timestamp.Time(0).Add(7 * time.Minute)
+	step := time.Minute
+
+	preSelectorFunctionArgs := map[string]string{
+		"histogram_quantile": "0.1",
+		"histogram_fraction": "0, 0.1",
+	}
+
+	postSelectorFunctionArgs := map[string]string{
+		"clamp":         "-Inf, Inf",
+		"clamp_min":     "-Inf",
+		"clamp_max":     "Inf",
+		"label_replace": `"__name__", "$1", "env", "(.*)"`,
+	}
+
+	for name := range instantVectorFunctionOperatorFactories {
+		if name == "vector" || name == "last_over_time" {
+			// This test doesn't apply to vector() because it takes a scalar parameter.
+			// This test doesn't apply to last_over_time() because it doesn't drop the metric name.
+			continue
+		}
+
+		t.Run(name, func(t *testing.T) {
+			metricType := "float"
+
+			if strings.HasPrefix(name, "histogram_") {
+				metricType = "histogram"
+			}
+
+			selector := fmt.Sprintf(`{__name__=~"%s.*"}`, metricType)
+
+			if isFunctionOverRangeVector(name) {
+				selector = selector + "[1m]"
+			}
+
+			preSelectorArgs := preSelectorFunctionArgs[name]
+			if preSelectorArgs != "" {
+				preSelectorArgs = preSelectorArgs + ", "
+			}
+
+			postSelectorArgs := postSelectorFunctionArgs[name]
+			if postSelectorArgs != "" {
+				postSelectorArgs = ", " + postSelectorArgs
+			}
+
+			expr := fmt.Sprintf("%s(%s%s%s)", name, preSelectorArgs, selector, postSelectorArgs)
+
+			q, err := engine.NewRangeQuery(ctx, storage, nil, expr, start, end, step)
+			require.NoError(t, err)
+			defer q.Close()
+
+			mimirResult := q.Exec(ctx)
+			require.NoError(t, mimirResult.Err)
+			m, err := mimirResult.Matrix()
+			require.NoError(t, err)
+
+			require.Len(t, m, 1, "expected a single series")
+		})
+	}
+}
+
+func isFunctionOverRangeVector(name string) bool {
+	if strings.HasSuffix(name, "_over_time") {
+		return true
+	}
+
+	switch name {
+	case "changes", "delta", "deriv", "idelta", "increase", "irate", "rate", "resets":
+		return true
+	default:
+		return false
+	}
+}