Skip to content

Commit

Permalink
Merge pull request #28 from ipfs/feat/fast-reverse-query
Browse files Browse the repository at this point in the history
Fast reverse query
  • Loading branch information
Stebalien authored Apr 9, 2019
2 parents 253c31f + 272a284 commit 47a9627
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 135 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ env:
global:
- GOTFLAGS="-race -cpu=5"
matrix:
- BUILD_DEPTYPE=gx
- BUILD_DEPTYPE=gomod


Expand All @@ -24,7 +23,6 @@ script:

cache:
directories:
- $GOPATH/src/gx
- $GOPATH/pkg/mod
- $HOME/.cache/go-build

Expand Down
115 changes: 22 additions & 93 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
Expand Down Expand Up @@ -114,25 +113,33 @@ func (a *accessor) Delete(key ds.Key) (err error) {
}

func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
return a.queryNew(q)
}

func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
return a.queryOrig(q)
}
var rnge *util.Range

// make a copy of the query for the fallback naive query implementation.
// don't modify the original so res.Query() returns the correct results.
qNaive := q
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
qNaive.Prefix = ""
}
i := a.ldb.NewIterator(rnge, nil)
return dsq.ResultsFromIterator(q, dsq.Iterator{
next := i.Next
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
qNaive.Orders = nil
case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
next = func() bool {
next = i.Prev
return i.Last()
}
qNaive.Orders = nil
default:
}
}
r := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
ok := i.Next()
if !ok {
if !next() {
return dsq.Result{}, false
}
k := string(i.Key())
Expand All @@ -149,86 +156,8 @@ func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
i.Release()
return nil
},
}), nil
}

func (a *accessor) queryOrig(q dsq.Query) (dsq.Results, error) {
// we can use multiple iterators concurrently. see:
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// advance the iterator only if the reader reads
//
// run query in own sub-process tied to Results.Process(), so that
// it waits for us to finish AND so that clients can signal to us
// that resources should be reclaimed.
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
a.runQuery(worker, qrb)
})

// go wait on the worker (without signaling close)
go qrb.Process.CloseAfterChildren()

// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// Default ordering
default:
qr = dsq.NaiveOrder(qr, q.Orders...)
}
}
return qr, nil
}

func (a *accessor) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
if qrb.Query.Prefix != "" {
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
}
i := a.ldb.NewIterator(rnge, nil)
defer i.Release()

// advance iterator for offset
if qrb.Query.Offset > 0 {
for j := 0; j < qrb.Query.Offset; j++ {
i.Next()
}
}

// iterate, and handle limit, too
for sent := 0; i.Next(); sent++ {
// end early if we hit the limit
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
}

k := string(i.Key())
e := dsq.Entry{Key: k}

if !qrb.Query.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
e.Value = buf
}

select {
case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
case <-worker.Closing(): // client told us to end early.
break
}
}

if err := i.Error(); err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}: // client read our error
case <-worker.Closing(): // client told us to end.
return
}
}
return dsq.NaiveQueryApply(qNaive, r), nil
}

// DiskUsage returns the current disk size used by this levelDB.
Expand Down
46 changes: 46 additions & 0 deletions ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"os"
"sort"
"testing"

ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -101,6 +102,33 @@ func testQuery(t *testing.T, d *Datastore) {
"/a/b/d",
"/a/c",
}, rs)

// test order

rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKey{}}})
if err != nil {
t.Fatal(err)
}

keys := make([]string, 0, len(testcases))
for k := range testcases {
keys = append(keys, k)
}
sort.Strings(keys)

expectOrderedMatches(t, keys, rs)

rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKeyDescending{}}})
if err != nil {
t.Fatal(err)
}

// reverse
for i, j := 0, len(keys)-1; i < j; i, j = i+1, j-1 {
keys[i], keys[j] = keys[j], keys[i]
}

expectOrderedMatches(t, keys, rs)
}

func TestQuery(t *testing.T) {
Expand All @@ -125,6 +153,7 @@ func TestQueryRespectsProcessMem(t *testing.T) {
}

func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
t.Helper()
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
Expand All @@ -146,6 +175,23 @@ func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
}
}

func expectOrderedMatches(t *testing.T, expect []string, actualR dsq.Results) {
t.Helper()
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
}

if len(actual) != len(expect) {
t.Error("not enough", expect, actual)
}
for i := range expect {
if expect[i] != actual[i].Key {
t.Errorf("expected %q, got %q", expect[i], actual[i].Key)
}
}
}

func testBatching(t *testing.T, d *Datastore) {
b, err := d.Batch()
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module github.com/ipfs/go-ds-leveldb

require (
github.com/ipfs/go-datastore v0.0.1
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/ipfs/go-datastore v0.0.3
github.com/syndtr/goleveldb v1.0.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.0.3 h1:/eP3nMDmLzMJNoWSSYvEkmMTTrm9FFCN+JraP9NdlwU=
github.com/ipfs/go-datastore v0.0.3/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
Expand Down
36 changes: 0 additions & 36 deletions package.json

This file was deleted.

0 comments on commit 47a9627

Please sign in to comment.