Skip to content

Commit

Permalink
MQE: add support for predict_linear (#10503)
Browse files Browse the repository at this point in the history
* Enable upstream test cases

* Simplify `deriv` handling of mixed floats and histograms

* Add support for scalar args to `FunctionOverRangeVector`

* Add support for `predict_linear`

* Add tests for annotations

* Add test for `predict_linear` with varying time parameter

* Address PR feedback: remove unnecessary check

* Fix typo

* Address PR feedback: add to mixed metrics tests

* Add `predict_linear` to `TestFunctionDeduplicateAndMerge`
  • Loading branch information
charleskorn authored Feb 3, 2025
1 parent d3dd093 commit e3862e4
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 52 deletions.
52 changes: 51 additions & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2413,6 +2413,54 @@ func TestDeltaAnnotations(t *testing.T) {
runAnnotationTests(t, testCases)
}

// TestDerivPredictLinearAnnotations checks the annotations emitted by deriv() and
// predict_linear() for ranges that contain only floats, only histograms, or a mix of both.
// Only the mixed case is expected to produce an (info) annotation.
func TestDerivPredictLinearAnnotations(t *testing.T) {
	const seriesData = `
only_floats 0 1
only_histograms {{count:0}} {{count:0}}
mixed 0 {{count:0}}
`

	cases := make(map[string]annotationTestCase, 6)

	// deriv(): no annotations unless floats and histograms share a range.
	cases["deriv() over series with only floats"] = annotationTestCase{
		data: seriesData,
		expr: `deriv(only_floats[1m1s])`,
	}
	cases["deriv() over series with only histograms"] = annotationTestCase{
		data: seriesData,
		expr: `deriv(only_histograms[1m1s])`,
	}
	cases["deriv() over series with both floats and histograms"] = annotationTestCase{
		data: seriesData,
		expr: `deriv(mixed[1m1s])`,
		expectedInfoAnnotations: []string{
			`PromQL info: ignored histograms in a range containing both floats and histograms for metric name "mixed" (1:7)`,
		},
	}

	// predict_linear(): same expectations; only the reported expression position differs.
	cases["predict_linear() over series with only floats"] = annotationTestCase{
		data: seriesData,
		expr: `predict_linear(only_floats[1m1s], 5)`,
	}
	cases["predict_linear() over series with only histograms"] = annotationTestCase{
		data: seriesData,
		expr: `predict_linear(only_histograms[1m1s], 5)`,
	}
	cases["predict_linear() over series with both floats and histograms"] = annotationTestCase{
		data: seriesData,
		expr: `predict_linear(mixed[1m1s], 5)`,
		expectedInfoAnnotations: []string{
			`PromQL info: ignored histograms in a range containing both floats and histograms for metric name "mixed" (1:16)`,
		},
	}

	runAnnotationTests(t, cases)
}

func TestBinaryOperationAnnotations(t *testing.T) {
mixedFloatHistogramData := `
metric{type="float", series="1"} 0+1x3
Expand Down Expand Up @@ -2875,12 +2923,14 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta", "deriv"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[5m]))`, function, labelRegex))
}

expressions = append(expressions, fmt.Sprintf(`predict_linear(series{label=~"(%s)"}[1m], 30)`, labelRegex))
}

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
Expand Down
32 changes: 31 additions & 1 deletion pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func FunctionOverRangeVectorOperatorFactory(
return nil, fmt.Errorf("expected a range vector argument for %s, got %T", name, args[0])
}

var o types.InstantVectorOperator = functions.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition, timeRange)
var o types.InstantVectorOperator = functions.NewFunctionOverRangeVector(inner, nil, memoryConsumptionTracker, f, annotations, expressionPosition, timeRange)

if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
Expand All @@ -159,6 +159,35 @@ func FunctionOverRangeVectorOperatorFactory(
}
}

// PredictLinearFactory builds the operator that evaluates predict_linear(range_vector, scalar).
//
// args must hold exactly two operators: a range vector operator followed by a scalar operator.
// The PromQL parser is expected to reject anything else, so the validation below is purely
// defensive. The resulting operator is wrapped in a deduplicate-and-merge operator when the
// function definition's series metadata function asks for series deduplication.
func PredictLinearFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
	if argCount := len(args); argCount != 2 {
		return nil, fmt.Errorf("expected exactly 2 arguments for predict_linear, got %v", argCount)
	}

	rangeVector, isRangeVector := args[0].(types.RangeVectorOperator)
	if !isRangeVector {
		return nil, fmt.Errorf("expected first argument for predict_linear to be a range vector, got %T", args[0])
	}

	predictionOffset, isScalar := args[1].(types.ScalarOperator)
	if !isScalar {
		return nil, fmt.Errorf("expected second argument for predict_linear to be a scalar, got %T", args[1])
	}

	definition := functions.PredictLinear
	overRange := functions.NewFunctionOverRangeVector(
		rangeVector,
		[]types.ScalarOperator{predictionOffset},
		memoryConsumptionTracker,
		definition,
		annotations,
		expressionPosition,
		timeRange,
	)

	if !definition.SeriesMetadataFunction.NeedsSeriesDeduplication {
		return overRange, nil
	}

	return operators.NewDeduplicateAndMerge(overRange, memoryConsumptionTracker), nil
}

