diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8a5d031fb9..4992b171a4 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -218,7 +218,8 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil + o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition, timeRange) + return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory { @@ -245,7 +246,8 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil + o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition, timeRange) + return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } } @@ -277,7 +279,8 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil + o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange) + return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { @@ -331,7 +334,8 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu SeriesMetadataFunction: functions.DropSeriesName, } - return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil + o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange) + return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil } // These functions return an instant-vector.