diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8a5d031fb9..638a2c1f10 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -76,6 +76,42 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF return SingleInputVectorFunctionOperatorFactory(name, f) } +func TimeTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { + f := functions.FunctionOverInstantVectorDefinition{ + SeriesDataFunc: seriesDataFunc, + SeriesMetadataFunction: functions.DropSeriesName, + } + + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + if len(args) > 1 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected 0 or 1 argument for %s, got %v", name, len(args)) + } + + var inner types.InstantVectorOperator + if len(args) > 0 { + var ok bool + // time based function expect instant vector argument + inner, ok = args[0].(types.InstantVectorOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0]) + } + + } else { + // if the argument is not provided, it will default to vector(time()) + inner = scalars.NewScalarToInstantVector(operators.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) + } + + var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) + if f.SeriesMetadataFunction.NeedsSeriesDeduplication { + o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) + } + + return o, nil + } +} + // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions // that have exactly 1 argument (v instant-vector), and need to manipulate the labels of // each series without manipulating the returned samples. @@ -353,6 +389,10 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), + "days_in_month": TimeTransformationFunctionOperatorFactory("days_in_month", functions.DaysInMonth), + "day_of_month": TimeTransformationFunctionOperatorFactory("day_of_month", functions.DayOfMonth), + "day_of_week": TimeTransformationFunctionOperatorFactory("day_of_week", functions.DayOfWeek), + "day_of_year": TimeTransformationFunctionOperatorFactory("day_of_year", functions.DayOfYear), "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), "delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta), "deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv), @@ -365,6 +405,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)), "histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), + "hour": TimeTransformationFunctionOperatorFactory("hour", functions.Hour), "idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta), "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), "irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate), @@ -375,6 +416,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), + "minute": TimeTransformationFunctionOperatorFactory("minute", functions.Minute), + "month": TimeTransformationFunctionOperatorFactory("month", functions.Month), "present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime), "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), @@ -388,6 +431,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), "vector": scalarToInstantVectorOperatorFactory, + "year": TimeTransformationFunctionOperatorFactory("year", functions.Year), } func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory InstantVectorFunctionOperatorFactory) error { @@ -404,6 +448,7 @@ var scalarFunctionOperatorFactories = map[string]ScalarFunctionOperatorFactory{ // Please keep this list sorted alphabetically. "pi": piOperatorFactory, "scalar": instantVectorToScalarOperatorFactory, + "time": timeOperatorFactory, } func RegisterScalarFunctionOperatorFactory(functionName string, factory ScalarFunctionOperatorFactory) error { @@ -424,6 +469,15 @@ func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting return scalars.NewScalarConstant(math.Pi, timeRange, memoryConsumptionTracker, expressionPosition), nil } +func timeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { + if len(args) != 0 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 0 arguments for time, got %v", len(args)) + } + + return operators.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), nil +} + func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go new file mode 100644 index 0000000000..8b5b5cfbe1 --- /dev/null +++ b/pkg/streamingpromql/operators/functions/times.go @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package functions + +import ( + "time" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +var DaysInMonth = timeWrapperFunc(func(t time.Time) float64 { + return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) +}) + +var DayOfMonth = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Day()) +}) + +var DayOfWeek = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Weekday()) +}) + +var DayOfYear = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.YearDay()) +}) + +var Hour = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Hour()) +}) + +var Minute = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Minute()) +}) + +var Month = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Month()) +}) + +var Year = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Year()) +}) + +func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + + // we don't do time based function on histograms + types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) + seriesData.Histograms = nil + + for i := range seriesData.Floats { + t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() + seriesData.Floats[i].F = f(t) + } + return seriesData, nil + } +} diff --git a/pkg/streamingpromql/operators/time.go b/pkg/streamingpromql/operators/time.go new file mode 100644 index 0000000000..d322e53f7e --- /dev/null +++ b/pkg/streamingpromql/operators/time.go @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "context" + + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +type Time struct { + TimeRange types.QueryTimeRange + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + expressionPosition posrange.PositionRange +} + +var _ types.ScalarOperator = &Time{} + +func NewTime( + timeRange types.QueryTimeRange, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + expressionPosition posrange.PositionRange, +) *Time { + return &Time{ + TimeRange: timeRange, + MemoryConsumptionTracker: memoryConsumptionTracker, + expressionPosition: expressionPosition, + } +} + +func (s *Time) GetValues(_ context.Context) (types.ScalarData, error) { + samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) + + if err != nil { + return types.ScalarData{}, err + } + + samples = samples[:s.TimeRange.StepCount] + + for step := 0; step < s.TimeRange.StepCount; step++ { + t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds + samples[step].T = t + samples[step].F = float64(t) / 1000 + } + + return types.ScalarData{Samples: samples}, nil +} + +func (s *Time) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *Time) Close() { + // Nothing to do. +} diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index fb0ebaef5a..8f0ca59e9e 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -682,3 +682,143 @@ eval range from 0 to 7m step 1m last_over_time(some_metric_count[3m]) some_metric_count{env="prod", cluster="eu"} _ _ _ 0 1 2 3 4 some_metric_count{env="prod", cluster="us"} _ _ _ 0 2 4 6 8 some_metric_count{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}} + +# Test time-related functions. +load 5m + histogram_sample {{schema:0 sum:1 count:1}} + +eval range from 0 to 7m step 1m year() + {} 1970 1970 1970 1970 1970 1970 1970 1970 + +eval range from 0 to 7m step 1m time() + {} 0 60 120 180 240 300 360 420 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m year(vector(1136239445)) + {} 2006 2006 2006 2006 2006 2006 2006 2006 + +eval range from 0 to 7m step 1m month() + {} 1 1 1 1 1 1 1 1 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m month(vector(1136239445)) + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m day_of_month() + {} 1 1 1 1 1 1 1 1 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m day_of_month(vector(1136239445)) + {} 2 2 2 2 2 2 2 2 + +eval range from 0 to 7m step 1m day_of_year() + {} 1 1 1 1 1 1 1 1 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m day_of_year(vector(1136239445)) + {} 2 2 2 2 2 2 2 2 + +# Thursday. +eval range from 0 to 7m step 1m day_of_week() + {} 4 4 4 4 4 4 4 4 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m day_of_week(vector(1136239445)) + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m hour() + {} 0 0 0 0 0 0 0 0 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m hour(vector(1136239445)) + {} 22 22 22 22 22 22 22 22 + +eval range from 0 to 7m step 1m minute() + {} 0 1 2 3 4 5 6 7 + +# 2006-01-02 22:04:05 +eval range from 0 to 7m step 1m minute(vector(1136239445)) + {} 4 4 4 4 4 4 4 4 + +# 2008-12-31 23:59:59 just before leap second. +eval range from 0 to 7m step 1m year(vector(1230767999)) + {} 2008 2008 2008 2008 2008 2008 2008 2008 + +# 2009-01-01 00:00:00 just after leap second. +eval range from 0 to 7m step 1m year(vector(1230768000)) + {} 2009 2009 2009 2009 2009 2009 2009 2009 + +# 2016-02-29 23:59:59 February 29th in leap year. +eval range from 0 to 7m step 1m month(vector(1456790399)) + day_of_month(vector(1456790399)) / 100 + {} 2.29 2.29 2.29 2.29 2.29 2.29 2.29 2.29 + +# 2016-03-01 00:00:00 March 1st in leap year. +eval range from 0 to 7m step 1m month(vector(1456790400)) + day_of_month(vector(1456790400)) / 100 + {} 3.01 3.01 3.01 3.01 3.01 3.01 3.01 3.01 + +# 2016-12-31 13:37:00 366th day in leap year. +eval range from 0 to 7m step 1m day_of_year(vector(1483191420)) + {} 366 366 366 366 366 366 366 366 + +# 2022-12-31 13:37:00 365th day in non-leap year. +eval range from 0 to 7m step 1m day_of_year(vector(1672493820)) + {} 365 365 365 365 365 365 365 365 + +# February 1st 2016 in leap year. +eval range from 0 to 7m step 1m days_in_month(vector(1454284800)) + {} 29 29 29 29 29 29 29 29 + +# February 1st 2017 not in leap year. +eval range from 0 to 7m step 1m days_in_month(vector(1485907200)) + {} 28 28 28 28 28 28 28 28 + +# Test for histograms. +eval range from 0 to 7m step 1m day_of_month(histogram_sample) + +eval range from 0 to 7m step 1m day_of_week(histogram_sample) + +eval range from 0 to 7m step 1m day_of_year(histogram_sample) + +eval range from 0 to 7m step 1m days_in_month(histogram_sample) + +eval range from 0 to 7m step 1m hour(histogram_sample) + +eval range from 0 to 7m step 1m minute(histogram_sample) + +eval range from 0 to 7m step 1m month(histogram_sample) + +eval range from 0 to 7m step 1m year(histogram_sample) + +clear + +# Test time with custom metric +# 1136239445: 2006-01-02 22:04:05 +# 1485907200: February 1st 2017 not in leap year. +# 1672493820: 2022-12-31 13:37:00 365th day in non-leap year. +load 1m + some_metric{case="series1"} 0+1136239445x7 + another_metric{case="series1"} 0+536254375x7 + some_metric{case="series2"} 0+1672493820x7 + some_metric{case="gap1"} _ 1136239445 _ 1485907200 _ 1672493820 + some_metric{case="gap2"} 1136239445 _ _ 1485907200 _ _ 1672493820 + some_metric{case="histogram-gap"} _ {{count:5}} _ {{count:5}} + some_metric{case="histogram-float"} 1136239445 {{count:5}} 1485907200 {{count:5}} + +eval range from 0 to 7m step 1m year(some_metric) + {case="gap1"} _ 2006 2006 2017 2017 2022 2022 2022 + {case="gap2"} 2006 2006 2006 2017 2017 2017 2022 2022 + {case="histogram-float"} 2006 2006 2017 + {case="series1"} 1970 2006 2042 2078 2114 2150 2186 2222 + {case="series2"} 1970 2022 2075 2128 2181 2234 2287 2340 + +eval range from 0 to 7m step 1m month(some_metric) + {case="gap1"} _ 1 1 2 2 12 12 12 + {case="gap2"} 1 1 1 2 2 2 12 12 + {case="histogram-float"} 1 1 2 + {case="series1"} 1 1 1 1 1 1 1 1 + {case="series2"} 1 12 12 12 12 12 12 12 + +eval range from 0 to 7m step 1m year(some_metric{case="series1"} + another_metric{case="series1"}) + {case="series1"} 1970 2022 2075 2128 2181 2234 2287 2340 + +clear diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index baca7082bc..36b97e1466 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -503,17 +503,14 @@ clear eval instant at 0m vector(1) {} 1 -# Unsupported by streaming engine. -# eval instant at 0s vector(time()) -# {} 0 +eval instant at 0s vector(time()) + {} 0 -# Unsupported by streaming engine. -# eval instant at 5s vector(time()) -# {} 5 +eval instant at 5s vector(time()) + {} 5 -# Unsupported by streaming engine. -# eval instant at 60m vector(time()) -# {} 3600 +eval instant at 60m vector(time()) + {} 3600 # Tests for clamp_max, clamp_min(), and clamp(). @@ -1112,135 +1109,103 @@ clear load 5m histogram_sample {{schema:0 sum:1 count:1}} -# Unsupported by streaming engine. -# eval instant at 0m year() -# {} 1970 +eval instant at 0m year() + {} 1970 -# Unsupported by streaming engine. -# eval instant at 1ms time() -# 0.001 +eval instant at 1ms time() + 0.001 -# Unsupported by streaming engine. -# eval instant at 50m time() -# 3000 +eval instant at 50m time() + 3000 -# Unsupported by streaming engine. -# eval instant at 0m year(vector(1136239445)) -# {} 2006 +eval instant at 0m year(vector(1136239445)) + {} 2006 -# Unsupported by streaming engine. -# eval instant at 0m month() -# {} 1 +eval instant at 0m month() + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m month(vector(1136239445)) -# {} 1 +eval instant at 0m month(vector(1136239445)) + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m day_of_month() -# {} 1 +eval instant at 0m day_of_month() + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m day_of_month(vector(1136239445)) -# {} 2 +eval instant at 0m day_of_month(vector(1136239445)) + {} 2 -# Unsupported by streaming engine. -# eval instant at 0m day_of_year() -# {} 1 +eval instant at 0m day_of_year() + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(vector(1136239445)) -# {} 2 +eval instant at 0m day_of_year(vector(1136239445)) + {} 2 # Thursday. -# Unsupported by streaming engine. -# eval instant at 0m day_of_week() -# {} 4 +eval instant at 0m day_of_week() + {} 4 -# Unsupported by streaming engine. -# eval instant at 0m day_of_week(vector(1136239445)) -# {} 1 +eval instant at 0m day_of_week(vector(1136239445)) + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m hour() -# {} 0 +eval instant at 0m hour() + {} 0 -# Unsupported by streaming engine. -# eval instant at 0m hour(vector(1136239445)) -# {} 22 +eval instant at 0m hour(vector(1136239445)) + {} 22 -# Unsupported by streaming engine. -# eval instant at 0m minute() -# {} 0 +eval instant at 0m minute() + {} 0 -# Unsupported by streaming engine. -# eval instant at 0m minute(vector(1136239445)) -# {} 4 +eval instant at 0m minute(vector(1136239445)) + {} 4 # 2008-12-31 23:59:59 just before leap second. -# Unsupported by streaming engine. -# eval instant at 0m year(vector(1230767999)) -# {} 2008 +eval instant at 0m year(vector(1230767999)) + {} 2008 # 2009-01-01 00:00:00 just after leap second. -# Unsupported by streaming engine. -# eval instant at 0m year(vector(1230768000)) -# {} 2009 +eval instant at 0m year(vector(1230768000)) + {} 2009 # 2016-02-29 23:59:59 February 29th in leap year. -# Unsupported by streaming engine. -# eval instant at 0m month(vector(1456790399)) + day_of_month(vector(1456790399)) / 100 -# {} 2.29 +eval instant at 0m month(vector(1456790399)) + day_of_month(vector(1456790399)) / 100 + {} 2.29 # 2016-03-01 00:00:00 March 1st in leap year. -# Unsupported by streaming engine. -# eval instant at 0m month(vector(1456790400)) + day_of_month(vector(1456790400)) / 100 -# {} 3.01 +eval instant at 0m month(vector(1456790400)) + day_of_month(vector(1456790400)) / 100 + {} 3.01 # 2016-12-31 13:37:00 366th day in leap year. -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(vector(1483191420)) -# {} 366 +eval instant at 0m day_of_year(vector(1483191420)) + {} 366 # 2022-12-31 13:37:00 365th day in non-leap year. -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(vector(1672493820)) -# {} 365 +eval instant at 0m day_of_year(vector(1672493820)) + {} 365 # February 1st 2016 in leap year. -# Unsupported by streaming engine. -# eval instant at 0m days_in_month(vector(1454284800)) -# {} 29 +eval instant at 0m days_in_month(vector(1454284800)) + {} 29 # February 1st 2017 not in leap year. -# Unsupported by streaming engine. -# eval instant at 0m days_in_month(vector(1485907200)) -# {} 28 +eval instant at 0m days_in_month(vector(1485907200)) + {} 28 # Test for histograms. -# Unsupported by streaming engine. -# eval instant at 0m day_of_month(histogram_sample) +eval instant at 0m day_of_month(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m day_of_week(histogram_sample) +eval instant at 0m day_of_week(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(histogram_sample) +eval instant at 0m day_of_year(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m days_in_month(histogram_sample) +eval instant at 0m days_in_month(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m hour(histogram_sample) +eval instant at 0m hour(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m minute(histogram_sample) +eval instant at 0m minute(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m month(histogram_sample) +eval instant at 0m month(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m year(histogram_sample) +eval instant at 0m year(histogram_sample) clear