From ecff3156d20aea4a5264c8251091dc15778bf389 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 6 Jan 2025 11:50:11 +0800 Subject: [PATCH 01/26] Implement time functions Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 1 + pkg/streamingpromql/operators/functions/common.go | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8a5d031fb9c..1c8c9620a89 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -387,6 +387,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime), "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), + "time": InstantVectorTransformationFunctionOperatorFactory("time", functions.Time), "vector": scalarToInstantVectorOperatorFactory, } diff --git a/pkg/streamingpromql/operators/functions/common.go b/pkg/streamingpromql/operators/functions/common.go index 730aff89f26..d106b816a46 100644 --- a/pkg/streamingpromql/operators/functions/common.go +++ b/pkg/streamingpromql/operators/functions/common.go @@ -4,11 +4,16 @@ package functions import ( "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/promql" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) +var Time = floatTransformationSampleFunc(func(sample promql.FPoint) float64 { + return float64(sample.T) / 1000 +}) + // SeriesMetadataFunction is a function to operate on the metadata across series. type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) @@ -27,6 +32,16 @@ var DropSeriesName = SeriesMetadataFunctionDefinition{ // InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) +// floatTransformationSampleFunc is not needed elsewhere, so it is not exported yet +func floatTransformationSampleFunc(transform func(sample promql.FPoint) float64) InstantVectorSeriesFunction { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + for i := range seriesData.Floats { + seriesData.Floats[i].F = transform(seriesData.Floats[i]) + } + return seriesData, nil + } +} + // floatTransformationFunc is not needed elsewhere, so it is not exported yet func floatTransformationFunc(transform func(f float64) float64) InstantVectorSeriesFunction { return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { From 7389d109303db0be3e55dca66d013c869db36f9f Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 7 Jan 2025 00:33:11 +0800 Subject: [PATCH 02/26] Add more times function Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 8 +++ .../operators/functions/common.go | 15 ----- .../operators/functions/times.go | 61 +++++++++++++++++++ 3 files changed, 69 insertions(+), 15 deletions(-) create mode 100644 pkg/streamingpromql/operators/functions/times.go diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 1c8c9620a89..99101870332 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -353,6 +353,10 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), + "days_in_month": InstantVectorTransformationFunctionOperatorFactory("days_in_month", functions.DaysInMonth), + "day_of_month": InstantVectorTransformationFunctionOperatorFactory("day_of_month", functions.DayOfMonth), + "day_of_week": InstantVectorTransformationFunctionOperatorFactory("day_of_week", functions.DayOfWeek), + "day_of_year": InstantVectorTransformationFunctionOperatorFactory("day_of_year", functions.DayOfYear), "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), "delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta), "deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv), @@ -365,6 +369,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)), "histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), + "hour": InstantVectorTransformationFunctionOperatorFactory("hour", functions.Hour), "idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta), "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), "irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate), @@ -375,6 +380,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), + "minute": InstantVectorTransformationFunctionOperatorFactory("minute", functions.Minute), + "month": InstantVectorTransformationFunctionOperatorFactory("month", functions.Month), "present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime), "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), @@ -389,6 +396,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), "time": InstantVectorTransformationFunctionOperatorFactory("time", functions.Time), "vector": scalarToInstantVectorOperatorFactory, + "year": InstantVectorTransformationFunctionOperatorFactory("year", functions.Year), } func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory InstantVectorFunctionOperatorFactory) error { diff --git a/pkg/streamingpromql/operators/functions/common.go b/pkg/streamingpromql/operators/functions/common.go index d106b816a46..730aff89f26 100644 --- a/pkg/streamingpromql/operators/functions/common.go +++ b/pkg/streamingpromql/operators/functions/common.go @@ -4,16 +4,11 @@ package functions import ( "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/promql" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) -var Time = floatTransformationSampleFunc(func(sample promql.FPoint) float64 { - return float64(sample.T) / 1000 -}) - // SeriesMetadataFunction is a function to operate on the metadata across series. type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) @@ -32,16 +27,6 @@ var DropSeriesName = SeriesMetadataFunctionDefinition{ // InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) -// floatTransformationSampleFunc is not needed elsewhere, so it is not exported yet -func floatTransformationSampleFunc(transform func(sample promql.FPoint) float64) InstantVectorSeriesFunction { - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - for i := range seriesData.Floats { - seriesData.Floats[i].F = transform(seriesData.Floats[i]) - } - return seriesData, nil - } -} - // floatTransformationFunc is not needed elsewhere, so it is not exported yet func floatTransformationFunc(transform func(f float64) float64) InstantVectorSeriesFunction { return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go new file mode 100644 index 00000000000..438ae9de41b --- /dev/null +++ b/pkg/streamingpromql/operators/functions/times.go @@ -0,0 +1,61 @@ +package functions + +import ( + "time" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +var Time = timeFunc() + +var DaysInMonth = dayWrapperFunc(func(t time.Time) float64 { + return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) +}) + +var DayOfMonth = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.Day()) +}) + +var DayOfWeek = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.Weekday()) +}) + +var DayOfYear = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.YearDay()) +}) + +var Hour = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.Hour()) +}) + +var Minute = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.Minute()) +}) + +var Month = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.Minute()) +}) + +var Year = dayWrapperFunc(func(t time.Time) float64 { + return float64(t.Year()) +}) + +func timeFunc() InstantVectorSeriesFunction { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, qtr types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + for i := range seriesData.Floats { + seriesData.Floats[i].F = float64(qtr.StartT) / 1000 + } + return seriesData, nil + } +} + +func dayWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + for i := range seriesData.Floats { + t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() + seriesData.Floats[i].F = f(t) + } + return seriesData, nil + } +} From a71e5552d3ccaebe9bf70a14c52ac8e8feda8767 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Fri, 17 Jan 2025 03:09:46 +0800 Subject: [PATCH 03/26] Try restructuring the functions Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 62 +++++-- .../operators/functions/times.go | 151 ++++++++++++++---- .../testdata/upstream/functions.test | 10 +- 3 files changed, 171 insertions(+), 52 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 99101870332..6b61325f07b 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -5,6 +5,9 @@ package streamingpromql import ( "fmt" "math" + "time" + + //"time" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -32,6 +35,15 @@ type ScalarFunctionOperatorFactory func( timeRange types.QueryTimeRange, ) (types.ScalarOperator, error) +type TimeFunctionOperatorFactory func( + wrapper func(time time.Time) float64, + args []types.Operator, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + annotations *annotations.Annotations, + expressionPosition posrange.PositionRange, + timeRange types.QueryTimeRange, +) (types.ScalarOperator, error) + // SingleInputVectorFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions // that have exactly 1 argument (v instant-vector). // @@ -353,10 +365,6 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), - "days_in_month": InstantVectorTransformationFunctionOperatorFactory("days_in_month", functions.DaysInMonth), - "day_of_month": InstantVectorTransformationFunctionOperatorFactory("day_of_month", functions.DayOfMonth), - "day_of_week": InstantVectorTransformationFunctionOperatorFactory("day_of_week", functions.DayOfWeek), - "day_of_year": InstantVectorTransformationFunctionOperatorFactory("day_of_year", functions.DayOfYear), "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), "delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta), "deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv), @@ -369,7 +377,6 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)), "histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), - "hour": InstantVectorTransformationFunctionOperatorFactory("hour", functions.Hour), "idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta), "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), "irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate), @@ -380,8 +387,6 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), - "minute": InstantVectorTransformationFunctionOperatorFactory("minute", functions.Minute), - "month": InstantVectorTransformationFunctionOperatorFactory("month", functions.Month), "present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime), "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), @@ -394,9 +399,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime), "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), - "time": InstantVectorTransformationFunctionOperatorFactory("time", functions.Time), "vector": scalarToInstantVectorOperatorFactory, - "year": InstantVectorTransformationFunctionOperatorFactory("year", functions.Year), } func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory InstantVectorFunctionOperatorFactory) error { @@ -411,8 +414,17 @@ func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory I // These functions return a scalar. var scalarFunctionOperatorFactories = map[string]ScalarFunctionOperatorFactory{ // Please keep this list sorted alphabetically. - "pi": piOperatorFactory, - "scalar": instantVectorToScalarOperatorFactory, + "days_in_month": newTimeAdapter(functions.DaysInMonth).wrapperTimeOperatorFactory, + "day_of_month": newTimeAdapter(functions.DayOfMonth).wrapperTimeOperatorFactory, + "day_of_week": newTimeAdapter(functions.DayOfWeek).wrapperTimeOperatorFactory, + "day_of_year": newTimeAdapter(functions.DayOfYear).wrapperTimeOperatorFactory, + "hour": newTimeAdapter(functions.Hour).wrapperTimeOperatorFactory, + "minute": newTimeAdapter(functions.Minute).wrapperTimeOperatorFactory, + "month": newTimeAdapter(functions.Month).wrapperTimeOperatorFactory, + "pi": piOperatorFactory, + "scalar": instantVectorToScalarOperatorFactory, + "time": timeOperatorFactory, + "year": newTimeAdapter(functions.Year).wrapperTimeOperatorFactory, } func RegisterScalarFunctionOperatorFactory(functionName string, factory ScalarFunctionOperatorFactory) error { @@ -433,6 +445,34 @@ func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting return scalars.NewScalarConstant(math.Pi, timeRange, memoryConsumptionTracker, expressionPosition), nil } +func timeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { + if len(args) != 0 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 0 arguments for time, got %v", len(args)) + } + + return functions.NewScalarTime(timeRange, memoryConsumptionTracker, expressionPosition), nil +} + +type adapter struct { + wrapperFunc func(time time.Time) float64 +} + +func newTimeAdapter(wrapperFunc func(time time.Time) float64) adapter { + return adapter{ + wrapperFunc: wrapperFunc, + } +} + +func (a adapter) wrapperTimeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { + if len(args) != 0 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 0 arguments for time, got %v", len(args)) + } + + return functions.NewWrapperTime(a.wrapperFunc, timeRange, memoryConsumptionTracker, expressionPosition), nil +} + func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index 438ae9de41b..f8aa753e850 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -1,61 +1,142 @@ package functions import ( + "context" "time" + "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) -var Time = timeFunc() +type ScalarTime struct { + Value float64 + ts time.Time + TimeRange types.QueryTimeRange + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + + expressionPosition posrange.PositionRange +} + +var _ types.ScalarOperator = &ScalarTime{} + +func NewScalarTime( + timeRange types.QueryTimeRange, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + expressionPosition posrange.PositionRange, +) *ScalarTime { + return &ScalarTime{ + TimeRange: timeRange, + MemoryConsumptionTracker: memoryConsumptionTracker, + expressionPosition: expressionPosition, + } +} + +func (s *ScalarTime) GetValues(_ context.Context) (types.ScalarData, error) { + samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) + + if err != nil { + return types.ScalarData{}, err + } + + samples = samples[:s.TimeRange.StepCount] + + for step := 0; step < s.TimeRange.StepCount; step++ { + t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds + samples[step].T = t + samples[step].F = float64(t) + } + + return types.ScalarData{Samples: samples}, nil +} + +func (s *ScalarTime) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *ScalarTime) Close() { + // Nothing to do. +} + +type WrapperTime struct { + WrapperFunc func(time time.Time) float64 + Value float64 + ts time.Time + TimeRange types.QueryTimeRange + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + + expressionPosition posrange.PositionRange +} + +func NewWrapperTime( + wrapperFunc func(time time.Time) float64, + timeRange types.QueryTimeRange, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + expressionPosition posrange.PositionRange, +) *WrapperTime { + return &WrapperTime{ + WrapperFunc: wrapperFunc, + TimeRange: timeRange, + MemoryConsumptionTracker: memoryConsumptionTracker, + expressionPosition: expressionPosition, + } +} + +func (s *WrapperTime) GetValues(_ context.Context) (types.ScalarData, error) { + samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) + + if err != nil { + return types.ScalarData{}, err + } + + samples = samples[:s.TimeRange.StepCount] + + for step := 0; step < s.TimeRange.StepCount; step++ { + t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds + samples[step].T = t + samples[step].F = s.WrapperFunc(time.Unix(int64(t), 0).UTC()) + } + + return types.ScalarData{Samples: samples}, nil +} + +func (s *WrapperTime) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *WrapperTime) Close() { + // Nothing to do. +} -var DaysInMonth = dayWrapperFunc(func(t time.Time) float64 { +var DaysInMonth = func(t time.Time) float64 { return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) -}) +} -var DayOfMonth = dayWrapperFunc(func(t time.Time) float64 { +var DayOfMonth = func(t time.Time) float64 { return float64(t.Day()) -}) +} -var DayOfWeek = dayWrapperFunc(func(t time.Time) float64 { +var DayOfWeek = func(t time.Time) float64 { return float64(t.Weekday()) -}) +} -var DayOfYear = dayWrapperFunc(func(t time.Time) float64 { +var DayOfYear = func(t time.Time) float64 { return float64(t.YearDay()) -}) +} -var Hour = dayWrapperFunc(func(t time.Time) float64 { +var Hour = func(t time.Time) float64 { return float64(t.Hour()) -}) +} -var Minute = dayWrapperFunc(func(t time.Time) float64 { +var Minute = func(t time.Time) float64 { return float64(t.Minute()) -}) +} -var Month = dayWrapperFunc(func(t time.Time) float64 { +var Month = func(t time.Time) float64 { return float64(t.Minute()) -}) - -var Year = dayWrapperFunc(func(t time.Time) float64 { - return float64(t.Year()) -}) - -func timeFunc() InstantVectorSeriesFunction { - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, qtr types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - for i := range seriesData.Floats { - seriesData.Floats[i].F = float64(qtr.StartT) / 1000 - } - return seriesData, nil - } } -func dayWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - for i := range seriesData.Floats { - t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() - seriesData.Floats[i].F = f(t) - } - return seriesData, nil - } +var Year = func(t time.Time) float64 { + return float64(t.Year()) } diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index baca7082bc3..cbdfb2212e4 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -503,9 +503,8 @@ clear eval instant at 0m vector(1) {} 1 -# Unsupported by streaming engine. -# eval instant at 0s vector(time()) -# {} 0 +eval instant at 0s vector(time()) + {} 0 # Unsupported by streaming engine. # eval instant at 5s vector(time()) @@ -1112,9 +1111,8 @@ clear load 5m histogram_sample {{schema:0 sum:1 count:1}} -# Unsupported by streaming engine. -# eval instant at 0m year() -# {} 1970 +eval instant at 0m year() + {} 1970 # Unsupported by streaming engine. # eval instant at 1ms time() From b9b6d83c56b59a6f5a77cab469ab9e33a2a5d1c8 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 21 Jan 2025 07:35:15 +0800 Subject: [PATCH 04/26] Implement time related functions Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 91 ++++++----- .../operators/functions/times.go | 111 ++++++-------- .../testdata/upstream/functions.test | 141 +++++++----------- 3 files changed, 148 insertions(+), 195 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 6b61325f07b..d56887ea53e 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -5,8 +5,6 @@ package streamingpromql import ( "fmt" "math" - "time" - //"time" "github.com/prometheus/prometheus/promql/parser/posrange" @@ -35,15 +33,6 @@ type ScalarFunctionOperatorFactory func( timeRange types.QueryTimeRange, ) (types.ScalarOperator, error) -type TimeFunctionOperatorFactory func( - wrapper func(time time.Time) float64, - args []types.Operator, - memoryConsumptionTracker *limiting.MemoryConsumptionTracker, - annotations *annotations.Annotations, - expressionPosition posrange.PositionRange, - timeRange types.QueryTimeRange, -) (types.ScalarOperator, error) - // SingleInputVectorFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions // that have exactly 1 argument (v instant-vector). // @@ -73,6 +62,36 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO } } +func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + if len(args) > 1 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected 1 or 0 argument for %s, got %v", name, len(args)) + } + + var inner types.InstantVectorOperator + var ok bool + if len(args) > 0 { + inner, ok = args[0].(types.InstantVectorOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0]) + } + + } else { + inner = scalars.NewScalarToInstantVector(functions.NewScalarTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) + } + + var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) + + if f.SeriesMetadataFunction.NeedsSeriesDeduplication { + o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) + } + + return o, nil + } +} + // InstantVectorTransformationFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions // that have exactly 1 argument (v instant-vector), and drop the series __name__ label. // @@ -88,6 +107,15 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF return SingleInputVectorFunctionOperatorFactory(name, f) } +func TimeTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { + f := functions.FunctionOverInstantVectorDefinition{ + SeriesDataFunc: seriesDataFunc, + SeriesMetadataFunction: functions.DropSeriesName, + } + + return TimeFunctionOperatorFactory(name, f) +} + // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions // that have exactly 1 argument (v instant-vector), and need to manipulate the labels of // each series without manipulating the returned samples. @@ -365,6 +393,10 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), + "days_in_month": TimeTransformationFunctionOperatorFactory("days_in_month", functions.DaysInMonth), + "day_of_month": TimeTransformationFunctionOperatorFactory("day_of_month", functions.DayOfMonth), + "day_of_week": TimeTransformationFunctionOperatorFactory("day_of_week", functions.DayOfWeek), + "day_of_year": TimeTransformationFunctionOperatorFactory("day_of_year", functions.DayOfYear), "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), "delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta), "deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv), @@ -377,6 +409,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)), "histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), + "hour": TimeTransformationFunctionOperatorFactory("hour", functions.Hour), "idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta), "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), "irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate), @@ -387,6 +420,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), + "minute": TimeTransformationFunctionOperatorFactory("minute", functions.Minute), + "month": TimeTransformationFunctionOperatorFactory("month", functions.Month), "present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime), "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), @@ -400,6 +435,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), "vector": scalarToInstantVectorOperatorFactory, + "year": TimeTransformationFunctionOperatorFactory("year", functions.Year), } func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory InstantVectorFunctionOperatorFactory) error { @@ -414,17 +450,9 @@ func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory I // These functions return a scalar. var scalarFunctionOperatorFactories = map[string]ScalarFunctionOperatorFactory{ // Please keep this list sorted alphabetically. - "days_in_month": newTimeAdapter(functions.DaysInMonth).wrapperTimeOperatorFactory, - "day_of_month": newTimeAdapter(functions.DayOfMonth).wrapperTimeOperatorFactory, - "day_of_week": newTimeAdapter(functions.DayOfWeek).wrapperTimeOperatorFactory, - "day_of_year": newTimeAdapter(functions.DayOfYear).wrapperTimeOperatorFactory, - "hour": newTimeAdapter(functions.Hour).wrapperTimeOperatorFactory, - "minute": newTimeAdapter(functions.Minute).wrapperTimeOperatorFactory, - "month": newTimeAdapter(functions.Month).wrapperTimeOperatorFactory, - "pi": piOperatorFactory, - "scalar": instantVectorToScalarOperatorFactory, - "time": timeOperatorFactory, - "year": newTimeAdapter(functions.Year).wrapperTimeOperatorFactory, + "pi": piOperatorFactory, + "scalar": instantVectorToScalarOperatorFactory, + "time": timeOperatorFactory, } func RegisterScalarFunctionOperatorFactory(functionName string, factory ScalarFunctionOperatorFactory) error { @@ -454,25 +482,6 @@ func timeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiti return functions.NewScalarTime(timeRange, memoryConsumptionTracker, expressionPosition), nil } -type adapter struct { - wrapperFunc func(time time.Time) float64 -} - -func newTimeAdapter(wrapperFunc func(time time.Time) float64) adapter { - return adapter{ - wrapperFunc: wrapperFunc, - } -} - -func (a adapter) wrapperTimeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { - if len(args) != 0 { - // Should be caught by the PromQL parser, but we check here for safety. - return nil, fmt.Errorf("expected exactly 0 arguments for time, got %v", len(args)) - } - - return functions.NewWrapperTime(a.wrapperFunc, timeRange, memoryConsumptionTracker, expressionPosition), nil -} - func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index f8aa753e850..cb760dd120f 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/grafana/mimir/pkg/streamingpromql/limiting" @@ -11,7 +12,6 @@ import ( ) type ScalarTime struct { - Value float64 ts time.Time TimeRange types.QueryTimeRange MemoryConsumptionTracker *limiting.MemoryConsumptionTracker @@ -45,7 +45,7 @@ func (s *ScalarTime) GetValues(_ context.Context) (types.ScalarData, error) { for step := 0; step < s.TimeRange.StepCount; step++ { t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds samples[step].T = t - samples[step].F = float64(t) + samples[step].F = float64(t) / 1000 } return types.ScalarData{Samples: samples}, nil @@ -59,84 +59,59 @@ func (s *ScalarTime) Close() { // Nothing to do. } -type WrapperTime struct { - WrapperFunc func(time time.Time) float64 - Value float64 - ts time.Time - TimeRange types.QueryTimeRange - MemoryConsumptionTracker *limiting.MemoryConsumptionTracker - - expressionPosition posrange.PositionRange -} - -func NewWrapperTime( - wrapperFunc func(time time.Time) float64, - timeRange types.QueryTimeRange, - memoryConsumptionTracker *limiting.MemoryConsumptionTracker, - expressionPosition posrange.PositionRange, -) *WrapperTime { - return &WrapperTime{ - WrapperFunc: wrapperFunc, - TimeRange: timeRange, - MemoryConsumptionTracker: memoryConsumptionTracker, - expressionPosition: expressionPosition, - } -} - -func (s *WrapperTime) GetValues(_ context.Context) (types.ScalarData, error) { - samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) - - if err != nil { - return types.ScalarData{}, err - } - - samples = samples[:s.TimeRange.StepCount] - - for step := 0; step < s.TimeRange.StepCount; step++ { - t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds - samples[step].T = t - samples[step].F = s.WrapperFunc(time.Unix(int64(t), 0).UTC()) - } - - return types.ScalarData{Samples: samples}, nil -} - -func (s *WrapperTime) ExpressionPosition() posrange.PositionRange { - return s.expressionPosition -} - -func (s *WrapperTime) Close() { - // Nothing to do. -} - -var DaysInMonth = func(t time.Time) float64 { +var DaysInMonth = timeWrapperFunc(func(t time.Time) float64 { return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) -} +}) -var DayOfMonth = func(t time.Time) float64 { +var DayOfMonth = timeWrapperFunc(func(t time.Time) float64 { return float64(t.Day()) -} +}) -var DayOfWeek = func(t time.Time) float64 { +var DayOfWeek = timeWrapperFunc(func(t time.Time) float64 { return float64(t.Weekday()) -} +}) -var DayOfYear = func(t time.Time) float64 { +var DayOfYear = timeWrapperFunc(func(t time.Time) float64 { return float64(t.YearDay()) -} +}) -var Hour = func(t time.Time) float64 { +var Hour = timeWrapperFunc(func(t time.Time) float64 { return float64(t.Hour()) -} +}) -var Minute = func(t time.Time) float64 { +var Minute = timeWrapperFunc(func(t time.Time) float64 { return float64(t.Minute()) -} +}) -var Month = func(t time.Time) float64 { - return float64(t.Minute()) -} +var Month = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Month()) +}) -var Year = func(t time.Time) float64 { +var Year = timeWrapperFunc(func(t time.Time) float64 { return float64(t.Year()) +}) + +func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, tr types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + if len(seriesData.Floats)+len(seriesData.Histograms) == 0 { + fp := promql.FPoint{ + F: f(time.Unix(tr.StartT, 0).UTC()), + } + seriesData.Floats = append(seriesData.Floats, fp) + return seriesData, nil + } + + if len(seriesData.Floats) > 0 { + for i := range seriesData.Floats { + t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() + seriesData.Floats[i].F = f(t) + } + return seriesData, nil + } + + // we don't do time based function on histograms + types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) + seriesData.Histograms = nil + return seriesData, nil + } } diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index cbdfb2212e4..c1ef33482ab 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1114,131 +1114,100 @@ load 5m eval instant at 0m year() {} 1970 -# Unsupported by streaming engine. -# eval instant at 1ms time() -# 0.001 - -# Unsupported by streaming engine. -# eval instant at 50m time() -# 3000 +eval instant at 1ms time() + 0.001 -# Unsupported by streaming engine. -# eval instant at 0m year(vector(1136239445)) -# {} 2006 +eval instant at 50m time() + 3000 -# Unsupported by streaming engine. -# eval instant at 0m month() -# {} 1 +eval instant at 0m year(vector(1136239445)) + {} 2006 -# Unsupported by streaming engine. -# eval instant at 0m month(vector(1136239445)) -# {} 1 +eval instant at 0m month() + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m day_of_month() -# {} 1 +eval instant at 0m month(vector(1136239445)) + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m day_of_month(vector(1136239445)) -# {} 2 +#eval instant at 0m day_of_month() +# {} 1 +# +#eval instant at 0m day_of_month(vector(1136239445)) +# {} 2 -# Unsupported by streaming engine. -# eval instant at 0m day_of_year() -# {} 1 +eval instant at 0m day_of_year() + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(vector(1136239445)) -# {} 2 +eval instant at 0m day_of_year(vector(1136239445)) + {} 2 # Thursday. -# Unsupported by streaming engine. -# eval instant at 0m day_of_week() -# {} 4 +eval instant at 0m day_of_week() + {} 4 -# Unsupported by streaming engine. -# eval instant at 0m day_of_week(vector(1136239445)) -# {} 1 +eval instant at 0m day_of_week(vector(1136239445)) + {} 1 -# Unsupported by streaming engine. -# eval instant at 0m hour() -# {} 0 +eval instant at 0m hour() + {} 0 -# Unsupported by streaming engine. -# eval instant at 0m hour(vector(1136239445)) -# {} 22 +eval instant at 0m hour(vector(1136239445)) + {} 22 -# Unsupported by streaming engine. -# eval instant at 0m minute() -# {} 0 +eval instant at 0m minute() + {} 0 -# Unsupported by streaming engine. -# eval instant at 0m minute(vector(1136239445)) -# {} 4 +eval instant at 0m minute(vector(1136239445)) + {} 4 # 2008-12-31 23:59:59 just before leap second. -# Unsupported by streaming engine. -# eval instant at 0m year(vector(1230767999)) -# {} 2008 +eval instant at 0m year(vector(1230767999)) + {} 2008 # 2009-01-01 00:00:00 just after leap second. -# Unsupported by streaming engine. -# eval instant at 0m year(vector(1230768000)) -# {} 2009 +eval instant at 0m year(vector(1230768000)) + {} 2009 # 2016-02-29 23:59:59 February 29th in leap year. -# Unsupported by streaming engine. -# eval instant at 0m month(vector(1456790399)) + day_of_month(vector(1456790399)) / 100 -# {} 2.29 +eval instant at 0m month(vector(1456790399)) + day_of_month(vector(1456790399)) / 100 + {} 2.29 # 2016-03-01 00:00:00 March 1st in leap year. -# Unsupported by streaming engine. -# eval instant at 0m month(vector(1456790400)) + day_of_month(vector(1456790400)) / 100 -# {} 3.01 +eval instant at 0m month(vector(1456790400)) + day_of_month(vector(1456790400)) / 100 + {} 3.01 # 2016-12-31 13:37:00 366th day in leap year. -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(vector(1483191420)) -# {} 366 +eval instant at 0m day_of_year(vector(1483191420)) + {} 366 # 2022-12-31 13:37:00 365th day in non-leap year. -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(vector(1672493820)) -# {} 365 +eval instant at 0m day_of_year(vector(1672493820)) + {} 365 # February 1st 2016 in leap year. -# Unsupported by streaming engine. -# eval instant at 0m days_in_month(vector(1454284800)) -# {} 29 +eval instant at 0m days_in_month(vector(1454284800)) + {} 29 # February 1st 2017 not in leap year. -# Unsupported by streaming engine. -# eval instant at 0m days_in_month(vector(1485907200)) -# {} 28 +eval instant at 0m days_in_month(vector(1485907200)) + {} 28 # Test for histograms. -# Unsupported by streaming engine. -# eval instant at 0m day_of_month(histogram_sample) +eval instant at 0m day_of_month(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m day_of_week(histogram_sample) +eval instant at 0m day_of_week(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m day_of_year(histogram_sample) +eval instant at 0m day_of_year(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m days_in_month(histogram_sample) +eval instant at 0m days_in_month(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m hour(histogram_sample) +eval instant at 0m hour(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m minute(histogram_sample) +eval instant at 0m minute(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m month(histogram_sample) +eval instant at 0m month(histogram_sample) -# Unsupported by streaming engine. -# eval instant at 0m year(histogram_sample) +eval instant at 0m year(histogram_sample) clear From 59b203994987767afff48ee808e7744b9287d65a Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 21 Jan 2025 08:12:27 +0800 Subject: [PATCH 05/26] Remove unused field Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/functions/times.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index cb760dd120f..c8d86c66ef0 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -12,11 +12,9 @@ import ( ) type ScalarTime struct { - ts time.Time TimeRange types.QueryTimeRange MemoryConsumptionTracker *limiting.MemoryConsumptionTracker - - expressionPosition posrange.PositionRange + expressionPosition posrange.PositionRange } var _ types.ScalarOperator = &ScalarTime{} From 001b9296df97652741b42d950a6f01a74d18ea62 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 21 Jan 2025 08:14:01 +0800 Subject: [PATCH 06/26] Remove commented import Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index d56887ea53e..58f7b19d0c9 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -5,7 +5,6 @@ package streamingpromql import ( "fmt" "math" - //"time" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" From ac31177e133f06a7ffc4a4aa22db8c015031f90a Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 21 Jan 2025 08:16:21 +0800 Subject: [PATCH 07/26] Add comment for readability Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 58f7b19d0c9..8f0e35eb7e0 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -71,6 +71,7 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec var inner types.InstantVectorOperator var ok bool if len(args) > 0 { + // time based function expect instant vector argument inner, ok = args[0].(types.InstantVectorOperator) if !ok { // Should be caught by the PromQL parser, but we check here for safety. @@ -78,11 +79,11 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec } } else { + // if the argument is not provided, it will default to vector(time()) inner = scalars.NewScalarToInstantVector(functions.NewScalarTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) } var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) - if f.SeriesMetadataFunction.NeedsSeriesDeduplication { o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) } From 91989e2510af533e712a962bdd740dd0a55fd059 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 21 Jan 2025 10:23:19 +0800 Subject: [PATCH 08/26] Missed to uncomment test Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/testdata/upstream/functions.test | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index c1ef33482ab..208636fcbe2 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1129,11 +1129,11 @@ eval instant at 0m month() eval instant at 0m month(vector(1136239445)) {} 1 -#eval instant at 0m day_of_month() -# {} 1 -# -#eval instant at 0m day_of_month(vector(1136239445)) -# {} 2 +eval instant at 0m day_of_month() + {} 1 + +eval instant at 0m day_of_month(vector(1136239445)) + {} 2 eval instant at 0m day_of_year() {} 1 From 9de48b5fbc62510ad40c0c885934d77442392293 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 21 Jan 2025 10:24:40 +0800 Subject: [PATCH 09/26] Add missing license header Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/functions/times.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index c8d86c66ef0..d856126ab3c 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -1,3 +1,9 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + package functions import ( From 5ebc1ff6868829e501c3699aca5fc54fab6d2290 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Wed, 22 Jan 2025 21:19:23 +0800 Subject: [PATCH 10/26] Refactor the time function name Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 4 ++-- pkg/streamingpromql/operators/functions/times.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8f0e35eb7e0..7764ad848b5 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -80,7 +80,7 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec } else { // if the argument is not provided, it will default to vector(time()) - inner = scalars.NewScalarToInstantVector(functions.NewScalarTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) + inner = scalars.NewScalarToInstantVector(functions.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) } var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) @@ -479,7 +479,7 @@ func timeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiti return nil, fmt.Errorf("expected exactly 0 arguments for time, got %v", len(args)) } - return functions.NewScalarTime(timeRange, memoryConsumptionTracker, expressionPosition), nil + return functions.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), nil } func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index d856126ab3c..b89ee27d638 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -17,27 +17,27 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/types" ) -type ScalarTime struct { +type Time struct { TimeRange types.QueryTimeRange MemoryConsumptionTracker *limiting.MemoryConsumptionTracker expressionPosition posrange.PositionRange } -var _ types.ScalarOperator = &ScalarTime{} +var _ types.ScalarOperator = &Time{} -func NewScalarTime( +func NewTime( timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange, -) *ScalarTime { - return &ScalarTime{ +) *Time { + return &Time{ TimeRange: timeRange, MemoryConsumptionTracker: memoryConsumptionTracker, expressionPosition: expressionPosition, } } -func (s *ScalarTime) GetValues(_ context.Context) (types.ScalarData, error) { +func (s *Time) GetValues(_ context.Context) (types.ScalarData, error) { samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) if err != nil { @@ -55,11 +55,11 @@ func (s *ScalarTime) GetValues(_ context.Context) (types.ScalarData, error) { return types.ScalarData{Samples: samples}, nil } -func (s *ScalarTime) ExpressionPosition() posrange.PositionRange { +func (s *Time) ExpressionPosition() posrange.PositionRange { return s.expressionPosition } -func (s *ScalarTime) Close() { +func (s *Time) Close() { // Nothing to do. } From eeafd629c1c3a2deea3d74c4a229dbe277fa2232 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Wed, 22 Jan 2025 23:24:38 +0800 Subject: [PATCH 11/26] Missed function to uncomment Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/testdata/upstream/functions.test | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 208636fcbe2..36b97e14660 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -506,13 +506,11 @@ eval instant at 0m vector(1) eval instant at 0s vector(time()) {} 0 -# Unsupported by streaming engine. -# eval instant at 5s vector(time()) -# {} 5 +eval instant at 5s vector(time()) + {} 5 -# Unsupported by streaming engine. -# eval instant at 60m vector(time()) -# {} 3600 +eval instant at 60m vector(time()) + {} 3600 # Tests for clamp_max, clamp_min(), and clamp(). From 0e1f3a72c0b9beab59af6eeed519ffd4458069f1 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Thu, 23 Jan 2025 01:33:26 +0800 Subject: [PATCH 12/26] Split the file between times function and time operator Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 4 +- .../operators/functions/times.go | 121 ------------------ pkg/streamingpromql/operators/time.go | 58 +++++++++ 3 files changed, 60 insertions(+), 123 deletions(-) delete mode 100644 pkg/streamingpromql/operators/functions/times.go create mode 100644 pkg/streamingpromql/operators/time.go diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 7764ad848b5..761334f7b84 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -80,7 +80,7 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec } else { // if the argument is not provided, it will default to vector(time()) - inner = scalars.NewScalarToInstantVector(functions.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) + inner = scalars.NewScalarToInstantVector(operators.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), expressionPosition) } var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange) @@ -479,7 +479,7 @@ func timeOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiti return nil, fmt.Errorf("expected exactly 0 arguments for time, got %v", len(args)) } - return functions.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), nil + return operators.NewTime(timeRange, memoryConsumptionTracker, expressionPosition), nil } func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go deleted file mode 100644 index b89ee27d638..00000000000 --- a/pkg/streamingpromql/operators/functions/times.go +++ /dev/null @@ -1,121 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only -// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go -// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Prometheus Authors - -package functions - -import ( - "context" - "time" - - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/promql/parser/posrange" - - "github.com/grafana/mimir/pkg/streamingpromql/limiting" - "github.com/grafana/mimir/pkg/streamingpromql/types" -) - -type Time struct { - TimeRange types.QueryTimeRange - MemoryConsumptionTracker *limiting.MemoryConsumptionTracker - expressionPosition posrange.PositionRange -} - -var _ types.ScalarOperator = &Time{} - -func NewTime( - timeRange types.QueryTimeRange, - memoryConsumptionTracker *limiting.MemoryConsumptionTracker, - expressionPosition posrange.PositionRange, -) *Time { - return &Time{ - TimeRange: timeRange, - MemoryConsumptionTracker: memoryConsumptionTracker, - expressionPosition: expressionPosition, - } -} - -func (s *Time) GetValues(_ context.Context) (types.ScalarData, error) { - samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) - - if err != nil { - return types.ScalarData{}, err - } - - samples = samples[:s.TimeRange.StepCount] - - for step := 0; step < s.TimeRange.StepCount; step++ { - t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds - samples[step].T = t - samples[step].F = float64(t) / 1000 - } - - return types.ScalarData{Samples: samples}, nil -} - -func (s *Time) ExpressionPosition() posrange.PositionRange { - return s.expressionPosition -} - -func (s *Time) Close() { - // Nothing to do. -} - -var DaysInMonth = timeWrapperFunc(func(t time.Time) float64 { - return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) -}) - -var DayOfMonth = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.Day()) -}) - -var DayOfWeek = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.Weekday()) -}) - -var DayOfYear = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.YearDay()) -}) - -var Hour = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.Hour()) -}) - -var Minute = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.Minute()) -}) - -var Month = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.Month()) -}) - -var Year = timeWrapperFunc(func(t time.Time) float64 { - return float64(t.Year()) -}) - -func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, tr types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - if len(seriesData.Floats)+len(seriesData.Histograms) == 0 { - fp := promql.FPoint{ - F: f(time.Unix(tr.StartT, 0).UTC()), - } - seriesData.Floats = append(seriesData.Floats, fp) - return seriesData, nil - } - - if len(seriesData.Floats) > 0 { - for i := range seriesData.Floats { - t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() - seriesData.Floats[i].F = f(t) - } - return seriesData, nil - } - - // we don't do time based function on histograms - types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) - seriesData.Histograms = nil - return seriesData, nil - } -} diff --git a/pkg/streamingpromql/operators/time.go b/pkg/streamingpromql/operators/time.go new file mode 100644 index 00000000000..d322e53f7e9 --- /dev/null +++ b/pkg/streamingpromql/operators/time.go @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "context" + + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +type Time struct { + TimeRange types.QueryTimeRange + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + expressionPosition posrange.PositionRange +} + +var _ types.ScalarOperator = &Time{} + +func NewTime( + timeRange types.QueryTimeRange, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + expressionPosition posrange.PositionRange, +) *Time { + return &Time{ + TimeRange: timeRange, + MemoryConsumptionTracker: memoryConsumptionTracker, + expressionPosition: expressionPosition, + } +} + +func (s *Time) GetValues(_ context.Context) (types.ScalarData, error) { + samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) + + if err != nil { + return types.ScalarData{}, err + } + + samples = samples[:s.TimeRange.StepCount] + + for step := 0; step < s.TimeRange.StepCount; step++ { + t := s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds + samples[step].T = t + samples[step].F = float64(t) / 1000 + } + + return types.ScalarData{Samples: samples}, nil +} + +func (s *Time) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *Time) Close() { + // Nothing to do. +} From 9b4361e816c052c49124796593c7203d36c4ac0f Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Thu, 23 Jan 2025 01:35:22 +0800 Subject: [PATCH 13/26] Push new times function Signed-off-by: Jon Kartago Lamida --- .../operators/functions/times.go | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 pkg/streamingpromql/operators/functions/times.go diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go new file mode 100644 index 00000000000..3d03de57690 --- /dev/null +++ b/pkg/streamingpromql/operators/functions/times.go @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package functions + +import ( + "time" + + "github.com/prometheus/prometheus/promql" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +var DaysInMonth = timeWrapperFunc(func(t time.Time) float64 { + return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) +}) + +var DayOfMonth = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Day()) +}) + +var DayOfWeek = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Weekday()) +}) + +var DayOfYear = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.YearDay()) +}) + +var Hour = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Hour()) +}) + +var Minute = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Minute()) +}) + +var Month = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Month()) +}) + +var Year = timeWrapperFunc(func(t time.Time) float64 { + return float64(t.Year()) +}) + +func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, tr types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + if len(seriesData.Floats)+len(seriesData.Histograms) == 0 { + fp := promql.FPoint{ + F: f(time.Unix(tr.StartT, 0).UTC()), + } + seriesData.Floats = append(seriesData.Floats, fp) + return seriesData, nil + } + + if len(seriesData.Floats) > 0 { + for i := range seriesData.Floats { + t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() + seriesData.Floats[i].F = f(t) + } + return seriesData, nil + } + + // we don't do time based function on histograms + types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) + seriesData.Histograms = nil + return seriesData, nil + } +} From 55e19987b33d6cf1568905534041e14ccc278781 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Thu, 23 Jan 2025 16:15:02 +0800 Subject: [PATCH 14/26] Update pkg/streamingpromql/functions.go Co-authored-by: Charles Korn --- pkg/streamingpromql/functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 761334f7b84..0d0e156faba 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -65,7 +65,7 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) > 1 { // Should be caught by the PromQL parser, but we check here for safety. - return nil, fmt.Errorf("expected 1 or 0 argument for %s, got %v", name, len(args)) + return nil, fmt.Errorf("expected 0 or 1 argument for %s, got %v", name, len(args)) } var inner types.InstantVectorOperator From b0d0a7d7df1f87f3da45ac44b5c0e479d777147b Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 22:59:42 +0800 Subject: [PATCH 15/26] Move ok declaration inside if Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 0d0e156faba..224435a54a8 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -69,8 +69,8 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec } var inner types.InstantVectorOperator - var ok bool if len(args) > 0 { + var ok bool // time based function expect instant vector argument inner, ok = args[0].(types.InstantVectorOperator) if !ok { From 2aa5ed6251cf77211e1b0b8e4c324d91cfeadc5c Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 23:09:43 +0800 Subject: [PATCH 16/26] Remove unneeded block Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/functions/times.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index 3d03de57690..6837bd9aa4f 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -5,8 +5,6 @@ package functions import ( "time" - "github.com/prometheus/prometheus/promql" - "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) @@ -45,14 +43,6 @@ var Year = timeWrapperFunc(func(t time.Time) float64 { func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, tr types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - if len(seriesData.Floats)+len(seriesData.Histograms) == 0 { - fp := promql.FPoint{ - F: f(time.Unix(tr.StartT, 0).UTC()), - } - seriesData.Floats = append(seriesData.Floats, fp) - return seriesData, nil - } - if len(seriesData.Floats) > 0 { for i := range seriesData.Floats { t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() From 05bd2868207bcbaa88780c1a17f703942aae5464 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 23:19:00 +0800 Subject: [PATCH 17/26] Return histogram slice to pool in defer Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/functions/times.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index 6837bd9aa4f..e64fbcbd3b8 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -43,6 +43,8 @@ var Year = timeWrapperFunc(func(t time.Time) float64 { func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, tr types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + defer types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) + if len(seriesData.Floats) > 0 { for i := range seriesData.Floats { t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() @@ -52,7 +54,6 @@ func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { } // we don't do time based function on histograms - types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) seriesData.Histograms = nil return seriesData, nil } From a75f93be8a58d15ff6aa21f0b502d79bbda457c2 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 23:24:29 +0800 Subject: [PATCH 18/26] Fix unused parameter Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/functions/times.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index e64fbcbd3b8..db7a89aa8fe 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -42,7 +42,7 @@ var Year = timeWrapperFunc(func(t time.Time) float64 { }) func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { - return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, tr types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { defer types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) if len(seriesData.Floats) > 0 { From aa4d1f238d4b89f82a6843669f5ca88d5d725ec7 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Mon, 27 Jan 2025 23:43:13 +0800 Subject: [PATCH 19/26] Add range query test for times function Signed-off-by: Jon Kartago Lamida --- .../testdata/ours/functions.test | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index fb0ebaef5ac..ef1ada5e288 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -682,3 +682,104 @@ eval range from 0 to 7m step 1m last_over_time(some_metric_count[3m]) some_metric_count{env="prod", cluster="eu"} _ _ _ 0 1 2 3 4 some_metric_count{env="prod", cluster="us"} _ _ _ 0 2 4 6 8 some_metric_count{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}} + +# Test time-related functions. +load 5m + histogram_sample {{schema:0 sum:1 count:1}} + +eval range from 0 to 7m step 1m year() + {} 1970 1970 1970 1970 1970 1970 1970 1970 + +eval range from 0 to 7m step 1m time() + {} 0 60 120 180 240 300 360 420 + +eval range from 0 to 7m step 1m year(vector(1136239445)) + {} 2006 2006 2006 2006 2006 2006 2006 2006 + +eval range from 0 to 7m step 1m month() + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m month(vector(1136239445)) + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m day_of_month() + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m day_of_month(vector(1136239445)) + {} 2 2 2 2 2 2 2 2 + +eval range from 0 to 7m step 1m day_of_year() + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m day_of_year(vector(1136239445)) + {} 2 2 2 2 2 2 2 2 + +# Thursday. +eval range from 0 to 7m step 1m day_of_week() + {} 4 4 4 4 4 4 4 4 + +eval range from 0 to 7m step 1m day_of_week(vector(1136239445)) + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m hour() + {} 0 0 0 0 0 0 0 0 + +eval range from 0 to 7m step 1m hour(vector(1136239445)) + {} 22 22 22 22 22 22 22 22 + +eval range from 0 to 7m step 1m minute() + {} 0 1 2 3 4 5 6 7 + +eval range from 0 to 7m step 1m minute(vector(1136239445)) + {} 4 4 4 4 4 4 4 4 + +# 2008-12-31 23:59:59 just before leap second. +eval range from 0 to 7m step 1m year(vector(1230767999)) + {} 2008 2008 2008 2008 2008 2008 2008 2008 + +# 2009-01-01 00:00:00 just after leap second. +eval range from 0 to 7m step 1m year(vector(1230768000)) + {} 2009 2009 2009 2009 2009 2009 2009 2009 + +# 2016-02-29 23:59:59 February 29th in leap year. +eval range from 0 to 7m step 1m month(vector(1456790399)) + day_of_month(vector(1456790399)) / 100 + {} 2.29 2.29 2.29 2.29 2.29 2.29 2.29 2.29 + +# 2016-03-01 00:00:00 March 1st in leap year. +eval range from 0 to 7m step 1m month(vector(1456790400)) + day_of_month(vector(1456790400)) / 100 + {} 3.01 3.01 3.01 3.01 3.01 3.01 3.01 3.01 + +# 2016-12-31 13:37:00 366th day in leap year. +eval range from 0 to 7m step 1m day_of_year(vector(1483191420)) + {} 366 366 366 366 366 366 366 366 + +# 2022-12-31 13:37:00 365th day in non-leap year. +eval range from 0 to 7m step 1m day_of_year(vector(1672493820)) + {} 365 365 365 365 365 365 365 365 + +# February 1st 2016 in leap year. +eval range from 0 to 7m step 1m days_in_month(vector(1454284800)) + {} 29 29 29 29 29 29 29 29 + +# February 1st 2017 not in leap year. +eval range from 0 to 7m step 1m days_in_month(vector(1485907200)) + {} 28 28 28 28 28 28 28 28 + +# Test for histograms. +eval range from 0 to 7m step 1m day_of_month(histogram_sample) + +eval range from 0 to 7m step 1m day_of_week(histogram_sample) + +eval range from 0 to 7m step 1m day_of_year(histogram_sample) + +eval range from 0 to 7m step 1m days_in_month(histogram_sample) + +eval range from 0 to 7m step 1m hour(histogram_sample) + +eval range from 0 to 7m step 1m minute(histogram_sample) + +eval range from 0 to 7m step 1m month(histogram_sample) + +eval range from 0 to 7m step 1m year(histogram_sample) + +clear From 2185f841e5f1681f6c757dd26af79ce71cec41a4 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 00:21:00 +0800 Subject: [PATCH 20/26] Add more time tests Signed-off-by: Jon Kartago Lamida --- .../testdata/ours/functions.test | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index ef1ada5e288..88dbc025166 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -693,24 +693,28 @@ eval range from 0 to 7m step 1m year() eval range from 0 to 7m step 1m time() {} 0 60 120 180 240 300 360 420 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m year(vector(1136239445)) {} 2006 2006 2006 2006 2006 2006 2006 2006 eval range from 0 to 7m step 1m month() {} 1 1 1 1 1 1 1 1 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m month(vector(1136239445)) {} 1 1 1 1 1 1 1 1 eval range from 0 to 7m step 1m day_of_month() {} 1 1 1 1 1 1 1 1 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m day_of_month(vector(1136239445)) {} 2 2 2 2 2 2 2 2 eval range from 0 to 7m step 1m day_of_year() {} 1 1 1 1 1 1 1 1 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m day_of_year(vector(1136239445)) {} 2 2 2 2 2 2 2 2 @@ -718,18 +722,21 @@ eval range from 0 to 7m step 1m day_of_year(vector(1136239445)) eval range from 0 to 7m step 1m day_of_week() {} 4 4 4 4 4 4 4 4 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m day_of_week(vector(1136239445)) {} 1 1 1 1 1 1 1 1 eval range from 0 to 7m step 1m hour() {} 0 0 0 0 0 0 0 0 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m hour(vector(1136239445)) {} 22 22 22 22 22 22 22 22 eval range from 0 to 7m step 1m minute() {} 0 1 2 3 4 5 6 7 +# 2006-01-02 22:04:05 eval range from 0 to 7m step 1m minute(vector(1136239445)) {} 4 4 4 4 4 4 4 4 @@ -783,3 +790,35 @@ eval range from 0 to 7m step 1m month(histogram_sample) eval range from 0 to 7m step 1m year(histogram_sample) clear + +# Test time with custom metric +# 1136239445: 2006-01-02 22:04:05 +# 1485907200: February 1st 2017 not in leap year. +# 1672493820: 2022-12-31 13:37:00 365th day in non-leap year. +load 1m + some_metric{case="series1"} 0+1136239445x7 + another_metric{case="series1"} 0+536254375x7 + some_metric{case="series2"} 0+1672493820x7 + some_metric{case="gap1"} _ 1136239445 _ 1485907200 _ 1672493820 + some_metric{case="gap2"} 1136239445 _ _ 1485907200 _ _ 1672493820 + some_metric{case="histogram-gap"} _ {{count:5}} _ {{count:5}} + some_metric{case="histogram-float"} 1136239445x7 {{count:5}} 1485907200 {{count:5}} + +eval range from 0 to 7m step 1m year(some_metric) + {case="gap1"} _ 2006 2006 2017 2017 2022 2022 2022 + {case="gap2"} 2006 2006 2006 2017 2017 2017 2022 2022 + {case="histogram-float"} 2006 2006 2006 2006 2006 2006 2006 2006 + {case="series1"} 1970 2006 2042 2078 2114 2150 2186 2222 + {case="series2"} 1970 2022 2075 2128 2181 2234 2287 2340 + +eval range from 0 to 7m step 1m month(some_metric) + {case="gap1"} _ 1 1 2 2 12 12 12 + {case="gap2"} 1 1 1 2 2 2 12 12 + {case="histogram-float"} 1 1 1 1 1 1 1 1 + {case="series1"} 1 1 1 1 1 1 1 1 + {case="series2"} 1 12 12 12 12 12 12 12 + +eval range from 0 to 7m step 1m year(some_metric{case="series1"} + another_metric{case="series1"}) + {case="series1"} 1970 2022 2075 2128 2181 2234 2287 2340 + +clear From 83024dac9ecfa64bbeac44f9023d2e5dc99f2eb2 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 16:47:45 +0800 Subject: [PATCH 21/26] Update pkg/streamingpromql/testdata/ours/functions.test Co-authored-by: Charles Korn --- pkg/streamingpromql/testdata/ours/functions.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 88dbc025166..42e6760004e 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -783,7 +783,7 @@ eval range from 0 to 7m step 1m days_in_month(histogram_sample) eval range from 0 to 7m step 1m hour(histogram_sample) -eval range from 0 to 7m step 1m minute(histogram_sample) +eval range from 0 to 7m step 1m minute(histogram_sample) eval range from 0 to 7m step 1m month(histogram_sample) From 6e42b66f3ac4eaf3cdc2678e70862e20d8d391ee Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:30:16 +0800 Subject: [PATCH 22/26] Update pkg/streamingpromql/testdata/ours/functions.test Co-authored-by: Charles Korn --- pkg/streamingpromql/testdata/ours/functions.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 42e6760004e..eb39a7b5383 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -785,7 +785,7 @@ eval range from 0 to 7m step 1m hour(histogram_sample) eval range from 0 to 7m step 1m minute(histogram_sample) -eval range from 0 to 7m step 1m month(histogram_sample) +eval range from 0 to 7m step 1m month(histogram_sample) eval range from 0 to 7m step 1m year(histogram_sample) From 25101163880ed31bcfa9a0fa944ab708395adce5 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:30:24 +0800 Subject: [PATCH 23/26] Update pkg/streamingpromql/testdata/ours/functions.test Co-authored-by: Charles Korn --- pkg/streamingpromql/testdata/ours/functions.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index eb39a7b5383..62ee30b4166 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -802,7 +802,7 @@ load 1m some_metric{case="gap1"} _ 1136239445 _ 1485907200 _ 1672493820 some_metric{case="gap2"} 1136239445 _ _ 1485907200 _ _ 1672493820 some_metric{case="histogram-gap"} _ {{count:5}} _ {{count:5}} - some_metric{case="histogram-float"} 1136239445x7 {{count:5}} 1485907200 {{count:5}} + some_metric{case="histogram-float"} 1136239445 {{count:5}} 1485907200 {{count:5}} eval range from 0 to 7m step 1m year(some_metric) {case="gap1"} _ 2006 2006 2017 2017 2022 2022 2022 From 16843e7ab802f2662b2f5bde70dabc8bb4cdb2ce Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:32:12 +0800 Subject: [PATCH 24/26] Inline TimeFunctionOperatorFactory Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/functions.go | 46 +++++++++++++++----------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 224435a54a8..638a2c1f10b 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -61,7 +61,27 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO } } -func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory { +// InstantVectorTransformationFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions +// that have exactly 1 argument (v instant-vector), and drop the series __name__ label. +// +// Parameters: +// - name: The name of the function +// - seriesDataFunc: The function to handle series data +func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { + f := functions.FunctionOverInstantVectorDefinition{ + SeriesDataFunc: seriesDataFunc, + SeriesMetadataFunction: functions.DropSeriesName, + } + + return SingleInputVectorFunctionOperatorFactory(name, f) +} + +func TimeTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { + f := functions.FunctionOverInstantVectorDefinition{ + SeriesDataFunc: seriesDataFunc, + SeriesMetadataFunction: functions.DropSeriesName, + } + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) > 1 { // Should be caught by the PromQL parser, but we check here for safety. @@ -92,30 +112,6 @@ func TimeFunctionOperatorFactory(name string, f functions.FunctionOverInstantVec } } -// InstantVectorTransformationFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions -// that have exactly 1 argument (v instant-vector), and drop the series __name__ label. -// -// Parameters: -// - name: The name of the function -// - seriesDataFunc: The function to handle series data -func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { - f := functions.FunctionOverInstantVectorDefinition{ - SeriesDataFunc: seriesDataFunc, - SeriesMetadataFunction: functions.DropSeriesName, - } - - return SingleInputVectorFunctionOperatorFactory(name, f) -} - -func TimeTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { - f := functions.FunctionOverInstantVectorDefinition{ - SeriesDataFunc: seriesDataFunc, - SeriesMetadataFunction: functions.DropSeriesName, - } - - return TimeFunctionOperatorFactory(name, f) -} - // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions // that have exactly 1 argument (v instant-vector), and need to manipulate the labels of // each series without manipulating the returned samples. From dbbba00116492e53c7b7f90bd49b7a7b001bc6e8 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:38:33 +0800 Subject: [PATCH 25/26] Fix the test Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/testdata/ours/functions.test | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 62ee30b4166..8f0ca59e9e5 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -807,14 +807,14 @@ load 1m eval range from 0 to 7m step 1m year(some_metric) {case="gap1"} _ 2006 2006 2017 2017 2022 2022 2022 {case="gap2"} 2006 2006 2006 2017 2017 2017 2022 2022 - {case="histogram-float"} 2006 2006 2006 2006 2006 2006 2006 2006 + {case="histogram-float"} 2006 2006 2017 {case="series1"} 1970 2006 2042 2078 2114 2150 2186 2222 {case="series2"} 1970 2022 2075 2128 2181 2234 2287 2340 eval range from 0 to 7m step 1m month(some_metric) {case="gap1"} _ 1 1 2 2 12 12 12 {case="gap2"} 1 1 1 2 2 2 12 12 - {case="histogram-float"} 1 1 1 1 1 1 1 1 + {case="histogram-float"} 1 1 2 {case="series1"} 1 1 1 1 1 1 1 1 {case="series2"} 1 12 12 12 12 12 12 12 From 40cb57a52a9a75f4ac81848381d5d25041e39431 Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Tue, 28 Jan 2025 17:39:06 +0800 Subject: [PATCH 26/26] Return histogram to the poll Signed-off-by: Jon Kartago Lamida --- pkg/streamingpromql/operators/functions/times.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/streamingpromql/operators/functions/times.go b/pkg/streamingpromql/operators/functions/times.go index db7a89aa8fe..8b5b5cfbe1f 100644 --- a/pkg/streamingpromql/operators/functions/times.go +++ b/pkg/streamingpromql/operators/functions/times.go @@ -43,18 +43,15 @@ var Year = timeWrapperFunc(func(t time.Time) float64 { func timeWrapperFunc(f func(t time.Time) float64) InstantVectorSeriesFunction { return func(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - defer types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) - - if len(seriesData.Floats) > 0 { - for i := range seriesData.Floats { - t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() - seriesData.Floats[i].F = f(t) - } - return seriesData, nil - } // we don't do time based function on histograms + types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) seriesData.Histograms = nil + + for i := range seriesData.Floats { + t := time.Unix(int64(seriesData.Floats[i].F), 0).UTC() + seriesData.Floats[i].F = f(t) + } return seriesData, nil } }