Skip to content

Commit cd11367

Browse files
chore(dataobj): Wire dataset.Reader into dataobj.LogsReader and dataobj.StreamsReader (#16701)
Co-authored-by: Ashwanth Goli <[email protected]>
1 parent 8c4eb42 commit cd11367

11 files changed

+746
-234
lines changed

pkg/dataobj/internal/dataset/predicate.go

+13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ type (
1515
// included if either the Left or Right Predicate are true.
1616
OrPredicate struct{ Left, Right Predicate }
1717

18+
// A NotPredicate is a [Predicate] which asserts that a row may only be
19+
// included if the inner Predicate is false.
20+
NotPredicate struct{ Inner Predicate }
21+
22+
// FalsePredicate is a [Predicate] which always returns false.
23+
FalsePredicate struct{}
24+
1825
// An EqualPredicate is a [Predicate] which asserts that a row may only be
1926
// included if the Value of the Column is equal to the Value.
2027
EqualPredicate struct {
@@ -55,6 +62,8 @@ type (
5562

5663
func (AndPredicate) isPredicate() {}
5764
func (OrPredicate) isPredicate() {}
65+
func (NotPredicate) isPredicate() {}
66+
func (FalsePredicate) isPredicate() {}
5867
func (EqualPredicate) isPredicate() {}
5968
func (GreaterThanPredicate) isPredicate() {}
6069
func (LessThanPredicate) isPredicate() {}
@@ -78,6 +87,10 @@ func WalkPredicate(p Predicate, fn func(p Predicate) bool) {
7887
WalkPredicate(p.Left, fn)
7988
WalkPredicate(p.Right, fn)
8089

90+
case NotPredicate:
91+
WalkPredicate(p.Inner, fn)
92+
93+
case FalsePredicate: // No children.
8194
case EqualPredicate: // No children.
8295
case GreaterThanPredicate: // No children.
8396
case LessThanPredicate: // No children.

pkg/dataobj/internal/dataset/reader.go

+94-6
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
126126
count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize])
127127
if err != nil && !errors.Is(err, io.EOF) {
128128
return n, err
129+
} else if count == 0 && errors.Is(err, io.EOF) {
130+
return 0, io.EOF
129131
}
130132

131133
var passCount int // passCount tracks how many rows pass the predicate.
@@ -196,6 +198,12 @@ func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
196198
case OrPredicate:
197199
return checkPredicate(p.Left, lookup, row) || checkPredicate(p.Right, lookup, row)
198200

201+
case NotPredicate:
202+
return !checkPredicate(p.Inner, lookup, row)
203+
204+
case FalsePredicate:
205+
return false
206+
199207
case EqualPredicate:
200208
columnIndex, ok := lookup[p.Column]
201209
if !ok {
@@ -350,7 +358,7 @@ func (r *Reader) validatePredicate() error {
350358
err = process(p.Column)
351359
case FuncPredicate:
352360
err = process(p.Column)
353-
case AndPredicate, OrPredicate, nil:
361+
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
354362
// No columns to process.
355363
default:
356364
panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p))
@@ -422,7 +430,7 @@ func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) {
422430
process(p.Column)
423431
case FuncPredicate:
424432
process(p.Column)
425-
case AndPredicate, OrPredicate, nil:
433+
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
426434
// No columns to process.
427435
default:
428436
panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p))
@@ -463,6 +471,25 @@ func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRang
463471
}
464472
return unionRanges(nil, left, right), nil
465473

474+
case NotPredicate:
475+
// De Morgan's laws must be applied to reduce the NotPredicate to a set of
476+
// predicates that can be applied to pages.
477+
//
478+
// See comment on [simplifyNotPredicate] for more information.
479+
simplified, err := simplifyNotPredicate(p)
480+
if err != nil {
481+
// Predicate can't be simplified, so we permit the full range.
482+
var rowsCount uint64
483+
for _, column := range r.dl.AllColumns() {
484+
rowsCount = max(rowsCount, uint64(column.ColumnInfo().RowsCount))
485+
}
486+
return rowRanges{{Start: 0, End: rowsCount - 1}}, nil
487+
}
488+
return r.buildPredicateRanges(ctx, simplified)
489+
490+
case FalsePredicate:
491+
return nil, nil // No valid ranges.
492+
466493
case EqualPredicate:
467494
return r.buildColumnPredicateRanges(ctx, p.Column, p)
468495

