From 999d5f0082dd0fdfa84893db83029b38f190c3dd Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jul 2024 13:22:53 +0200 Subject: [PATCH 1/3] fix: Return empty vector instead of nil for empty evaluator. --- pkg/logql/engine.go | 3 +-- pkg/logql/engine_test.go | 49 ++++++++++++++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index f35a1b397a3b0..84d352f005944 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -378,11 +378,10 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, fmt.Errorf("unsupported result type: %T", r) } } - return nil, nil + return promql.Vector{}, nil } func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { - seriesIndex := map[uint64]*promql.Series{} vec := promql.Vector{} diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index b409422c1c8fc..1b031ad5d6047 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -65,15 +65,17 @@ func TestEngine_LogsRateUnwrap(t *testing.T) { {newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{ - Start: time.Unix(30, 0), - End: time.Unix(60, 0), - Selector: `rate({app="foo"} | unwrap foo[30s])`, - Plan: &plan.QueryPlan{ - AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`), + { + &logproto.SampleQueryRequest{ + Start: time.Unix(30, 0), + End: time.Unix(60, 0), + Selector: `rate({app="foo"} | unwrap foo[30s])`, + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`), + }, }, }, - }}, + }, // there are 15 samples (from 47 to 61) matched from the generated series // SUM(n=47, 61, 1) = 15 // 15 / 30 = 0.5 @@ -955,7 +957,6 @@ func TestEngine_InstantQuery(t *testing.T) { } { test := test t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { - eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits, log.NewNopLogger()) params, err := NewLiteralParams(test.qs, test.ts, test.ts, 0, 0, test.direction, test.limit, nil, nil) @@ -1590,10 +1591,12 @@ func TestEngine_RangeQuery(t *testing.T) { promql.Series{ // vector result Metric: labels.Labels(nil), - Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}}}, + Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}}, + }, promql.Series{ Metric: labels.FromStrings("app", "foo"), - Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}}}, + Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}}, + }, }, }, { @@ -2635,6 +2638,32 @@ func TestHashingStability(t *testing.T) { } } +func TestEmptyResults(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "fake") + + mock := &mockEvaluatorFactory{SampleEvaluatorFunc(func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) { + return EmptyEvaluator[SampleVector]{value: nil}, nil + })} + + eng := NewEngine(EngineOpts{}, nil, NoLimits, log.NewNopLogger()) + params, err := NewLiteralParams(`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, time.Now(), time.Now(), 0, 0, logproto.BACKWARD, 0, nil, nil) + require.NoError(t, err) + q := eng.Query(params).(*query) + q.evaluator = mock + + r, err := q.Exec(ctx) + require.NoError(t, err) + require.NotNil(t, r.Data) +} + +type mockEvaluatorFactory struct { + SampleEvaluatorFactory +} + +func (*mockEvaluatorFactory) NewIterator(context.Context, syntax.LogSelectorExpr, Params) (iter.EntryIterator, error) { + return nil, errors.New("unimplemented mock EntryEvaluatorFactory") +} + func getLocalQuerier(size int64) Querier { return &querierRecorder{ series: map[string][]logproto.Series{ From 533e945bff1790b267959ac948918d411ce781f3 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jul 2024 15:23:38 +0200 Subject: [PATCH 2/3] Fix evaluator behaviour. --- pkg/logql/engine.go | 2 +- pkg/logql/engine_test.go | 7 +++---- pkg/logql/first_last_over_time.go | 25 ++++++------------------- 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 84d352f005944..0a26520b673c7 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -378,7 +378,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, fmt.Errorf("unsupported result type: %T", r) } } - return promql.Vector{}, nil + return nil, errors.New("unexpected empty result") } func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 1b031ad5d6047..23bcbac91e0f6 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2638,7 +2638,7 @@ func TestHashingStability(t *testing.T) { } } -func TestEmptyResults(t *testing.T) { +func TestUnexpectedEmptyResults(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "fake") mock := &mockEvaluatorFactory{SampleEvaluatorFunc(func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) { @@ -2651,9 +2651,8 @@ func TestEmptyResults(t *testing.T) { q := eng.Query(params).(*query) q.evaluator = mock - r, err := q.Exec(ctx) - require.NoError(t, err) - require.NotNil(t, r.Data) + _, err = q.Exec(ctx) + require.Error(t, err) } type mockEvaluatorFactory struct { diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index e24133d13bfec..4ed642e2fc5ab 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -14,7 +14,8 @@ import ( // of a windowed aggregation. func newFirstWithTimestampIterator( it iter.PeekingSampleIterator, - selRange, step, start, end, offset int64) RangeVectorIterator { + selRange, step, start, end, offset int64, +) RangeVectorIterator { inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -67,7 +68,8 @@ func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint func newLastWithTimestampIterator( it iter.PeekingSampleIterator, - selRange, step, start, end, offset int64) RangeVectorIterator { + selRange, step, start, end, offset int64, +) RangeVectorIterator { inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -129,10 +131,7 @@ type mergeOverTimeStepEvaluator struct { // Next returns the first or last element within one step of each matrix. func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { - - var ( - vec promql.Vector - ) + var vec promql.Vector e.ts = e.ts.Add(e.step) if e.ts.After(e.end) { @@ -159,7 +158,7 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { } if len(vec) == 0 { - return e.hasNext(), ts, SampleVector(vec) + return true, ts, SampleVector(vec) } return true, ts, SampleVector(vec) @@ -179,18 +178,6 @@ func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { return (ts-e.step.Milliseconds()) <= t && t < ts } -func (e *mergeOverTimeStepEvaluator) hasNext() bool { - for _, m := range e.matrices { - for _, s := range m { - if len(s.Floats) != 0 { - return true - } - } - } - - return false -} - func (*mergeOverTimeStepEvaluator) Close() error { return nil } func (*mergeOverTimeStepEvaluator) Error() error { return nil } From 9f74ea8b56f8a4dbe268c72da40957eef6ba4a9e Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 11 Jul 2024 17:15:23 +0200 Subject: [PATCH 3/3] Simplify code --- pkg/logql/first_last_over_time.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 4ed642e2fc5ab..6d0329cacf8d2 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -157,10 +157,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { vec[i].T = ts } - if len(vec) == 0 { - return true, ts, SampleVector(vec) - } - return true, ts, SampleVector(vec) }