Skip to content

Commit 29a37d5

Browse files
authored
fix: special case the return values from a sharded first/last_over_time query (#13578)
Signed-off-by: Callum Styan <[email protected]>
1 parent bd20171 commit 29a37d5

File tree

5 files changed

+108
-29
lines changed

5 files changed

+108
-29
lines changed

pkg/logql/downstream.go

-1
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
599599
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
600600
}
601601
}
602-
603602
return NewMergeLastOverTimeStepEvaluator(params, xs), nil
604603
default:
605604
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)

pkg/logql/downstream_test.go

+59-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestMappingEquivalence(t *testing.T) {
8989
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
9090
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())
9191

92-
t.Run(tc.query, func(t *testing.T) {
92+
t.Run(tc.query+"_range", func(t *testing.T) {
9393
params, err := NewLiteralParams(
9494
tc.query,
9595
start,
@@ -125,6 +125,46 @@ func TestMappingEquivalence(t *testing.T) {
125125
require.Equal(t, res.Data, shardedRes.Data)
126126
}
127127
})
128+
t.Run(tc.query+"_instant", func(t *testing.T) {
129+
// for an instant query we set the start and end to the same timestamp
130+
// plus set step and interval to 0
131+
params, err := NewLiteralParams(
132+
tc.query,
133+
time.Unix(0, int64(rounds+1)),
134+
time.Unix(0, int64(rounds+1)),
135+
0,
136+
0,
137+
logproto.FORWARD,
138+
uint32(limit),
139+
nil,
140+
nil,
141+
)
142+
require.NoError(t, err)
143+
qry := regular.Query(params)
144+
ctx := user.InjectOrgID(context.Background(), "fake")
145+
146+
strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
147+
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
148+
_, _, mapped, err := mapper.Parse(params.GetExpression())
149+
require.NoError(t, err)
150+
151+
shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{
152+
Params: params,
153+
ExpressionOverride: mapped,
154+
})
155+
156+
res, err := qry.Exec(ctx)
157+
require.NoError(t, err)
158+
159+
shardedRes, err := shardedQry.Exec(ctx)
160+
require.NoError(t, err)
161+
162+
if tc.approximate {
163+
approximatelyEqualsVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector)) //, tc.realtiveError)
164+
} else {
165+
require.Equal(t, res.Data, shardedRes.Data)
166+
}
167+
})
128168
}
129169
}
130170

@@ -579,6 +619,24 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
579619
}
580620
}
581621

622+
// approximatelyEqualsVector ensures two responses are approximately equal,
623+
// up to 6 decimals precision per sample
624+
func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) {
625+
require.Len(t, bs, len(as))
626+
627+
for i := 0; i < len(as); i++ {
628+
a := as[i]
629+
b := bs[i]
630+
require.Equal(t, a.Metric, b.Metric)
631+
632+
aSample := a.F
633+
aSample = math.Round(aSample*1e6) / 1e6
634+
bSample := b.F
635+
bSample = math.Round(bSample*1e6) / 1e6
636+
require.Equalf(t, aSample, bSample, "metric %s differs from %s at %d", a.Metric, b.Metric, i)
637+
}
638+
}
639+
582640
func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) {
583641
require.Len(t, actual, len(expected))
584642

pkg/logql/engine.go

+44-24
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
371371
case SampleVector:
372372
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
373373
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
374-
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
374+
mfl := false
375+
if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
376+
mfl = true
377+
}
378+
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
375379
case ProbabilisticQuantileVector:
376380
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
377381
default:
@@ -381,9 +385,31 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
381385
return nil, errors.New("unexpected empty result")
382386
}
383387

384-
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
385-
seriesIndex := map[uint64]*promql.Series{}
388+
func vectorsToSeries(vec promql.Vector, sm map[uint64]promql.Series) {
389+
for _, p := range vec {
390+
var (
391+
series promql.Series
392+
hash = p.Metric.Hash()
393+
ok bool
394+
)
395+
396+
series, ok = sm[hash]
397+
if !ok {
398+
series = promql.Series{
399+
Metric: p.Metric,
400+
Floats: make([]promql.FPoint, 0, 1),
401+
}
402+
sm[hash] = series
403+
}
404+
series.Floats = append(series.Floats, promql.FPoint{
405+
T: p.T,
406+
F: p.F,
407+
})
408+
sm[hash] = series
409+
}
410+
}
386411

412+
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) {
387413
vec := promql.Vector{}
388414
if next {
389415
vec = r.SampleVector()
@@ -393,8 +419,21 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
393419
if len(vec) > maxSeries {
394420
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
395421
}
422+
seriesIndex := map[uint64]promql.Series{}
396423

397424
if GetRangeType(q.params) == InstantType {
425+
// an instant query sharded first/last_over_time can return a single vector
426+
if mergeFirstLast {
427+
vectorsToSeries(vec, seriesIndex)
428+
series := make([]promql.Series, 0, len(seriesIndex))
429+
for _, s := range seriesIndex {
430+
series = append(series, s)
431+
}
432+
result := promql.Matrix(series)
433+
sort.Sort(result)
434+
return result, stepEvaluator.Error()
435+
}
436+
398437
sortByValue, err := Sortable(q.params)
399438
if err != nil {
400439
return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err)
@@ -412,26 +451,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
412451

413452
for next {
414453
vec = r.SampleVector()
415-
for _, p := range vec {
416-
var (
417-
series *promql.Series
418-
hash = p.Metric.Hash()
419-
ok bool
420-
)
421-
422-
series, ok = seriesIndex[hash]
423-
if !ok {
424-
series = &promql.Series{
425-
Metric: p.Metric,
426-
Floats: make([]promql.FPoint, 0, stepCount),
427-
}
428-
seriesIndex[hash] = series
429-
}
430-
series.Floats = append(series.Floats, promql.FPoint{
431-
T: p.T,
432-
F: p.F,
433-
})
434-
}
454+
vectorsToSeries(vec, seriesIndex)
435455
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
436456
if len(seriesIndex) > maxSeries {
437457
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
@@ -444,7 +464,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
444464

445465
series := make([]promql.Series, 0, len(seriesIndex))
446466
for _, s := range seriesIndex {
447-
series = append(series, *s)
467+
series = append(series, s)
448468
}
449469
result := promql.Matrix(series)
450470
sort.Sort(result)

pkg/logql/first_last_over_time.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ type mergeOverTimeStepEvaluator struct {
131131

132132
// Next returns the first or last element within one step of each matrix.
133133
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
134-
var vec promql.Vector
134+
vec := promql.Vector{}
135135

136136
e.ts = e.ts.Add(e.step)
137137
if e.ts.After(e.end) {
@@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
142142
// Merge other results
143143
for i, m := range e.matrices {
144144
for j, series := range m {
145-
146145
if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
147146
continue
148147
}
@@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {
171170

172171
// inRange returns true if t is in step range of ts.
173172
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
173+
// special case instant queries
174+
if e.step.Milliseconds() == 0 {
175+
return true
176+
}
174177
return (ts-e.step.Milliseconds()) <= t && t < ts
175178
}
176179

pkg/logql/test_utils.go

-1
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
234234
if err != nil {
235235
return nil, err
236236
}
237-
238237
results = append(results, res)
239238
}
240239

0 commit comments

Comments
 (0)