func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
Expand Down Expand Up @@ -444,6 +473,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime),
"minute": TimeTransformationFunctionOperatorFactory("minute", functions.Minute),
"month": TimeTransformationFunctionOperatorFactory("month", functions.Month),
"predict_linear": PredictLinearFactory,
"present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime),
"rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad),
"rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate),
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestFunctionDeduplicateAndMerge(t *testing.T) {
"min_over_time": `min_over_time({__name__=~"float.*"}[1m])`,
"minute": `minute({__name__=~"float.*"})`,
"month": `month({__name__=~"float.*"})`,
"predict_linear": `predict_linear({__name__=~"float.*"}[1m], 30)`,
"present_over_time": `present_over_time({__name__=~"float.*"}[1m])`,
"rad": `rad({__name__=~"float.*"})`,
"rate": `rate({__name__=~"float.*"}[1m])`,
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/operators/functions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarD
type RangeVectorStepFunction func(
step *types.RangeVectorStepData,
rangeSeconds float64,
// scalarArgsData holds the evaluated values of the function's scalar arguments, in argument
// order. FunctionOverRangeVector leaves it nil when the function has no scalar arguments.
scalarArgsData []types.ScalarData,
// timeRange is the operator's query time range. predictLinear uses timeRange.PointIndex(step.StepT)
// to pick the scalar sample that belongs to the step being evaluated.
timeRange types.QueryTimeRange,
emitAnnotation types.EmitAnnotationFunc,
) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
// FunctionOverRangeVector applies the function described by Func to each step of each series of a range vector.
type FunctionOverRangeVector struct {
Inner types.RangeVectorOperator
ScalarArgs []types.ScalarOperator
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Func FunctionOverRangeVectorDefinition

Annotations *annotations.Annotations

scalarArgsData []types.ScalarData

metricNames *operators.MetricNames
currentSeriesIndex int

Expand All @@ -41,6 +44,7 @@ var _ types.InstantVectorOperator = &FunctionOverRangeVector{}

func NewFunctionOverRangeVector(
inner types.RangeVectorOperator,
scalarArgs []types.ScalarOperator,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
f FunctionOverRangeVectorDefinition,
annotations *annotations.Annotations,
Expand All @@ -49,6 +53,7 @@ func NewFunctionOverRangeVector(
) *FunctionOverRangeVector {
o := &FunctionOverRangeVector{
Inner: inner,
ScalarArgs: scalarArgs,
MemoryConsumptionTracker: memoryConsumptionTracker,
Func: f,
Annotations: annotations,
Expand All @@ -74,6 +79,10 @@ func (m *FunctionOverRangeVector) ExpressionPosition() posrange.PositionRange {
}

func (m *FunctionOverRangeVector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
if err := m.processScalarArgs(ctx); err != nil {
return nil, err
}

metadata, err := m.Inner.SeriesMetadata(ctx)
if err != nil {
return nil, err
Expand All @@ -92,6 +101,24 @@ func (m *FunctionOverRangeVector) SeriesMetadata(ctx context.Context) ([]types.S
return metadata, nil
}

// processScalarArgs evaluates every scalar argument operator once and stores the results, in
// argument order, in m.scalarArgsData for later use by the step function.
//
// It does nothing when the function takes no scalar arguments. The first evaluation error is
// returned as-is; results gathered before the failure stay in m.scalarArgsData.
func (m *FunctionOverRangeVector) processScalarArgs(ctx context.Context) error {
	argCount := len(m.ScalarArgs)
	if argCount == 0 {
		return nil
	}

	m.scalarArgsData = make([]types.ScalarData, 0, argCount)

	for i := 0; i < argCount; i++ {
		values, err := m.ScalarArgs[i].GetValues(ctx)
		if err != nil {
			return err
		}

		m.scalarArgsData = append(m.scalarArgsData, values)
	}

	return nil
}

func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
if err := m.Inner.NextSeries(ctx); err != nil {
return types.InstantVectorSeriesData{}, err
Expand All @@ -117,7 +144,7 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant
return types.InstantVectorSeriesData{}, err
}

f, hasFloat, h, err := m.Func.StepFunc(step, m.rangeSeconds, m.emitAnnotationFunc)
f, hasFloat, h, err := m.Func.StepFunc(step, m.rangeSeconds, m.scalarArgsData, m.timeRange, m.emitAnnotationFunc)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
Expand Down Expand Up @@ -157,4 +184,8 @@ func (m *FunctionOverRangeVector) emitAnnotation(generator types.AnnotationGener

// Close releases the resources held by this operator: it closes the inner range vector
// operator and every scalar argument operator, then returns the evaluated scalar argument
// samples to the float point slice pool.
//
// scalarArgsData is cleared afterwards so that a repeated Close does not hand the same
// slices back to the pool a second time.
func (m *FunctionOverRangeVector) Close() {
	m.Inner.Close()

	// The scalar argument operators are owned by this operator, so they are closed here too.
	for _, arg := range m.ScalarArgs {
		arg.Close()
	}

	for _, d := range m.scalarArgsData {
		types.FPointSlicePool.Put(d.Samples, m.MemoryConsumptionTracker)
	}

	m.scalarArgsData = nil
}
47 changes: 33 additions & 14 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var CountOverTime = FunctionOverRangeVectorDefinition{
StepFunc: countOverTime,
}

func countOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func countOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fPointCount := step.Floats.Count()
hPointCount := step.Histograms.Count()

Expand All @@ -37,7 +37,7 @@ var LastOverTime = FunctionOverRangeVectorDefinition{
StepFunc: lastOverTime,
}

func lastOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func lastOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
lastFloat, floatAvailable := step.Floats.Last()
lastHistogram, histogramAvailable := step.Histograms.Last()

Expand All @@ -58,7 +58,7 @@ var PresentOverTime = FunctionOverRangeVectorDefinition{
StepFunc: presentOverTime,
}

func presentOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func presentOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
if step.Floats.Any() || step.Histograms.Any() {
return 1, true, nil, nil
}
Expand All @@ -72,7 +72,7 @@ var MaxOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func maxOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func maxOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
Expand Down Expand Up @@ -107,7 +107,7 @@ var MinOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func minOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func minOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
Expand Down Expand Up @@ -142,7 +142,7 @@ var SumOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func sumOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func sumOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

Expand Down Expand Up @@ -207,7 +207,7 @@ var AvgOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func avgOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func avgOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

Expand Down Expand Up @@ -357,7 +357,7 @@ var Resets = FunctionOverRangeVectorDefinition{
}

func resetsChanges(isReset bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

Expand Down Expand Up @@ -483,12 +483,11 @@ var Deriv = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func deriv(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func deriv(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()

if len(fHead)+len(fTail) == 1 && step.Histograms.Any() {
if step.Floats.Any() && step.Histograms.Any() {
emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo)
return 0, false, nil, nil
}

if (len(fHead) + len(fTail)) < 2 {
Expand All @@ -497,11 +496,31 @@ func deriv(step *types.RangeVectorStepData, _ float64, emitAnnotation types.Emit

slope, _ := linearRegression(fHead, fTail, fHead[0].T)

if step.Histograms.Any() {
return slope, true, nil, nil
}

// PredictLinear is the range vector function definition for predict_linear.
// It uses DropSeriesName as its series metadata function, evaluates each step with
// predictLinear, and requests series names so that annotations can name the affected metric.
var PredictLinear = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: predictLinear,
NeedsSeriesNamesForAnnotations: true,
}

func predictLinear(step *types.RangeVectorStepData, _ float64, args []types.ScalarData, timeRange types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()

if step.Floats.Any() && step.Histograms.Any() {
emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo)
}

return slope, true, nil, nil
if (len(fHead) + len(fTail)) < 2 {
return 0, false, nil, nil
}

slope, intercept := linearRegression(fHead, fTail, step.StepT)
tArg := args[0]
duration := tArg.Samples[timeRange.PointIndex(step.StepT)].F

return slope*duration + intercept, true, nil, nil
}

func linearRegression(head, tail []promql.FPoint, interceptTime int64) (slope, intercept float64) {
Expand Down Expand Up @@ -565,7 +584,7 @@ var Idelta = FunctionOverRangeVectorDefinition{
}

func irateIdelta(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
// Histograms are ignored
fHead, fTail := step.Floats.UnsafePoints()

Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var Delta = FunctionOverRangeVectorDefinition{

// isRate is true for the `rate` function, or false for the `increase` function
func rate(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
return func(step *types.RangeVectorStepData, rangeSeconds float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)

Expand Down Expand Up @@ -279,7 +279,7 @@ func rateSeriesValidator() RangeVectorSeriesValidationFunction {
}
}

func delta(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func delta(step *types.RangeVectorStepData, rangeSeconds float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)

Expand Down
9 changes: 9 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -843,3 +843,12 @@ eval range from 0 to 2m step 1m timestamp(some_metric * 2)
eval_info range from 0 to 2m step 1m timestamp(rate(some_metric{case=~".* only"}[2m]))
{case="floats only"} _ 60 120
{case="histograms only"} _ 60 120

clear

load 30s
prediction_time 5 10 15 20 25
metric 3 20 4 7 80

eval range from 0 to 2m step 1m predict_linear(metric[1m1s], scalar(prediction_time))
{} _ 9.75 100
Loading

0 comments on commit e3862e4

Please sign in to comment.