Skip to content

Commit

Permalink
refactor: add context to quad store and iterators
Browse files Browse the repository at this point in the history
Add a context parameter to all functions that require one: this required
significant refactoring across the tree to add context parameters to iterators,
the quad store, quad writer, and the schema system.

Signed-off-by: Christian Stewart <[email protected]>
  • Loading branch information
paralin committed May 26, 2023
1 parent 0423d8c commit bcbeda7
Show file tree
Hide file tree
Showing 155 changed files with 3,218 additions and 2,059 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ ui/
packrd/packed-*
# The build binary
/cayley
.#*
12 changes: 7 additions & 5 deletions cmd/cayley/command/convert.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package command

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -21,15 +22,15 @@ type lazyReader struct {
open func() (quad.ReadCloser, error)
}

func (r *lazyReader) ReadQuad() (quad.Quad, error) {
func (r *lazyReader) ReadQuad(ctx context.Context) (quad.Quad, error) {
if r.rc == nil {
rc, err := r.open()
if err != nil {
return quad.Quad{}, err
}
r.rc = rc
}
return r.rc.ReadQuad()
return r.rc.ReadQuad(ctx)
}
func (r *lazyReader) Close() (err error) {
if r.rc != nil {
Expand All @@ -43,13 +44,13 @@ type multiReader struct {
i int
}

func (r *multiReader) ReadQuad() (quad.Quad, error) {
func (r *multiReader) ReadQuad(ctx context.Context) (quad.Quad, error) {
for {
if r.i >= len(r.rc) {
return quad.Quad{}, io.EOF
}
rc := r.rc[r.i]
q, err := rc.ReadQuad()
q, err := rc.ReadQuad(ctx)
if err == io.EOF {
rc.Close()
r.i++
Expand Down Expand Up @@ -105,7 +106,8 @@ func NewConvertCmd() *cobra.Command {
}))
}
// TODO: print additional stats
return writerQuadsTo(dump, dumpf, &multi)
ctx := cmd.Context()
return writerQuadsTo(ctx, dump, dumpf, &multi)
},
}
registerLoadFlags(cmd)
Expand Down
15 changes: 9 additions & 6 deletions cmd/cayley/command/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,22 @@ func NewLoadDatabaseCmd() *cobra.Command {
}
defer h.Close()

qw, err := h.NewQuadWriter()
ctx := cmd.Context()
qw, err := h.NewQuadWriter(ctx)
if err != nil {
return err
}
defer qw.Close()

// TODO: check read-only flag in config before that?
typ, _ := cmd.Flags().GetString(flagLoadFormat)
if err = internal.Load(qw, quad.DefaultBatch, load, typ); err != nil {
if err = internal.Load(ctx, qw, quad.DefaultBatch, load, typ); err != nil {
return err
}

if dump, _ := cmd.Flags().GetString(flagDump); dump != "" {
typ, _ := cmd.Flags().GetString(flagDumpFormat)
if err = dumpDatabase(h, dump, typ); err != nil {
if err = dumpDatabase(ctx, h, dump, typ); err != nil {
return err
}
}
Expand Down Expand Up @@ -159,7 +160,8 @@ func NewDumpDatabaseCmd() *cobra.Command {
defer h.Close()

typ, _ := cmd.Flags().GetString(flagDumpFormat)
return dumpDatabase(h, dump, typ)
ctx := cmd.Context()
return dumpDatabase(ctx, h, dump, typ)
},
}
registerDumpFlags(cmd)
Expand Down Expand Up @@ -246,8 +248,9 @@ func openForQueries(cmd *cobra.Command) (*graph.Handle, error) {
}
load = load2
}
ctx := cmd.Context()
if load != "" {
qw, err := h.NewQuadWriter()
qw, err := h.NewQuadWriter(ctx)
if err != nil {
h.Close()
return nil, err
Expand All @@ -257,7 +260,7 @@ func openForQueries(cmd *cobra.Command) (*graph.Handle, error) {
typ, _ := cmd.Flags().GetString(flagLoadFormat)
// TODO: check read-only flag in config before that?
start := time.Now()
if err = internal.Load(qw, quad.DefaultBatch, load, typ); err != nil {
if err = internal.Load(ctx, qw, quad.DefaultBatch, load, typ); err != nil {
h.Close()
return nil, err
}
Expand Down
33 changes: 22 additions & 11 deletions cmd/cayley/command/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func dedupProperties(ctx context.Context, h *graph.Handle, pred, typ quad.IRI) e
if txn == 0 {
return
}
err := h.ApplyTransaction(tx)
err := h.ApplyTransaction(ctx, tx)
if err == nil {
tx = graph.NewTransaction()
dedup += txn
Expand All @@ -139,18 +139,21 @@ func dedupProperties(ctx context.Context, h *graph.Handle, pred, typ quad.IRI) e
)
}
}
err := p.Iterate(ictx).Each(func(s graph.Ref) error {
err := p.Iterate(ictx).Each(ictx, func(s graph.Ref) error {
cnt++
it := qs.QuadIterator(quad.Subject, s).Iterate()
it := qs.QuadIterator(ctx, quad.Subject, s).Iterate(ctx)
defer it.Close()
m := make(map[interface{}]property)
for it.Next(ictx) {
q := it.Result()
p, err := qs.QuadDirection(q, quad.Predicate)
q, err := it.Result(ctx)
if err != nil {
return err
}
o, err := qs.QuadDirection(q, quad.Object)
p, err := qs.QuadDirection(ctx, q, quad.Predicate)
if err != nil {
return err
}
o, err := qs.QuadDirection(ctx, q, quad.Object)
if err != nil {
return err
}
Expand Down Expand Up @@ -187,15 +190,19 @@ func dedupProperties(ctx context.Context, h *graph.Handle, pred, typ quad.IRI) e
}

func dedupValueTx(ctx context.Context, h *graph.Handle, tx *graph.Transaction, a, b graph.Ref) error {
v, err := h.NameOf(b)
v, err := h.NameOf(ctx, b)
if err != nil {
return err
}
it := h.QuadIterator(quad.Object, a).Iterate()
it := h.QuadIterator(ctx, quad.Object, a).Iterate(ctx)
defer it.Close()
for it.Next(ctx) {
// TODO(dennwc): we should be able to add "raw" quads without getting values for directions
q, err := h.Quad(it.Result())
res, err := it.Result(ctx)
if err != nil {
return err
}
q, err := h.Quad(ctx, res)
if err != nil {
return err
}
Expand All @@ -208,10 +215,14 @@ func dedupValueTx(ctx context.Context, h *graph.Handle, tx *graph.Transaction, a
}
it.Close()

it = h.QuadIterator(quad.Subject, a).Iterate()
it = h.QuadIterator(ctx, quad.Subject, a).Iterate(ctx)
defer it.Close()
for it.Next(ctx) {
q, err := h.Quad(it.Result())
res, err := it.Result(ctx)
if err != nil {
return err
}
q, err := h.Quad(ctx, res)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/cayley/command/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package command

import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
Expand All @@ -13,7 +14,7 @@ import (
"github.com/cayleygraph/quad"
)

func writerQuadsTo(path string, typ string, qr quad.Reader) error {
func writerQuadsTo(ctx context.Context, path string, typ string, qr quad.Reader) error {
var f *os.File
if path == "-" {
f = os.Stdout
Expand Down Expand Up @@ -54,7 +55,7 @@ func writerQuadsTo(path string, typ string, qr quad.Reader) error {
qw := format.Writer(w)
defer qw.Close()

n, err := quad.Copy(qw, qr)
n, err := quad.Copy(ctx, qw, qr)
if err != nil {
return err
} else if err = qw.Close(); err != nil {
Expand All @@ -66,9 +67,9 @@ func writerQuadsTo(path string, typ string, qr quad.Reader) error {
return nil
}

func dumpDatabase(h *graph.Handle, path string, typ string) error {
func dumpDatabase(ctx context.Context, h *graph.Handle, path string, typ string) error {
//TODO: add possible support for exporting specific queries only
qr := graph.NewQuadStoreReader(h.QuadStore)
qr := graph.NewQuadStoreReader(ctx, h.QuadStore)
defer qr.Close()
return writerQuadsTo(path, typ, qr)
return writerQuadsTo(ctx, path, typ, qr)
}
6 changes: 5 additions & 1 deletion cmd/cayley/command/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func NewQueryCmd() *cobra.Command {
}
defer it.Close()
for i := 0; it.Next(ctx) && (limit <= 0 || i < limit); i++ {
if err = enc.Encode(it.Result()); err != nil {
res, err := it.Result(ctx)
if err != nil {
return err
}
if err = enc.Encode(res); err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/cayleyexport/cayleyexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"fmt"
"net/http"
"testing"
Expand All @@ -28,7 +29,8 @@ var testData = []quad.Quad{
func serializeTestData() string {
buffer := bytes.NewBuffer(nil)
writer := jsonld.NewWriter(buffer)
writer.WriteQuads(testData)
ctx := context.Background()
writer.WriteQuads(ctx, testData)
writer.Close()
return buffer.String()
}
Expand Down
4 changes: 2 additions & 2 deletions docs/getting-involved/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ _Note: this definitions in this glossary are sequenced so that they build on eac
```go
type Shape interface {
BuildIterator(qs graph.QuadStore) graph.Iterator
Optimize(r Optimizer) (Shape, bool)
BuildIterator(ctx context.Context, qs graph.QuadStore) graph.Iterator
Optimize(ctx context.Context, r Optimizer) (Shape, bool, error)
}
```
Expand Down
4 changes: 2 additions & 2 deletions docs/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ _Note: this definitions in this glossary are sequenced so that they build on eac
```go
type Shape interface {
BuildIterator(qs graph.QuadStore) graph.Iterator
Optimize(ctx context.Context, r Optimizer) (Shape, bool)
BuildIterator(ctx context.Context, qs graph.QuadStore) graph.Iterator
Optimize(ctx context.Context, r Optimizer) (Shape, bool, error)
}
```
Expand Down
13 changes: 8 additions & 5 deletions examples/hello_bolt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func main() {
log.Fatalln(err)
}

store.AddQuad(quad.Make("phrase of the day", "is of course", "Hello BoltDB!", "demo graph"))
store.AddQuad(ctx, quad.Make("phrase of the day", "is of course", "Hello BoltDB!", "demo graph"))

// Now we create the path, to get to our data
p := cayley.StartPath(store, quad.String("phrase of the day")).Out(quad.String("is of course"))
Expand All @@ -45,16 +45,19 @@ func main() {

// Now we get an iterator for the path and optimize it.
// The second return is if it was optimized, but we don't care for now.
its, _ := p.BuildIterator(ctx).Optimize(ctx)
it := its.Iterate()
its, _, _ := p.BuildIterator(ctx).Optimize(ctx)
it := its.Iterate(ctx)

// remember to cleanup after yourself
defer it.Close()

// While we have items
for it.Next(ctx) {
token := it.Result() // get a ref to a node (backend-specific)
value, err := store.NameOf(token) // get the value in the node (RDF)
token, err := it.Result(ctx) // get a ref to a node (backend-specific)
if err != nil {
log.Fatalln(err)
}
value, err := store.NameOf(ctx, token) // get the value in the node (RDF)
if err != nil {
log.Fatalln(err)
}
Expand Down
12 changes: 8 additions & 4 deletions examples/hello_schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func main() {
Name: "Bob", Age: 32,
}
fmt.Printf("saving: %+v\n", bob)
id, err := sch.WriteAsQuads(qw, bob)
id, err := sch.WriteAsQuads(ctx, qw, bob)
checkErr(err)
err = qw.Close()
checkErr(err)
Expand Down Expand Up @@ -107,7 +107,7 @@ func main() {
}
qw = graph.NewWriter(store)
for _, c := range coords {
id, err = sch.WriteAsQuads(qw, c)
id, err = sch.WriteAsQuads(ctx, qw, c)
checkErr(err)
fmt.Println("generated id:", id)
}
Expand All @@ -122,9 +122,13 @@ func main() {

// Print quads
fmt.Println("\nquads:")
it := store.QuadsAllIterator().Iterate()
it := store.QuadsAllIterator(ctx).Iterate(ctx)
defer it.Close()
for it.Next(ctx) {
fmt.Println(store.Quad(it.Result()))
res, err := it.Result(ctx)
checkErr(err)
q, err := store.Quad(ctx, res)
checkErr(err)
fmt.Println(q)
}
}
4 changes: 2 additions & 2 deletions examples/hello_world/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func main() {
log.Fatalln(err)
}

store.AddQuad(quad.Make("phrase of the day", "is of course", "Hello World!", nil))
store.AddQuad(ctx, quad.Make("phrase of the day", "is of course", "Hello World!", nil))

// Now we create the path, to get to our data
p := cayley.StartPath(store, quad.String("phrase of the day")).Out(quad.String("is of course"))

// Now we iterate over results. Arguments:
// 1. Optional context used for cancellation.
// 2. Quad store, but we can omit it because we have already built path with it.
err = p.Iterate(nil).EachValue(nil, func(value quad.Value) error {
err = p.Iterate(nil).EachValue(ctx, nil, func(value quad.Value) error {
nativeValue := quad.NativeOf(value) // this converts RDF values to normal Go types
fmt.Println(nativeValue)
return nil
Expand Down
4 changes: 2 additions & 2 deletions examples/transaction/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func main() {
t.AddQuad(quad.Make("cats", "want to", "kill you", nil))

// Apply the transaction
err = store.ApplyTransaction(t)
err = store.ApplyTransaction(ctx, t)
if err != nil {
log.Fatalln(err)
}

p := cayley.StartPath(store, quad.String("cats")).Out(quad.String("are"))

err = p.Iterate(nil).EachValue(nil, func(v quad.Value) error {
err = p.Iterate(nil).EachValue(ctx, nil, func(v quad.Value) error {
fmt.Println("cats are", v.Native())
return nil
})
Expand Down
Loading

0 comments on commit bcbeda7

Please sign in to comment.