Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: introduce series metadata function type #9558

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. #9453
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
30 changes: 14 additions & 16 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO

var o types.InstantVectorOperator = operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)

if f.NeedsSeriesDeduplication {
if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}

Expand All @@ -67,9 +67,8 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO
// - seriesDataFunc: The function to handle series data
func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVector{
SeriesDataFunc: seriesDataFunc,
SeriesMetadataFunc: functions.DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesDataFunc: seriesDataFunc,
SeriesMetadataFunction: functions.DropSeriesName,
}

return SingleInputVectorFunctionOperatorFactory(name, f)
Expand All @@ -83,12 +82,10 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
// Parameters:
// - name: The name of the function
// - metadataFunc: The function for handling metadata
// - needsSeriesDeduplication: Set to true if metadataFunc may produce multiple series with the same labels and therefore deduplication is required
func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction, needsSeriesDeduplication bool) InstantVectorFunctionOperatorFactory {
func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunctionDefinition) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunc: metadataFunc,
NeedsSeriesDeduplication: needsSeriesDeduplication,
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunction: metadataFunc,
}

return SingleInputVectorFunctionOperatorFactory(name, f)
Expand Down Expand Up @@ -118,7 +115,7 @@ func FunctionOverRangeVectorOperatorFactory(

var o types.InstantVectorOperator = operators.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition)

if f.NeedsSeriesDeduplication {
if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}

Expand Down Expand Up @@ -179,9 +176,11 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory
}

f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunc: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex),
NeedsSeriesDeduplication: true,
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{
Func: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex),
NeedsSeriesDeduplication: true,
},
}

o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)
Expand Down Expand Up @@ -283,9 +282,8 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti

func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator {
f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.UnaryNegation,
SeriesMetadataFunc: functions.DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesDataFunc: functions.UnaryNegation,
SeriesMetadataFunction: functions.DropSeriesName,
}

o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)
Expand Down
50 changes: 26 additions & 24 deletions pkg/streamingpromql/functions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (
// SeriesMetadataFunction is a function to operate on the metadata across series.
type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error)

// DropSeriesName is a SeriesMetadataFunc that removes the __name__ label from all series in seriesMetadata.
//
// It does not check that the list of returned series is free of duplicates.
func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
for i := range seriesMetadata {
seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName()
}
// DropSeriesName is a series metadata function that removes the __name__ label from all series.
var DropSeriesName = SeriesMetadataFunctionDefinition{
Func: func(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
for i := range seriesMetadata {
seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName()
}

return seriesMetadata, nil
return seriesMetadata, nil
},
NeedsSeriesDeduplication: true,
}

// InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector.
Expand Down Expand Up @@ -93,16 +94,10 @@ type FunctionOverInstantVector struct {
// SeriesDataFunc is the function that computes an output series for a single input series.
SeriesDataFunc InstantVectorSeriesFunction

// SeriesMetadataFunc is the function that computes the output series for this function based on the given input series.
// SeriesMetadataFunction is the function that computes the output series for this function based on the given input series.
//
// If SeriesMetadataFunc is nil, the input series are used as-is.
SeriesMetadataFunc SeriesMetadataFunction

// NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels.
//
// This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be
// present in the output series labels (eg. dropping a label).
NeedsSeriesDeduplication bool
// If SeriesMetadataFunction.Func is nil, the input series are used as-is.
SeriesMetadataFunction SeriesMetadataFunctionDefinition
}

type FunctionOverRangeVector struct {
Expand All @@ -120,17 +115,24 @@ type FunctionOverRangeVector struct {
// SeriesValidationFuncFactory can be nil, in which case no validation is performed.
SeriesValidationFuncFactory RangeVectorSeriesValidationFunctionFactory

// SeriesMetadataFunc is the function that computes the output series for this function based on the given input series.
// SeriesMetadataFunction is the function that computes the output series for this function based on the given input series.
//
// If SeriesMetadataFunc is nil, the input series are used as-is.
SeriesMetadataFunc SeriesMetadataFunction
// If SeriesMetadataFunction.Func is nil, the input series are used as-is.
SeriesMetadataFunction SeriesMetadataFunctionDefinition

// NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations.
NeedsSeriesNamesForAnnotations bool
}

type SeriesMetadataFunctionDefinition struct {
// Func is the function that computes the output series for this function based on the given input series.
//
// If Func is nil, the input series are used as-is.
Func SeriesMetadataFunction

// NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels.
//
// This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be
// This should be set to true if Func modifies the input series labels in such a way that duplicates may be
// present in the output series labels (eg. dropping a label).
NeedsSeriesDeduplication bool

// NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations.
NeedsSeriesNamesForAnnotations bool
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/functions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestDropSeriesName(t *testing.T) {
{Labels: labels.FromStrings("label2", "value2")},
}

modifiedMetadata, err := DropSeriesName(seriesMetadata, limiting.NewMemoryConsumptionTracker(0, nil))
modifiedMetadata, err := DropSeriesName.Func(seriesMetadata, limiting.NewMemoryConsumptionTracker(0, nil))
require.NoError(t, err)
require.Equal(t, expected, modifiedMetadata)
}
Expand Down
30 changes: 12 additions & 18 deletions pkg/streamingpromql/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
)

var CountOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: countOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: countOverTime,
}

func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand All @@ -34,8 +33,8 @@ func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPo
}

var LastOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: nil, // We want to use the input series as-is.
StepFunc: lastOverTime,
// We want to use the input series as-is, so no need to set SeriesMetadataFunction.
StepFunc: lastOverTime,
}

func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand All @@ -55,9 +54,8 @@ func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoi
}

var PresentOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: presentOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: presentOverTime,
}

func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand All @@ -69,9 +67,8 @@ func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.F
}

var MaxOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: maxOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: maxOverTime,
}

func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand Down Expand Up @@ -107,9 +104,8 @@ func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoin
}

var MinOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: minOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: minOverTime,
}

func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand Down Expand Up @@ -145,8 +141,7 @@ func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoin
}

var SumOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesMetadataFunction: DropSeriesName,
StepFunc: sumOverTime,
NeedsSeriesNamesForAnnotations: true,
}
Expand Down Expand Up @@ -221,8 +216,7 @@ func sumHistograms(head, tail []promql.HPoint, emitAnnotation EmitAnnotationFunc
}

var AvgOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesMetadataFunction: DropSeriesName,
StepFunc: avgOverTime,
NeedsSeriesNamesForAnnotations: true,
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamingpromql/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@ import (
var Rate = FunctionOverRangeVector{
StepFunc: rate(true),
SeriesValidationFuncFactory: rateSeriesValidator,
SeriesMetadataFunc: DropSeriesName,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
NeedsSeriesDeduplication: true,
}

var Increase = FunctionOverRangeVector{
StepFunc: rate(false),
SeriesValidationFuncFactory: rateSeriesValidator,
SeriesMetadataFunc: DropSeriesName,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
NeedsSeriesDeduplication: true,
}

// isRate is true for `rate` function, or false for `instant` function
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

func TestRegisterInstantVectorFunctionOperatorFactory(t *testing.T) {
// Register an already existing function
err := RegisterInstantVectorFunctionOperatorFactory("acos", InstantVectorLabelManipulationFunctionOperatorFactory("acos", functions.DropSeriesName, true))
err := RegisterInstantVectorFunctionOperatorFactory("acos", InstantVectorLabelManipulationFunctionOperatorFactory("acos", functions.DropSeriesName))
require.Error(t, err)
require.Equal(t, "function 'acos' has already been registered", err.Error())

// Register a new function
newFunc := InstantVectorLabelManipulationFunctionOperatorFactory("new_function", functions.DropSeriesName, true)
newFunc := InstantVectorLabelManipulationFunctionOperatorFactory("new_function", functions.DropSeriesName)
err = RegisterInstantVectorFunctionOperatorFactory("new_function", newFunc)
require.NoError(t, err)
require.Contains(t, instantVectorFunctionOperatorFactories, "new_function")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (m *FunctionOverInstantVector) SeriesMetadata(ctx context.Context) ([]types
return nil, err
}

if m.Func.SeriesMetadataFunc != nil {
return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker)
if m.Func.SeriesMetadataFunction.Func != nil {
return m.Func.SeriesMetadataFunction.Func(metadata, m.MemoryConsumptionTracker)
}

return metadata, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ func TestFunctionOverInstantVector(t *testing.T) {
Inner: inner,
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Func: functions.FunctionOverInstantVector{
SeriesMetadataFunc: mustBeCalledMetadata,
SeriesDataFunc: mustBeCalledSeriesData,
SeriesDataFunc: mustBeCalledSeriesData,
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{
Func: mustBeCalledMetadata,
},
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/function_over_range_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func (m *FunctionOverRangeVector) SeriesMetadata(ctx context.Context) ([]types.S
m.numSteps = m.Inner.StepCount()
m.rangeSeconds = m.Inner.Range().Seconds()

if m.Func.SeriesMetadataFunc != nil {
return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker)
if m.Func.SeriesMetadataFunction.Func != nil {
return m.Func.SeriesMetadataFunction.Func(metadata, m.MemoryConsumptionTracker)
}

return metadata, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (v *VectorScalarBinaryOperation) SeriesMetadata(ctx context.Context) ([]typ
if !v.Op.IsComparisonOperator() || v.ReturnBool {
// We don't need to do deduplication and merging of series in this operator: we expect that this operator
// is wrapped in a DeduplicateAndMerge.
metadata, err = functions.DropSeriesName(metadata, v.MemoryConsumptionTracker)
metadata, err = functions.DropSeriesName.Func(metadata, v.MemoryConsumptionTracker)
if err != nil {
return nil, err
}
Expand Down
Loading