Skip to content

Commit

Permalink
kv: registration, scan by prefix, support for flat kv
Browse files Browse the repository at this point in the history
kv: add registration for kv backends (mapped to qs registration)
kv: support scanning by prefix
kv: support for flat kvs (mapped to bolt-like)
bolt2: do not initialize all index buckets upfront, add an option
  • Loading branch information
dennwc committed Aug 8, 2017
1 parent 9a7d809 commit 89595f6
Show file tree
Hide file tree
Showing 7 changed files with 418 additions and 211 deletions.
89 changes: 60 additions & 29 deletions graph/bolt2/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package bolt2

import (
"bytes"
"os"
"path/filepath"

Expand All @@ -26,101 +27,131 @@ import (
)

func init() {
graph.RegisterQuadStore(QuadStoreType, graph.QuadStoreRegistration{
NewFunc: newQuadStore,
UpgradeFunc: nil,
InitFunc: createNewBolt,
kv.Register(Type, kv.Registration{
NewFunc: Open,
InitFunc: Create,
IsPersistent: true,
})
}

const (
QuadStoreType = "bolt2"
Type = "bolt2"
)

func getBoltFile(cfgpath string) string {
return filepath.Join(cfgpath, "indexes.bolt")
}

func createNewBolt(path string, options graph.Options) error {
func Create(path string, _ graph.Options) (kv.BucketKV, error) {
err := os.MkdirAll(path, 0700)
if err != nil {
return err
return nil, err
}
db, err := bolt.Open(getBoltFile(path), 0600, nil)
if err != nil {
clog.Errorf("Error: couldn't create Bolt database: %v", err)
return err
return nil, err
}
defer db.Close()
return kv.Init(&DB{DB: db}, options)
return &DB{DB: db}, nil
}

func newQuadStore(path string, options graph.Options) (graph.QuadStore, error) {
func Open(path string, opt graph.Options) (kv.BucketKV, error) {
db, err := bolt.Open(getBoltFile(path), 0600, nil)
if err != nil {
clog.Errorf("Error, couldn't open! %v", err)
return nil, err
}
// BoolKey returns false on non-existence. IE, Sync by default.
db.NoSync, _, err = options.BoolKey("nosync")
db.NoSync, _, err = opt.BoolKey("nosync")
if err != nil {
db.Close()
return nil, err
}
if db.NoSync {
clog.Infof("Running in nosync mode")
}
return kv.New(&DB{DB: db}, options)
return &DB{DB: db}, nil
}

type DB struct {
*bolt.DB
DB *bolt.DB
}

func (db *DB) Type() string {
return QuadStoreType
return Type
}
func (db *DB) Close() error {
return db.DB.Close()
}
func (db *DB) View() (kv.Tx, error) {
func (db *DB) View() (kv.BucketTx, error) {
tx, err := db.DB.Begin(false)
if err != nil {
return nil, err
}
return Tx{tx}, nil
return &Tx{Tx: tx}, nil
}
func (db *DB) Update() (kv.Tx, error) {
func (db *DB) Update() (kv.BucketTx, error) {
tx, err := db.DB.Begin(true)
if err != nil {
return nil, err
}
return Tx{tx}, nil
return &Tx{Tx: tx}, nil
}

type Tx struct {
*bolt.Tx
Tx *bolt.Tx
}

func (tx Tx) Bucket(name []byte) kv.Bucket {
func (tx *Tx) Commit() error {
return tx.Tx.Commit()
}
func (tx *Tx) Rollback() error {
return tx.Tx.Rollback()
}
func (tx *Tx) Bucket(name []byte) kv.Bucket {
b := tx.Tx.Bucket(name)
if b == nil {
return nil
}
return Bucket{b}
return &Bucket{b}
}
func (tx Tx) CreateBucket(name []byte) (kv.Bucket, error) {
b, err := tx.Tx.CreateBucket(name)
func (tx *Tx) CreateBucket(name []byte, excl bool) (kv.Bucket, error) {
var (
b *bolt.Bucket
err error
)
if excl {
b, err = tx.Tx.CreateBucket(name)
} else {
b, err = tx.Tx.CreateBucketIfNotExists(name)
}
if err != nil {
return nil, err
}
return Bucket{b}, nil
return &Bucket{b}, nil
}

var _ kv.FillBucket = Bucket{}
var _ kv.FillBucket = (*Bucket)(nil)

type Bucket struct {
*bolt.Bucket
Bucket *bolt.Bucket
}

func (b *Bucket) Get(k []byte) []byte { return b.Bucket.Get(k) }
func (b *Bucket) Put(k, v []byte) error { return b.Bucket.Put(k, v) }
func (b *Bucket) ForEach(pref []byte, fnc func(k, v []byte) error) error {
if pref == nil {
return b.Bucket.ForEach(fnc)
}
c := b.Bucket.Cursor()
for k, v := c.Seek(pref); bytes.HasPrefix(k, pref); k, v = c.Next() {
if err := fnc(k, v); err != nil {
return err
}
}
return nil
}

func (b Bucket) SetFillPercent(v float64) {
b.FillPercent = v
func (b *Bucket) SetFillPercent(v float64) {
b.Bucket.FillPercent = v
}
92 changes: 6 additions & 86 deletions graph/bolt2/bolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,109 +17,29 @@ package bolt2
import (
"io/ioutil"
"os"
"reflect"
"testing"

"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/graph/graphtest"
"github.com/cayleygraph/cayley/graph/iterator"
"github.com/cayleygraph/cayley/graph/kv"
"github.com/cayleygraph/cayley/graph/path/pathtest"
"github.com/cayleygraph/cayley/quad"
"github.com/cayleygraph/cayley/graph/kv/kvtest"
)

//var _ graphtest.ValueSizer = (*QuadStore)(nil)

func TestCreateDatabase(t *testing.T) {
func makeBolt(t testing.TB) (kv.BucketKV, graph.Options, func()) {
tmpDir, err := ioutil.TempDir(os.TempDir(), "cayley_test_bolt2")
if err != nil {
t.Fatalf("Could not create working directory: %v", err)
}
t.Log(tmpDir)

err = createNewBolt(tmpDir, nil)
if err != nil {
t.Fatal("Failed to create Bolt database.")
}

qs, err := newQuadStore(tmpDir, nil)
if qs == nil || err != nil {
t.Error("Failed to create bolt QuadStore.")
}
if s := qs.Size(); s != 0 {
t.Errorf("Unexpected size, got:%d expected:0", s)
}
qs.Close()

os.RemoveAll(tmpDir)
}

func makeBolt(t testing.TB) (graph.QuadStore, graph.Options, func()) {
tmpDir, err := ioutil.TempDir(os.TempDir(), "cayley_test_bolt2")
if err != nil {
t.Fatalf("Could not create working directory: %v", err)
}
err = createNewBolt(tmpDir, nil)
db, err := Create(tmpDir, nil)
if err != nil {
os.RemoveAll(tmpDir)
t.Fatal("Failed to create Bolt database.", err)
}
qs, err := newQuadStore(tmpDir, nil)
if qs == nil || err != nil {
os.RemoveAll(tmpDir)
t.Fatal("Failed to create Bolt QuadStore.")
}
return qs, nil, func() {
qs.Close()
return db, nil, func() {
db.Close()
os.RemoveAll(tmpDir)
}
}

func TestBoltAll(t *testing.T) {
graphtest.TestAll(t, makeBolt, &graphtest.Config{
SkipNodeDelAfterQuadDel: true,
SkipIntHorizon: true,
})
}

func TestOptimize(t *testing.T) {
qs, opts, closer := makeBolt(t)
defer closer()

graphtest.MakeWriter(t, qs, opts, graphtest.MakeQuadSet()...)

// With an linksto-fixed pair
fixed := qs.FixedIterator()
fixed.Add(qs.ValueOf(quad.Raw("F")))
fixed.Tagger().Add("internal")
lto := iterator.NewLinksTo(qs, fixed, quad.Object)

oldIt := lto.Clone()
newIt, ok := lto.Optimize()
if !ok {
t.Errorf("Failed to optimize iterator")
}
if newIt.Type() != kv.Type() {
t.Errorf("Optimized iterator type does not match original, got:%v expect:%v", newIt.Type(), kv.Type())
}

newQuads := graphtest.IteratedQuads(t, qs, newIt)
oldQuads := graphtest.IteratedQuads(t, qs, oldIt)
if !reflect.DeepEqual(newQuads, oldQuads) {
t.Errorf("Optimized iteration does not match original")
}

oldIt.Next()
oldResults := make(map[string]graph.Value)
oldIt.TagResults(oldResults)
newIt.Next()
newResults := make(map[string]graph.Value)
newIt.TagResults(newResults)
if !reflect.DeepEqual(newResults, oldResults) {
t.Errorf("Discordant tag results, new:%v old:%v", newResults, oldResults)
}
}

func TestBoltPaths(t *testing.T) {
pathtest.RunTestMorphisms(t, makeBolt)
kvtest.TestAll(t, makeBolt, nil)
}
Loading

0 comments on commit 89595f6

Please sign in to comment.