Skip to content

Commit 08615bf

Browse files
authored
fix: Return empty vector instead of nil for empty evaluator. (#13485)
1 parent 1b9b111 commit 08615bf

File tree

3 files changed

+44
-34
lines changed

3 files changed

+44
-34
lines changed

pkg/logql/engine.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,10 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
378378
return nil, fmt.Errorf("unsupported result type: %T", r)
379379
}
380380
}
381-
return nil, nil
381+
return nil, errors.New("unexpected empty result")
382382
}
383383

384384
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
385-
386385
seriesIndex := map[uint64]*promql.Series{}
387386

388387
vec := promql.Vector{}

pkg/logql/engine_test.go

+38-10
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,17 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
6565
{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
6666
},
6767
[]SelectSampleParams{
68-
{&logproto.SampleQueryRequest{
69-
Start: time.Unix(30, 0),
70-
End: time.Unix(60, 0),
71-
Selector: `rate({app="foo"} | unwrap foo[30s])`,
72-
Plan: &plan.QueryPlan{
73-
AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`),
68+
{
69+
&logproto.SampleQueryRequest{
70+
Start: time.Unix(30, 0),
71+
End: time.Unix(60, 0),
72+
Selector: `rate({app="foo"} | unwrap foo[30s])`,
73+
Plan: &plan.QueryPlan{
74+
AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`),
75+
},
7476
},
7577
},
76-
}},
78+
},
7779
// there are 15 samples (from 47 to 61) matched from the generated series
7880
// SUM(n=47, 61, 1) = 15
7981
// 15 / 30 = 0.5
@@ -955,7 +957,6 @@ func TestEngine_InstantQuery(t *testing.T) {
955957
} {
956958
test := test
957959
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
958-
959960
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits, log.NewNopLogger())
960961

961962
params, err := NewLiteralParams(test.qs, test.ts, test.ts, 0, 0, test.direction, test.limit, nil, nil)
@@ -1590,10 +1591,12 @@ func TestEngine_RangeQuery(t *testing.T) {
15901591
promql.Series{
15911592
// vector result
15921593
Metric: labels.Labels(nil),
1593-
Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}}},
1594+
Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}},
1595+
},
15941596
promql.Series{
15951597
Metric: labels.FromStrings("app", "foo"),
1596-
Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}}},
1598+
Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}},
1599+
},
15971600
},
15981601
},
15991602
{
@@ -2635,6 +2638,31 @@ func TestHashingStability(t *testing.T) {
26352638
}
26362639
}
26372640

2641+
func TestUnexpectedEmptyResults(t *testing.T) {
2642+
ctx := user.InjectOrgID(context.Background(), "fake")
2643+
2644+
mock := &mockEvaluatorFactory{SampleEvaluatorFunc(func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) {
2645+
return EmptyEvaluator[SampleVector]{value: nil}, nil
2646+
})}
2647+
2648+
eng := NewEngine(EngineOpts{}, nil, NoLimits, log.NewNopLogger())
2649+
params, err := NewLiteralParams(`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, time.Now(), time.Now(), 0, 0, logproto.BACKWARD, 0, nil, nil)
2650+
require.NoError(t, err)
2651+
q := eng.Query(params).(*query)
2652+
q.evaluator = mock
2653+
2654+
_, err = q.Exec(ctx)
2655+
require.Error(t, err)
2656+
}
2657+
2658+
type mockEvaluatorFactory struct {
2659+
SampleEvaluatorFactory
2660+
}
2661+
2662+
func (*mockEvaluatorFactory) NewIterator(context.Context, syntax.LogSelectorExpr, Params) (iter.EntryIterator, error) {
2663+
return nil, errors.New("unimplemented mock EntryEvaluatorFactory")
2664+
}
2665+
26382666
func getLocalQuerier(size int64) Querier {
26392667
return &querierRecorder{
26402668
series: map[string][]logproto.Series{

pkg/logql/first_last_over_time.go

+5-22
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import (
1414
// of a windowed aggregation.
1515
func newFirstWithTimestampIterator(
1616
it iter.PeekingSampleIterator,
17-
selRange, step, start, end, offset int64) RangeVectorIterator {
17+
selRange, step, start, end, offset int64,
18+
) RangeVectorIterator {
1819
inner := &batchRangeVectorIterator{
1920
iter: it,
2021
step: step,
@@ -67,7 +68,8 @@ func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint
6768

6869
func newLastWithTimestampIterator(
6970
it iter.PeekingSampleIterator,
70-
selRange, step, start, end, offset int64) RangeVectorIterator {
71+
selRange, step, start, end, offset int64,
72+
) RangeVectorIterator {
7173
inner := &batchRangeVectorIterator{
7274
iter: it,
7375
step: step,
@@ -129,10 +131,7 @@ type mergeOverTimeStepEvaluator struct {
129131

130132
// Next returns the first or last element within one step of each matrix.
131133
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
132-
133-
var (
134-
vec promql.Vector
135-
)
134+
var vec promql.Vector
136135

137136
e.ts = e.ts.Add(e.step)
138137
if e.ts.After(e.end) {
@@ -158,10 +157,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
158157
vec[i].T = ts
159158
}
160159

161-
if len(vec) == 0 {
162-
return e.hasNext(), ts, SampleVector(vec)
163-
}
164-
165160
return true, ts, SampleVector(vec)
166161
}
167162

@@ -179,18 +174,6 @@ func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
179174
return (ts-e.step.Milliseconds()) <= t && t < ts
180175
}
181176

182-
func (e *mergeOverTimeStepEvaluator) hasNext() bool {
183-
for _, m := range e.matrices {
184-
for _, s := range m {
185-
if len(s.Floats) != 0 {
186-
return true
187-
}
188-
}
189-
}
190-
191-
return false
192-
}
193-
194177
func (*mergeOverTimeStepEvaluator) Close() error { return nil }
195178

196179
func (*mergeOverTimeStepEvaluator) Error() error { return nil }

0 commit comments

Comments
 (0)