Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix combining query filters, offsets, and limits #71

Merged
merged 4 commits into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 104 additions & 34 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,36 +450,50 @@ func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
}

func (t *txn) query(q dsq.Query) (dsq.Results, error) {
prefix := []byte(q.Prefix)
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = !q.KeysOnly
opt.Prefix = []byte(q.Prefix)

// Special case order by key.
orders := q.Orders
if len(orders) > 0 {
// Handle ordering
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// Already ordered by key.
orders = nil
// We order by key by default.
case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
orders = nil
// Reverse order by key
opt.Reverse = true
}
}
default:
// Ok, we have a weird order we can't handle. Let's
// perform the _base_ query (prefix, filter, etc.), then
// handle sort/offset/limit later.

// Skip the stuff we can't apply.
baseQuery := q
baseQuery.Limit = 0
baseQuery.Offset = 0
baseQuery.Orders = nil

// perform the base query.
res, err := t.query(baseQuery)
if err != nil {
return nil, err
}

txn := t.txn
// fix the query
res = dsq.ResultsReplaceQuery(res, q)

it := txn.NewIterator(opt)
it.Seek(prefix)
// Remove the parts we've already applied.
naiveQuery := q
naiveQuery.Prefix = ""
naiveQuery.Filters = nil

if q.Offset > 0 {
for j := 0; j < q.Offset; j++ {
it.Next()
// Apply the rest of the query
return dsq.NaiveQueryApply(naiveQuery, res), nil
}
}

it := t.txn.NewIterator(opt)
qrb := dsq.NewResultBuilder(q)

qrb.Process.Go(func(worker goprocess.Process) {
t.ds.closeLk.RLock()
closedEarly := false
Expand Down Expand Up @@ -509,16 +523,63 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {

defer it.Close()

for sent := 0; it.ValidForPrefix(prefix); sent++ {
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
// All iterators must be started by rewinding.
it.Rewind()

// skip to the offset
for skipped := 0; skipped < q.Offset && it.Valid(); it.Next() {
// On the happy path, we have no filters and we can go
// on our way.
if len(q.Filters) == 0 {
skipped++
continue
}

// On the sad path, we need to apply filters before
// counting the item as "skipped" as the offset comes
// _after_ the filter.
item := it.Item()

k := string(item.Key())
e := dsq.Entry{Key: k}
matches := true
check := func(value []byte) error {
e := dsq.Entry{Key: string(item.Key()), Value: value}

// Only calculate expirations if we need them.
if q.ReturnExpirations {
e.Expiration = expires(item)
}
matches = filter(q.Filters, e)
return nil
}

// Maybe check with the value, only if we need it.
var err error
if q.KeysOnly {
err = check(nil)
} else {
err = item.Value(check)
}

if err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}:
case <-t.ds.closing: // datastore closing.
closedEarly = true
return
case <-worker.Closing(): // client told us to close early
return
}
}
if !matches {
skipped++
}
}

for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() {
item := it.Item()
e := dsq.Entry{Key: string(item.Key())}

// Maybe get the value
var result dsq.Result
if !q.KeysOnly {
b, err := item.ValueCopy(nil)
Expand All @@ -533,34 +594,29 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {
}

if q.ReturnExpirations {
result.Expiration = time.Unix(int64(item.ExpiresAt()), 0)
result.Expiration = expires(item)
}

// Finally, filter it (unless we're dealing with an error).
if result.Error == nil && filter(q.Filters, e) {
continue
}

select {
case qrb.Output <- result:
sent++
case <-t.ds.closing: // datastore closing.
closedEarly = true
return
case <-worker.Closing(): // client told us to close early
return
}

it.Next()
}
})

go qrb.Process.CloseAfterChildren() //nolint

// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
if len(orders) > 0 {
qr = dsq.NaiveOrder(qr, orders...)
}

return qr, nil
return qrb.Results(), nil
}

func (t *txn) Commit() error {
Expand Down Expand Up @@ -604,3 +660,17 @@ func (t *txn) Discard() {
func (t *txn) discard() {
t.txn.Discard()
}

// filter returns _true_ if we should filter (skip) the entry
func filter(filters []dsq.Filter, entry dsq.Entry) bool {
for _, f := range filters {
if !f.Filter(entry) {
return true
}
}
return false
}

func expires(item *badger.Item) time.Time {
return time.Unix(int64(item.ExpiresAt()), 0)
}
8 changes: 8 additions & 0 deletions ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
)

var testcases = map[string]string{
Expand Down Expand Up @@ -821,3 +822,10 @@ func TestExpirations(t *testing.T) {
t.Fatalf("wrong error type: %v", err)
}
}

func TestSuite(t *testing.T) {
d, done := newDS(t)
defer done()

dstest.SubtestAll(t, d)
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ module github.com/ipfs/go-ds-badger

require (
github.com/dgraph-io/badger v1.6.0
github.com/ipfs/go-datastore v0.1.0
github.com/ipfs/go-datastore v0.1.1
github.com/ipfs/go-log v0.0.1
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
)

go 1.12
9 changes: 3 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0-rc1 h1:JphPpoBZJ3WHha133BGYlQqltSGIhV+VsEID0++nN9A=
github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
Expand All @@ -29,10 +27,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.1.0 h1:TOxI04l8CmO4zGtesENhzm4PwkFwJXY3rKiYaaMf9fI=
github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.1.1 h1:F4k0TkTAZGLFzBOrVKDAvch6JZtuN4NHkfdcEZL50aI=
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
Expand Down Expand Up @@ -88,6 +84,7 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSF
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down