@@ -489,6 +516,67 @@ func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRang
489516
}
490517
}
491518

519+
// simplifyNotPredicate applies De Morgan's laws to a NotPredicate to permit
520+
// page filtering.
521+
//
522+
// While during evaluation, a NotPredicate inverts the result of the inner
523+
// predicate, the same can't be done for page filtering. For example, imagine
524+
// that a page is included from a rule "a > 10." If we inverted that inclusion,
525+
// we may be incorrectly filtering out that page, as that page may also have
526+
// values less than or equal to 10.
527+
//
528+
// To correctly apply page filtering to a NotPredicate, we reduce the
529+
// NotPredicate to a set of predicates that can be applied to pages. This may
530+
// result in other NotPredicates that also need to be simplified.
531+
//
532+
// If the NotPredicate can't be simplified, simplifyNotPredicate returns an
533+
// error.
534+
func simplifyNotPredicate(p NotPredicate) (Predicate, error) {
535+
switch inner := p.Inner.(type) {
536+
case AndPredicate: // De Morgan's law: !(A && B) == !A || !B
537+
return OrPredicate{
538+
Left: NotPredicate{Inner: inner.Left},
539+
Right: NotPredicate{Inner: inner.Right},
540+
}, nil
541+
542+
case OrPredicate: // De Morgan's law: !(A || B) == !A && !B
543+
return AndPredicate{
544+
Left: NotPredicate{Inner: inner.Left},
545+
Right: NotPredicate{Inner: inner.Right},
546+
}, nil
547+
548+
case NotPredicate: // Double negation: !!A == A
549+
return inner.Inner, nil
550+
551+
case FalsePredicate:
552+
return nil, fmt.Errorf("can't simplify FalsePredicate")
553+
554+
case EqualPredicate: // Negated equality: !(A == B) == (A != B) == (A < B || A > B)
555+
return OrPredicate{
556+
Left: LessThanPredicate(inner),
557+
Right: GreaterThanPredicate(inner),
558+
}, nil
559+
560+
case GreaterThanPredicate: // Negated comparison: !(A > B) == (A <= B) == (A == B || A < B)
561+
return OrPredicate{
562+
Left: EqualPredicate(inner),
563+
Right: LessThanPredicate(inner),
564+
}, nil
565+
566+
case LessThanPredicate: // Negated comparison: !(A < B) == (A >= B) == (A == B || A > B)
567+
return OrPredicate{
568+
Left: EqualPredicate(inner),
569+
Right: GreaterThanPredicate(inner),
570+
}, nil
571+
572+
case FuncPredicate:
573+
return nil, fmt.Errorf("can't simplify FuncPredicate")
574+
575+
default:
576+
panic(fmt.Sprintf("unsupported predicate type %T", inner))
577+
}
578+
}
579+
492580
// buildColumnPredicateRanges returns a set of rowRanges that are valid based
493581
// on whether EqualPredicate, GreaterThanPredicate, or LessThanPredicate may be
494582
// true for each page in a column.
@@ -536,10 +624,10 @@ func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Pre
536624
switch p := p.(type) {
537625
case EqualPredicate: // EqualPredicate may be true if p.Value is inside the range of the page.
538626
include = CompareValues(p.Value, minValue) >= 0 && CompareValues(p.Value, maxValue) <= 0
539-
case GreaterThanPredicate: // GreaterThanPredicate may be true if p.Value is greater than the min value.
540-
include = CompareValues(p.Value, minValue) > 0
541-
case LessThanPredicate: // LessThanPredicate may be true if p.Value is less than the max value.
542-
include = CompareValues(p.Value, maxValue) < 0
627+
case GreaterThanPredicate: // GreaterThanPredicate may be true if maxValue of a page is greater than p.Value
628+
include = CompareValues(maxValue, p.Value) > 0
629+
case LessThanPredicate: // LessThanPredicate may be true if minValue of a page is less than p.Value
630+
include = CompareValues(minValue, p.Value) < 0
543631
default:
544632
panic(fmt.Sprintf("unsupported predicate type %T", p))
545633
}

