Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Jan 29, 2025
1 parent 620be2b commit 6e9c6c1
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/operators"
"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
"github.com/grafana/mimir/pkg/streamingpromql/operators/scalars"
"github.com/grafana/mimir/pkg/streamingpromql/operators/selectors"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand Down Expand Up @@ -334,6 +335,30 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
}

func TimestampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for timestamp, got %v", len(args))
}

inner, ok := args[0].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector for 1st argument for timestamp, got %T", args[0])
}

f := functions.Timestamp
selector, isSelector := args[0].(*selectors.InstantVectorSelector)

if isSelector {
selector.ReturnSampleTimestamps = true
f.SeriesDataFunc = functions.PassthroughData
}

o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition, timeRange)
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

// These functions return an instant-vector.
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
Expand Down Expand Up @@ -387,6 +412,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime),
"tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan),
"tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh),
"timestamp": TimestampFunctionOperatorFactory,
"vector": scalarToInstantVectorOperatorFactory,
}

Expand Down
51 changes: 51 additions & 0 deletions pkg/streamingpromql/operators/functions/timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package functions

import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

var Timestamp = FunctionOverInstantVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
SeriesDataFunc: timestamp,
}

func timestamp(data types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
output := types.InstantVectorSeriesData{}

defer types.HPointSlicePool.Put(data.Histograms, memoryConsumptionTracker)

if len(data.Histograms) > 0 {
defer types.FPointSlicePool.Put(data.Floats, memoryConsumptionTracker)

var err error
output.Floats, err = types.FPointSlicePool.Get(len(data.Floats)+len(data.Histograms), memoryConsumptionTracker)

if err != nil {
return types.InstantVectorSeriesData{}, err
}
} else {
// Only have floats, so it's safe to reuse the input float slice.
output.Floats = data.Floats[:0]
}

it := types.InstantVectorSeriesDataIterator{}
it.Reset(data)

t, _, _, ok := it.Next()

for ok {
output.Floats = append(output.Floats, promql.FPoint{T: t, F: float64(t) / 1000})
t, _, _, ok = it.Next()
}

return output, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type InstantVectorSelector struct {
Selector *Selector
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Stats *types.QueryStats
ReturnSampleTimestamps bool // true if this operator is wrapped directly in the timestamp() function and so should return the underlying sample timestamps

chunkIterator chunkenc.Iterator
memoizedIterator *storage.MemoizedSeriesIterator
Expand Down Expand Up @@ -116,10 +117,16 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
}
}
}

if value.IsStaleNaN(f) || (h != nil && value.IsStaleNaN(h.Sum)) {
continue
}

if v.ReturnSampleTimestamps {
f = float64(t) / 1000
h = nil
}

// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
Expand Down

0 comments on commit 6e9c6c1

Please sign in to comment.