Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup and optimize naive query filters #125

Merged
merged 3 commits into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package datastore

type verb int

type op struct {
delete bool
value []byte
Expand Down
11 changes: 0 additions & 11 deletions key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package datastore_test

import (
"bytes"
"math/rand"
"path"
"strings"
"testing"
Expand All @@ -14,16 +13,6 @@ import (
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

func randomString() string {
chars := "abcdefghijklmnopqrstuvwxyz1234567890"
var buf bytes.Buffer
l := rand.Intn(50)
for j := 0; j < l; j++ {
buf.WriteByte(chars[rand.Intn(len(chars))])
}
return buf.String()
}

type KeySuite struct{}

var _ = Suite(&KeySuite{})
Expand Down
6 changes: 0 additions & 6 deletions query/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"testing"
)

type filterTestCase struct {
filter Filter
keys []string
expect []string
}

func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
Expand Down
6 changes: 0 additions & 6 deletions query/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"testing"
)

type orderTestCase struct {
order Order
keys []string
expect []string
}

func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) {
e := make([]Entry, len(keys))
for i, k := range keys {
Expand Down
26 changes: 19 additions & 7 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ type Entry struct {
}

// Result is a special entry that includes an error, so that the client
// may be warned about internal errors.
// may be warned about internal errors. If Error is non-nil, Entry must be
// empty.
type Result struct {
Entry

Expand Down Expand Up @@ -203,12 +204,12 @@ func NewResultBuilder(q Query) *ResultBuilder {
}

// ResultsWithChan returns a Results object from a channel
// of Result entries. Respects its own Close()
// of Result entries.
//
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
// will leave anything trying to write to the result channel hanging.
func ResultsWithChan(q Query, res <-chan Result) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) {
for {
select {
case <-worker.Closing(): // client told us to close early
Expand All @@ -219,13 +220,24 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
}

select {
case b.Output <- e:
case out <- e:
case <-worker.Closing(): // client told us to close early
return
}
}
}
})
}

// ResultsWithProcess returns a Results object with the results generated by the
// passed subprocess.
func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
proc(worker, b.Output)
})

go b.Process.CloseAfterChildren()
return b.Results()
Expand Down
153 changes: 81 additions & 72 deletions query/query_impl.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,79 @@
package query

import "sort"
import (
"sort"

func DerivedResults(qr Results, ch <-chan Result) Results {
return &results{
query: qr.Query(),
proc: qr.Process(),
res: ch,
}
}
goprocess "github.com/jbenet/goprocess"
)

// NaiveFilter applies a filter to the results.
func NaiveFilter(qr Results, filter Filter) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()

for e := range qr.Next() {
if e.Error != nil || filter.Filter(e.Entry) {
ch <- e
return ResultsFromIterator(qr.Query(), Iterator{
Next: func() (Result, bool) {
for {
e, ok := qr.NextSync()
if !ok {
return Result{}, false
}
if e.Error != nil || filter.Filter(e.Entry) {
return e, true
}
}
}
}()

return ResultsWithChan(qr.Query(), ch)
},
Close: func() error {
return qr.Close()
},
})
}

// NaiveLimit truncates the results to a given int limit
func NaiveLimit(qr Results, limit int) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()

l := 0
for e := range qr.Next() {
if e.Error != nil {
ch <- e
continue
if limit == 0 {
// 0 means no limit
return qr
}
closed := false
return ResultsFromIterator(qr.Query(), Iterator{
Next: func() (Result, bool) {
if limit == 0 {
if !closed {
closed = true
err := qr.Close()
if err != nil {
return Result{Error: err}, true
}
}
return Result{}, false
}
ch <- e
l++
if limit > 0 && l >= limit {
break
limit--
return qr.NextSync()
},
Close: func() error {
if closed {
return nil
}
}
}()

return ResultsWithChan(qr.Query(), ch)
closed = true
return qr.Close()
},
})
}

// NaiveOffset skips a given number of results
func NaiveOffset(qr Results, offset int) Results {
ch := make(chan Result)
go func() {
defer close(ch)
defer qr.Close()

sent := 0
for e := range qr.Next() {
if e.Error != nil {
ch <- e
}

if sent < offset {
sent++
continue
return ResultsFromIterator(qr.Query(), Iterator{
Next: func() (Result, bool) {
for ; offset > 0; offset-- {
res, ok := qr.NextSync()
if !ok || res.Error != nil {
return res, ok
}
}
ch <- e
}
}()

return ResultsWithChan(qr.Query(), ch)
return qr.NextSync()
},
Close: func() error {
return qr.Close()
},
})
}

// NaiveOrder reorders results according to given orders.
Expand All @@ -83,29 +84,37 @@ func NaiveOrder(qr Results, orders ...Order) Results {
return qr
}

ch := make(chan Result)
var entries []Entry
go func() {
defer close(ch)
return ResultsWithProcess(qr.Query(), func(worker goprocess.Process, out chan<- Result) {
defer qr.Close()

for e := range qr.Next() {
if e.Error != nil {
ch <- e
var entries []Entry
collect:
for {
select {
case <-worker.Closing():
return
case e, ok := <-qr.Next():
if !ok {
break collect
}
if e.Error != nil {
out <- e
continue
}
entries = append(entries, e.Entry)
}

entries = append(entries, e.Entry)
}

sort.Slice(entries, func(i int, j int) bool {
return Less(orders, entries[i], entries[j])
})

for _, e := range entries {
ch <- Result{Entry: e}
select {
case <-worker.Closing():
return
case out <- Result{Entry: e}:
}
}
}()

return DerivedResults(qr, ch)
})
}

func NaiveQueryApply(q Query, qr Results) Results {
Expand Down