Skip to content

Commit

Permalink
basic search
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Jul 10, 2024
1 parent 2c6c9c4 commit 0ae5298
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 251 deletions.
263 changes: 160 additions & 103 deletions gen/go/fri/v1/fri.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/RoaringBitmap/roaring v1.9.4
github.com/blevesearch/vellum v1.0.10
github.com/cespare/xxhash/v2 v2.3.0
github.com/gernest/rbf v0.22.14
github.com/gernest/rbf v0.22.15
github.com/gernest/roaring v0.22.3
github.com/gernest/rows v1.22.9
github.com/go-kit/log v0.2.1
Expand Down Expand Up @@ -327,3 +327,5 @@ require (
replace github.com/gocql/gocql => github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85

exclude k8s.io/client-go v8.0.0+incompatible

replace github.com/gernest/rbf => ../fusion/rbf
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fsouza/fake-gcs-server v1.7.0 h1:Un0BXUXrRWYSmYyC1Rqm2e2WJfTPyDy/HGMz31emTi8=
github.com/fsouza/fake-gcs-server v1.7.0/go.mod h1:5XIRs4YvwNbNoz+1JF8j6KLAyDh7RHGAyAK3EP2EsNk=
github.com/gernest/rbf v0.22.14 h1:T39F8aLSmvUpuj6WIPCqjtGf+D2iLu3K9pMvcNCGPm4=
github.com/gernest/rbf v0.22.14/go.mod h1:o6zinP4M2yB5SyskHpEYYW5nm/hkJ41gNSbFGuab0Mk=
github.com/gernest/rbf v0.22.15 h1:E2FHfPBypkEltVadIS7p8RaKfqfbAWM6Dd6eJ9/5V70=
github.com/gernest/rbf v0.22.15/go.mod h1:o6zinP4M2yB5SyskHpEYYW5nm/hkJ41gNSbFGuab0Mk=
github.com/gernest/roaring v0.22.3 h1:/7uzKzgMsvtYnBndo53oVYti7dc9iQmwvaZDaU3SJ8Q=
github.com/gernest/roaring v0.22.3/go.mod h1:7zYgTaDIXI0V6GCZj9u+130yQ3wK0OUxk6cLurb/jeI=
github.com/gernest/rows v1.22.9 h1:lCgslJirNMnkDq1mFUBeMfcb0Dcs/yvwtKpk3CDyXR4=
Expand Down
7 changes: 4 additions & 3 deletions internal/lbx/bsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func BSI(base Data, c *rbf.Cursor, exists *rows.Row, shard uint64, f func(column
}

func Distinct(c *rbf.Cursor, exists *rows.Row, shard uint64, f func(value uint64, columns *rows.Row) error) error {
data := NewData(exists.Columns())
cols := exists.Columns()
data := NewData(cols)
bitDepth, err := depth(c)
if err != nil {
return err
Expand All @@ -78,8 +79,8 @@ func Distinct(c *rbf.Cursor, exists *rows.Row, shard uint64, f func(value uint64
val = uint64((2*(int64(val)>>63) + 1) * int64(val&^(1<<63)))
idx[val] = append(idx[val], columnID)
}
for row, cols := range idx {
err := f(row, rows.NewRow(cols...))
for i := range cols {
err := f(cols[i], rows.NewRow(idx[cols[i]]...))
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/metrics/metricsproto/metricsproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func fromTS(ts *prompb.TimeSeries, o []*v1.Sample) []*v1.Sample {
Series: id,
Labels: labels,
Exemplars: exemplars,
Kind: true,
Kind: v1.Sample_FLOAT,
Value: ts.Samples[i].Value,
Timestamp: ts.Samples[i].Timestamp,
})
Expand All @@ -61,6 +61,7 @@ func fromTS(ts *prompb.TimeSeries, o []*v1.Sample) []*v1.Sample {
data, _ := ts.Histograms[i].Marshal()
o = append(o, &v1.Sample{
Series: id,
Kind: v1.Sample_HISTOGRAM,
Labels: labels,
Exemplars: exemplars,
Histogram: data,
Expand Down
24 changes: 19 additions & 5 deletions internal/metrics/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/blevesearch/vellum"
v1 "github.com/gernest/frieren/gen/go/fri/v1"
"github.com/gernest/frieren/internal/lbx"
"github.com/gernest/rbf"
"github.com/gernest/rbf/dsl"
Expand Down Expand Up @@ -164,14 +165,16 @@ func (s *Querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
defer r.Release()
isSeriesQuery := hints.Func == "series"
for i := range s.shards {
r.View(s.shards[i], func(txn *tx.Tx) error {

err := r.View(s.shards[i], func(txn *tx.Tx) error {
r, err := base.Apply(txn, nil)
if err != nil {
return err
}
if r.IsEmpty() {
return nil
}

f, err := filter.Apply(txn, r)
if err != nil {
return err
Expand Down Expand Up @@ -215,10 +218,17 @@ func (s *Querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
}
defer hc.Close()

isHistogram, err := lbx.Histograms(kind, txn)
floats, err := cursor.Row(kind, txn.Shard, uint64(v1.Sample_FLOAT))
if err != nil {
return err
}

histograms, err := cursor.Row(kind, txn.Shard, uint64(v1.Sample_HISTOGRAM))
if err != nil {
return err
}
histograms = histograms.Intersect(f)

return lbx.Distinct(series, f, txn.Shard, func(value uint64, columns *rows.Row) error {
cols := columns.Columns()
sx, ok := m[value]
Expand All @@ -241,8 +251,8 @@ func (s *Querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
mapping[cols[i]] = i
}
data := lbx.NewData(cols)
if isHistogram.Includes(cols[0]) {
// Histogram series

if histograms.Includes(cols[0]) {
hs := &prompb.Histogram{}
err = lbx.BSI(data, ts, columns, txn.Shard, func(column uint64, value int64) {
hs.Reset()
Expand All @@ -256,7 +266,8 @@ func (s *Querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
if err != nil {
return fmt.Errorf("reading histogram %w", err)
}
} else {
}
if floats.Includes(cols[0]) {
err = lbx.BSI(data, ts, columns, txn.Shard, func(column uint64, value int64) {
chunks[mapping[column]] = &V{ts: value}
})
Expand All @@ -274,6 +285,9 @@ func (s *Querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
return nil
})
})
if err != nil {
return storage.ErrSeriesSet(err)
}
}

return NewSeriesSet(m)
Expand Down
2 changes: 1 addition & 1 deletion internal/metrics/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSelect(t *testing.T) {
require.NoError(t, err)
set := qry.Select(context.TODO(), false, &storage.SelectHints{
Start: ms, End: ms,
}, labels.MustNewMatcher(labels.MatchEqual, "service_name", "test-service"))
}, labels.MustNewMatcher(labels.MatchEqual, "job", "test-service"))
require.NoError(t, set.Err())

data, err := json.MarshalIndent(matrix(set), "", " ")
Expand Down
Loading

0 comments on commit 0ae5298

Please sign in to comment.