pkg/dataobj/internal/dataset/reader_basic.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (pr *basicReader) ReadColumns(ctx context.Context, columns []Column, s []Ro
9090
// Fill does not advance the offset of the basicReader.
9191
func (pr *basicReader) Fill(ctx context.Context, columns []Column, s []Row) (n int, err error) {
9292
if len(columns) == 0 {
93-
return 0, fmt.Errorf("no columns to read")
93+
return 0, fmt.Errorf("no columns to fill")
9494
}
9595

9696
for partition := range partitionRows(s) {

pkg/dataobj/internal/dataset/reader_test.go

+159
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"testing"
99

1010
"github.com/stretchr/testify/require"
11+
12+
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
13+
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
1114
)
1215

1316
func Test_Reader_ReadAll(t *testing.T) {
@@ -228,3 +231,159 @@ func readDataset(br *Reader, batchSize int) ([]Row, error) {
228231
}
229232
}
230233
}
234+
235+
func Test_BuildPredicateRanges(t *testing.T) {
236+
ds, cols := buildMemDatasetWithStats(t)
237+
tt := []struct {
238+
name string
239+
predicate Predicate
240+
want rowRanges
241+
}{
242+
{
243+
name: "nil predicate returns full range",
244+
predicate: nil,
245+
want: rowRanges{{Start: 0, End: 999}}, // Full dataset range
246+
},
247+
{
248+
name: "equal predicate in range",
249+
predicate: EqualPredicate{Column: cols[1], Value: Int64Value(50)},
250+
want: rowRanges{{Start: 0, End: 249}}, // Page 1 of Timestamp column
251+
},
252+
{
253+
name: "equal predicate not in any range",
254+
predicate: EqualPredicate{Column: cols[1], Value: Int64Value(1500)},
255+
want: nil, // No ranges should match
256+
},
257+
{
258+
name: "greater than predicate",
259+
predicate: GreaterThanPredicate{Column: cols[1], Value: Int64Value(400)},
260+
want: rowRanges{{Start: 250, End: 749}, {Start: 750, End: 999}}, // Pages 2 and 3 of Timestamp column
261+
},
262+
{
263+
name: "less than predicate",
264+
predicate: LessThanPredicate{Column: cols[1], Value: Int64Value(300)},
265+
want: rowRanges{{Start: 0, End: 249}, {Start: 250, End: 749}}, // Pages 1 and 2 of Timestamp column
266+
},
267+
{
268+
name: "and predicate",
269+
predicate: AndPredicate{
270+
Left: EqualPredicate{Column: cols[0], Value: Int64Value(1)}, // Rows 0 - 299 of stream column
271+
Right: LessThanPredicate{Column: cols[1], Value: Int64Value(600)}, // Rows 0 - 249, 250 - 749 of timestamp column
272+
},
273+
want: rowRanges{{Start: 0, End: 249}, {Start: 250, End: 299}},
274+
},
275+
{
276+
name: "or predicate",
277+
predicate: OrPredicate{
278+
Left: EqualPredicate{Column: cols[0], Value: Int64Value(1)}, // Rows 0 - 299 of stream column
279+
Right: GreaterThanPredicate{Column: cols[1], Value: Int64Value(800)}, // Rows 750 - 999 of timestamp column
280+
},
281+
want: rowRanges{{Start: 0, End: 299}, {Start: 750, End: 999}}, // Rows 0 - 299, 750 - 999
282+
},
283+
}
284+
285+
ctx := context.Background()
286+
for _, tc := range tt {
287+
t.Run(tc.name, func(t *testing.T) {
288+
r := NewReader(ReaderOptions{
289+
Dataset: ds,
290+
Columns: cols,
291+
Predicate: tc.predicate,
292+
})
293+
defer r.Close()
294+
295+
// Initialize downloader
296+
require.NoError(t, r.initDownloader(ctx))
297+
298+
got, err := r.buildPredicateRanges(ctx, tc.predicate)
299+
require.NoError(t, err)
300+
301+
require.Equal(t, tc.want, got, "row ranges should match expected ranges")
302+
})
303+
}
304+
}
305+
306+
// buildMemDatasetWithStats creates a test dataset with only column and page stats.
307+
func buildMemDatasetWithStats(t *testing.T) (Dataset, []Column) {
308+
t.Helper()
309+
310+
dset := FromMemory([]*MemColumn{
311+
{
312+
Info: ColumnInfo{
313+
Name: "stream",
314+
Type: datasetmd.VALUE_TYPE_INT64,
315+
RowsCount: 1000, // 0 - 999
316+
},
317+
Pages: []*MemPage{
318+
{
319+
Info: PageInfo{
320+
RowCount: 300, // 0 - 299
321+
Stats: &datasetmd.Statistics{
322+
MinValue: encodeInt64Value(t, 1),
323+
MaxValue: encodeInt64Value(t, 2),
324+
},
325+
},
326+
},
327+
{
328+
Info: PageInfo{
329+
RowCount: 700, // 300 - 999
330+
Stats: &datasetmd.Statistics{
331+
MinValue: encodeInt64Value(t, 2),
332+
MaxValue: encodeInt64Value(t, 2),
333+
},
334+
},
335+
},
336+
},
337+
},
338+
{
339+
Info: ColumnInfo{
340+
Name: "timestamp",
341+
Type: datasetmd.VALUE_TYPE_INT64,
342+
RowsCount: 1000, // 0 - 999
343+
},
344+
Pages: []*MemPage{
345+
{
346+
Info: PageInfo{
347+
RowCount: 250, // 0 - 249
348+
Stats: &datasetmd.Statistics{
349+
MinValue: encodeInt64Value(t, 0),
350+
MaxValue: encodeInt64Value(t, 100),
351+
},
352+
},
353+
},
354+
{
355+
Info: PageInfo{
356+
RowCount: 500, // 250 - 749
357+
Stats: &datasetmd.Statistics{
358+
MinValue: encodeInt64Value(t, 200),
359+
MaxValue: encodeInt64Value(t, 500),
360+
},
361+
},
362+
},
363+
{
364+
Info: PageInfo{
365+
RowCount: 250, // 750 - 999
366+
Stats: &datasetmd.Statistics{
367+
MinValue: encodeInt64Value(t, 800),
368+
MaxValue: encodeInt64Value(t, 1000),
369+
},
370+
},
371+
},
372+
},
373+
},
374+
})
375+
376+
cols, err := result.Collect(dset.ListColumns(context.Background()))
377+
require.NoError(t, err)
378+
379+
return dset, cols
380+
}
381+
382+
// encodeInt64Value marshals v as an Int64Value into its binary form for use as a page statistic.
383+
func encodeInt64Value(t *testing.T, v int64) []byte {
384+
t.Helper()
385+
386+
data, err := Int64Value(v).MarshalBinary()
387+
require.NoError(t, err)
388+
return data
389+
}

pkg/dataobj/internal/sections/logs/iter.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
8080
}
8181

8282
for _, row := range rows[:n] {
83-
record, err := decodeRecord(streamsColumns, row)
83+
record, err := Decode(streamsColumns, row)
8484
if err != nil || !yield(record) {
8585
return err
8686
}
@@ -89,7 +89,10 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
8989
})
9090
}
9191

92-
func decodeRecord(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
92+
// Decode decodes a record from a [dataset.Row], using the provided columns to
93+
// determine the column type. The list of columns must match the columns used
94+
// to create the row.
95+
func Decode(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
9396
record := Record{
9497
// Preallocate metadata to exact number of metadata columns to avoid
9598
// oversizing.

0 commit comments

Comments
 (0)