diff --git a/CHANGELOG.md b/CHANGELOG.md index 08048ccc42a..4ac46c6fac8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ * [CHANGE] Ingester: increase the default inactivity timeout of active series (`-ingester.active-series-metrics-idle-timeout`) from `10m` to `20m`. #8975 * [CHANGE] Distributor: Remove `-distributor.enable-otlp-metadata-storage` flag, which was deprecated in version 2.12. #9069 * [CHANGE] Ruler: Removed `-ruler.drain-notification-queue-on-shutdown` option, which is now enabled by default. #9115 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 6c55fe8db1b..d6f808bc52e 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -35,10 +35,8 @@ type ScalarFunctionOperatorFactory func( // // Parameters: // - name: The name of the function -// - metadataFunc: The function for handling metadata -// - seriesDataFunc: The function to handle series data -// - needsSeriesDeduplication: Set to true if metadataFunc may produce multiple series with the same labels and therefore deduplication is required -func SingleInputVectorFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction, seriesDataFunc functions.InstantVectorFunction, needsSeriesDeduplication bool) InstantVectorFunctionOperatorFactory { +// - f: The function implementation +func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVector) InstantVectorFunctionOperatorFactory { return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. @@ -51,9 +49,9 @@ func SingleInputVectorFunctionOperatorFactory(name string, metadataFunc function return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0]) } - var o types.InstantVectorOperator = operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, metadataFunc, seriesDataFunc, expressionPosition) + var o types.InstantVectorOperator = operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition) - if needsSeriesDeduplication { + if f.NeedsSeriesDeduplication { o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) } @@ -67,8 +65,14 @@ func SingleInputVectorFunctionOperatorFactory(name string, metadataFunc function // Parameters: // - name: The name of the function // - seriesDataFunc: The function to handle series data -func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorFunction) InstantVectorFunctionOperatorFactory { - return SingleInputVectorFunctionOperatorFactory(name, functions.DropSeriesName, seriesDataFunc, true) +func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { + f := functions.FunctionOverInstantVector{ + SeriesDataFunc: seriesDataFunc, + SeriesMetadataFunc: functions.DropSeriesName, + NeedsSeriesDeduplication: true, + } + + return SingleInputVectorFunctionOperatorFactory(name, f) } // InstantVectorLabelManipulationFunctionOperatorFactory creates an InstantVectorFunctionOperator for functions @@ -81,7 +85,13 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF // - metadataFunc: The function for handling metadata // - needsSeriesDeduplication: Set to true if metadataFunc may produce multiple series with the same labels and therefore deduplication is required func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction, needsSeriesDeduplication bool) InstantVectorFunctionOperatorFactory { - return SingleInputVectorFunctionOperatorFactory(name, metadataFunc, functions.PassthroughData, needsSeriesDeduplication) + f := functions.FunctionOverInstantVector{ + SeriesDataFunc: functions.PassthroughData, + SeriesMetadataFunc: metadataFunc, + NeedsSeriesDeduplication: needsSeriesDeduplication, + } + + return SingleInputVectorFunctionOperatorFactory(name, f) } // FunctionOverRangeVectorOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions diff --git a/pkg/streamingpromql/functions/common.go b/pkg/streamingpromql/functions/common.go index 3f72d197473..a656a22bec0 100644 --- a/pkg/streamingpromql/functions/common.go +++ b/pkg/streamingpromql/functions/common.go @@ -24,11 +24,11 @@ func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryCon return seriesMetadata, nil } -// InstantVectorFunction is a function that takes in a instant vector and produces an instant vector. -type InstantVectorFunction func(seriesData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) +// InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. +type InstantVectorSeriesFunction func(seriesData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) // floatTransformationFunc is not needed elsewhere, so it is not exported yet -func floatTransformationFunc(transform func(f float64) float64) InstantVectorFunction { +func floatTransformationFunc(transform func(f float64) float64) InstantVectorSeriesFunction { return func(seriesData types.InstantVectorSeriesData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { for i := range seriesData.Floats { seriesData.Floats[i].F = transform(seriesData.Floats[i].F) @@ -37,7 +37,7 @@ func floatTransformationFunc(transform func(f float64) float64) InstantVectorFun } } -func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) InstantVectorFunction { +func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) InstantVectorSeriesFunction { ft := floatTransformationFunc(transform) return func(seriesData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { // Functions that do not explicitly mention native histograms in their documentation will ignore histogram samples. @@ -89,6 +89,22 @@ type RangeVectorSeriesValidationFunction func(seriesData types.InstantVectorSeri // RangeVectorSeriesValidationFunctionFactory is a factory function that returns a RangeVectorSeriesValidationFunction type RangeVectorSeriesValidationFunctionFactory func() RangeVectorSeriesValidationFunction +type FunctionOverInstantVector struct { + // SeriesDataFunc is the function that computes an output series for a single input series. + SeriesDataFunc InstantVectorSeriesFunction + + // SeriesMetadataFunc is the function that computes the output series for this function based on the given input series. + // + // If SeriesMetadataFunc is nil, the input series are used as-is. + SeriesMetadataFunc SeriesMetadataFunction + + // NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels. + // + // This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be + // present in the output series labels (eg. dropping a label). + NeedsSeriesDeduplication bool +} + type FunctionOverRangeVector struct { // StepFunc is the function that computes an output sample for a single step. StepFunc RangeVectorStepFunction diff --git a/pkg/streamingpromql/operators/function_over_instant_vector.go b/pkg/streamingpromql/operators/function_over_instant_vector.go index d1de3db13a5..116310380e2 100644 --- a/pkg/streamingpromql/operators/function_over_instant_vector.go +++ b/pkg/streamingpromql/operators/function_over_instant_vector.go @@ -23,9 +23,7 @@ type FunctionOverInstantVector struct { // what we use for the SeriesMetadata. Inner types.InstantVectorOperator MemoryConsumptionTracker *limiting.MemoryConsumptionTracker - - SeriesMetadataFunc functions.SeriesMetadataFunction - SeriesDataFunc functions.InstantVectorFunction + Func functions.FunctionOverInstantVector expressionPosition posrange.PositionRange } @@ -35,16 +33,13 @@ var _ types.InstantVectorOperator = &FunctionOverInstantVector{} func NewFunctionOverInstantVector( inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, - metadataFunc functions.SeriesMetadataFunction, - seriesDataFunc functions.InstantVectorFunction, + f functions.FunctionOverInstantVector, expressionPosition posrange.PositionRange, ) *FunctionOverInstantVector { return &FunctionOverInstantVector{ Inner: inner, MemoryConsumptionTracker: memoryConsumptionTracker, - - SeriesMetadataFunc: metadataFunc, - SeriesDataFunc: seriesDataFunc, + Func: f, expressionPosition: expressionPosition, } @@ -60,8 +55,8 @@ func (m *FunctionOverInstantVector) SeriesMetadata(ctx context.Context) ([]types return nil, err } - if m.SeriesMetadataFunc != nil { - return m.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker) + if m.Func.SeriesMetadataFunc != nil { + return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker) } return metadata, nil @@ -73,7 +68,7 @@ func (m *FunctionOverInstantVector) NextSeries(ctx context.Context) (types.Insta return types.InstantVectorSeriesData{}, err } - return m.SeriesDataFunc(series, m.MemoryConsumptionTracker) + return m.Func.SeriesDataFunc(series, m.MemoryConsumptionTracker) } func (m *FunctionOverInstantVector) Close() { diff --git a/pkg/streamingpromql/operators/function_over_instant_vector_test.go b/pkg/streamingpromql/operators/function_over_instant_vector_test.go index 8eeec695d5d..b28847efc23 100644 --- a/pkg/streamingpromql/operators/function_over_instant_vector_test.go +++ b/pkg/streamingpromql/operators/function_over_instant_vector_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/grafana/mimir/pkg/streamingpromql/functions" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) @@ -41,12 +42,14 @@ func TestFunctionOverInstantVector(t *testing.T) { seriesDataFuncCalledTimes++ return types.InstantVectorSeriesData{}, nil } + operator := &FunctionOverInstantVector{ Inner: inner, MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil), - - SeriesMetadataFunc: mustBeCalledMetadata, - SeriesDataFunc: mustBeCalledSeriesData, + Func: functions.FunctionOverInstantVector{ + SeriesMetadataFunc: mustBeCalledMetadata, + SeriesDataFunc: mustBeCalledSeriesData, + }, } ctx := context.TODO()