feat: add context to interfaces (#181)
This adds contexts to all the Datastore interfaces. The motivation for
this change is instrumentation, not cancellation, although the contexts
can certainly be used for cancellation in the future. We default to
adding a context to everything, even where we don't immediately use it,
because we may need it later, and making this change again would be
quite painful given the large number of repos it fans out to.
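
For reference, a minimal sketch of how the interfaces read after this
change: every operation takes a context as its first argument. The Key
type and the Read/Write grouping below are simplified for illustration
and are not the verbatim definitions from this repo.

package datastore

import "context"

// Key is a simplified stand-in for go-datastore's Key type.
type Key string

// Read sketches the read-side operations with a context threaded through.
type Read interface {
	Get(ctx context.Context, key Key) (value []byte, err error)
	Has(ctx context.Context, key Key) (exists bool, err error)
	GetSize(ctx context.Context, key Key) (size int, err error)
}

// Write sketches the write-side operations with a context threaded through.
type Write interface {
	Put(ctx context.Context, key Key, value []byte) error
	Delete(ctx context.Context, key Key) error
}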

Note that we have not added a context to Close() methods: doing so
would be surprising because it breaks the io.Closer interface, and many
Close() methods are quick and don't do much work anyway.
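
In practice, an implementation whose Close() still needs to touch the
datastore (for example, to flush buffered writes) simply creates its own
context internally. A minimal sketch of that pattern, mirroring the
autobatch change below (the bufferedStore type here is illustrative):

package example

import (
	"context"
	"io"
)

// bufferedStore is a toy stand-in for a datastore that buffers writes,
// like the autobatch datastore changed below.
type bufferedStore struct {
	child io.Closer
}

// Flush would write the buffered operations to the child store.
func (d *bufferedStore) Flush(ctx context.Context) error { return nil }

// Close keeps the io.Closer signature. Since no caller-supplied context
// reaches it, it falls back to context.Background() for the final flush.
func (d *bufferedStore) Close() error {
	err1 := d.Flush(context.Background())
	err2 := d.child.Close()
	if err1 != nil {
		return err1
	}
	return err2
}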

This also disables the fuzz test, because it has a submodule that
transitively depends on this module, so the fuzz test will fail to
build until this change is plumbed through go-ds-flatfs.
guseggert authored Oct 27, 2021
1 parent a6c97c1 commit b23ab2f
Showing 24 changed files with 572 additions and 480 deletions.
55 changes: 29 additions & 26 deletions autobatch/autobatch.go
@@ -4,6 +4,8 @@
package autobatch

import (
"context"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)
@@ -34,16 +36,16 @@ func NewAutoBatching(d ds.Batching, size int) *Datastore {
}

// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
func (d *Datastore) Delete(ctx context.Context, k ds.Key) error {
d.buffer[k] = op{delete: true}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
return d.Flush(ctx)
}
return nil
}

// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
func (d *Datastore) Get(ctx context.Context, k ds.Key) ([]byte, error) {
o, ok := d.buffer[k]
if ok {
if o.delete {
@@ -52,22 +54,22 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
return o.value, nil
}

return d.child.Get(k)
return d.child.Get(ctx, k)
}

// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error {
func (d *Datastore) Put(ctx context.Context, k ds.Key, val []byte) error {
d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
return d.Flush(ctx)
}
return nil
}

// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore
func (d *Datastore) Sync(prefix ds.Key) error {
b, err := d.child.Batch()
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
b, err := d.child.Batch(ctx)
if err != nil {
return err
}
@@ -79,9 +81,9 @@ func (d *Datastore) Sync(prefix ds.Key) error {

var err error
if o.delete {
err = b.Delete(k)
err = b.Delete(ctx, k)
} else {
err = b.Put(k, o.value)
err = b.Put(ctx, k, o.value)
}
if err != nil {
return err
@@ -90,22 +92,22 @@ func (d *Datastore) Sync(prefix ds.Key) error {
delete(d.buffer, k)
}

return b.Commit()
return b.Commit(ctx)
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
func (d *Datastore) Flush(ctx context.Context) error {
b, err := d.child.Batch(ctx)
if err != nil {
return err
}

for k, o := range d.buffer {
var err error
if o.delete {
err = b.Delete(k)
err = b.Delete(ctx, k)
} else {
err = b.Put(k, o.value)
err = b.Put(ctx, k, o.value)
}
if err != nil {
return err
@@ -114,21 +116,21 @@ func (d *Datastore) Flush() error {
// clear out buffer
d.buffer = make(map[ds.Key]op, d.maxBufferEntries)

return b.Commit()
return b.Commit(ctx)
}

// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
func (d *Datastore) Has(ctx context.Context, k ds.Key) (bool, error) {
o, ok := d.buffer[k]
if ok {
return !o.delete, nil
}

return d.child.Has(k)
return d.child.Has(ctx, k)
}

// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
func (d *Datastore) GetSize(ctx context.Context, k ds.Key) (int, error) {
o, ok := d.buffer[k]
if ok {
if o.delete {
@@ -137,26 +139,27 @@ func (d *Datastore) GetSize(k ds.Key) (int, error) {
return len(o.value), nil
}

return d.child.GetSize(k)
return d.child.GetSize(ctx, k)
}

// Query performs a query
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
err := d.Flush()
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
err := d.Flush(ctx)
if err != nil {
return nil, err
}

return d.child.Query(q)
return d.child.Query(ctx, q)
}

// DiskUsage implements the PersistentDatastore interface.
func (d *Datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.child)
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
return ds.DiskUsage(ctx, d.child)
}

func (d *Datastore) Close() error {
err1 := d.Flush()
ctx := context.Background()
err1 := d.Flush(ctx)
err2 := d.child.Close()
if err1 != nil {
return err1
48 changes: 28 additions & 20 deletions autobatch/autobatch_test.go
@@ -2,6 +2,7 @@ package autobatch

import (
"bytes"
"context"
"fmt"
"testing"

@@ -14,6 +15,8 @@ func TestAutobatch(t *testing.T) {
}

func TestFlushing(t *testing.T) {
ctx := context.Background()

child := ds.NewMapDatastore()
d := NewAutoBatching(child, 16)

@@ -24,15 +27,15 @@ func TestFlushing(t *testing.T) {
v := []byte("hello world")

for _, k := range keys {
err := d.Put(k, v)
err := d.Put(ctx, k, v)
if err != nil {
t.Fatal(err)
}
}

// Get works normally.
for _, k := range keys {
val, err := d.Get(k)
val, err := d.Get(ctx, k)
if err != nil {
t.Fatal(err)
}
@@ -43,36 +46,36 @@ func TestFlushing(t *testing.T) {
}

// Not flushed
_, err := child.Get(keys[0])
_, err := child.Get(ctx, keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete works.
err = d.Delete(keys[14])
err = d.Delete(ctx, keys[14])
if err != nil {
t.Fatal(err)
}
_, err = d.Get(keys[14])
_, err = d.Get(ctx, keys[14])
if err != ds.ErrNotFound {
t.Fatal(err)
}

// Still not flushed
_, err = child.Get(keys[0])
_, err = child.Get(ctx, keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Final put flushes.
err = d.Put(ds.NewKey("test16"), v)
err = d.Put(ctx, ds.NewKey("test16"), v)
if err != nil {
t.Fatal(err)
}

// should be flushed now, try to get keys from child datastore
for _, k := range keys[:14] {
val, err := child.Get(k)
val, err := child.Get(ctx, k)
if err != nil {
t.Fatal(err)
}
@@ -83,18 +86,18 @@ func TestFlushing(t *testing.T) {
}

// Never flushed the deleted key.
_, err = child.Get(keys[14])
_, err = child.Get(ctx, keys[14])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete doesn't flush
err = d.Delete(keys[0])
err = d.Delete(ctx, keys[0])
if err != nil {
t.Fatal(err)
}

val, err := child.Get(keys[0])
val, err := child.Get(ctx, keys[0])
if err != nil {
t.Fatal(err)
}
@@ -105,22 +108,24 @@ func TestFlushing(t *testing.T) {
}

func TestSync(t *testing.T) {
ctx := context.Background()

child := ds.NewMapDatastore()
d := NewAutoBatching(child, 100)

put := func(key ds.Key) {
if err := d.Put(key, []byte(key.String())); err != nil {
if err := d.Put(ctx, key, []byte(key.String())); err != nil {
t.Fatal(err)
}
}
del := func(key ds.Key) {
if err := d.Delete(key); err != nil {
if err := d.Delete(ctx, key); err != nil {
t.Fatal(err)
}
}

get := func(d ds.Datastore, key ds.Key) {
val, err := d.Get(key)
val, err := d.Get(ctx, key)
if err != nil {
t.Fatal(err)
}
@@ -130,7 +135,7 @@ func TestSync(t *testing.T) {
}
}
invalidGet := func(d ds.Datastore, key ds.Key) {
if _, err := d.Get(key); err != ds.ErrNotFound {
if _, err := d.Get(ctx, key); err != ds.ErrNotFound {
t.Fatal("should not have found value")
}
}
@@ -146,6 +151,9 @@ func TestSync(t *testing.T) {
// For clarity comments are written as if op = Put and undoOp = Delete
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {

ctx := context.Background()

var keys []ds.Key
keymap := make(map[ds.Key]int)
for i := 0; i < 4; i++ {
@@ -185,7 +193,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkUndoOp(child, ds.NewKey("0"))

// Sync the tree "0/*/*"
if err := d.Sync(ds.NewKey("0")); err != nil {
if err := d.Sync(ctx, ds.NewKey("0")); err != nil {
t.Fatal(err)
}

@@ -196,7 +204,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)

// Sync the tree "1/1/*"
if err := d.Sync(ds.NewKey("1/1")); err != nil {
if err := d.Sync(ctx, ds.NewKey("1/1")); err != nil {
t.Fatal(err)
}

@@ -207,7 +215,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)

// Sync the tree "3/1/1"
if err := d.Sync(ds.NewKey("3/1/1")); err != nil {
if err := d.Sync(ctx, ds.NewKey("3/1/1")); err != nil {
t.Fatal(err)
}

@@ -217,7 +225,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
// Verify no other keys were synchronized
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)

if err := d.Sync(ds.Key{}); err != nil {
if err := d.Sync(ctx, ds.Key{}); err != nil {
t.Fatal(err)
}

@@ -231,7 +239,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
op(deletedKey)

// Sync it
if err := d.Sync(deletedKey); err != nil {
if err := d.Sync(ctx, deletedKey); err != nil {
t.Fatal(err)
}

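For the downstream repos this change fans out to, the migration is
mechanical: every datastore call gains a leading context argument. A
minimal sketch of caller code against the updated API (the key and
value are arbitrary; errors are handled crudely for brevity):

package main

import (
	"context"
	"fmt"

	ds "github.com/ipfs/go-datastore"
)

func main() {
	ctx := context.Background()
	store := ds.NewMapDatastore()
	defer store.Close() // Close() still takes no context

	key := ds.NewKey("/example")
	if err := store.Put(ctx, key, []byte("hello")); err != nil {
		panic(err)
	}
	val, err := store.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(val)) // hello
}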
