Skip to content

Commit

Permalink
kv: simplify interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Aug 9, 2017
1 parent 89595f6 commit 179c86e
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 123 deletions.
85 changes: 57 additions & 28 deletions graph/bolt2/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package bolt2

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"

Expand Down Expand Up @@ -83,15 +85,8 @@ func (db *DB) Type() string {
func (db *DB) Close() error {
return db.DB.Close()
}
func (db *DB) View() (kv.BucketTx, error) {
tx, err := db.DB.Begin(false)
if err != nil {
return nil, err
}
return &Tx{Tx: tx}, nil
}
func (db *DB) Update() (kv.BucketTx, error) {
tx, err := db.DB.Begin(true)
func (db *DB) Tx(update bool) (kv.BucketTx, error) {
tx, err := db.DB.Begin(update)
if err != nil {
return nil, err
}
Expand All @@ -108,22 +103,26 @@ func (tx *Tx) Commit() error {
func (tx *Tx) Rollback() error {
return tx.Tx.Rollback()
}
func (tx *Tx) Bucket(name []byte) kv.Bucket {
b := tx.Tx.Bucket(name)
if b == nil {
return nil
}
return &Bucket{b}
}
func (tx *Tx) CreateBucket(name []byte, excl bool) (kv.Bucket, error) {
func (tx *Tx) Bucket(name []byte, op kv.Op) (kv.Bucket, error) {
var (
b *bolt.Bucket
err error
)
if excl {
switch op {
case kv.OpGet:
b = tx.Tx.Bucket(name)
if b == nil {
return nil, kv.ErrNoBucket
}
case kv.OpCreate:
b, err = tx.Tx.CreateBucket(name)
} else {
if err == bolt.ErrBucketExists {
return nil, kv.ErrBucketExists
}
case kv.OpUpsert:
b, err = tx.Tx.CreateBucketIfNotExists(name)
default:
return nil, fmt.Errorf("unsupported operation")
}
if err != nil {
return nil, err
Expand All @@ -137,20 +136,50 @@ type Bucket struct {
Bucket *bolt.Bucket
}

func (b *Bucket) Get(k []byte) []byte { return b.Bucket.Get(k) }
func (b *Bucket) Get(k []byte) ([]byte, error) {
v := b.Bucket.Get(k)
if v == nil {
return nil, kv.ErrNotFound
}
return v, nil
}
func (b *Bucket) Put(k, v []byte) error { return b.Bucket.Put(k, v) }
func (b *Bucket) ForEach(pref []byte, fnc func(k, v []byte) error) error {
if pref == nil {
return b.Bucket.ForEach(fnc)
func (b *Bucket) Del(k []byte) error { return b.Bucket.Delete(k) }
func (b *Bucket) Scan(pref []byte) kv.KVIterator {
return &Iterator{b: b, pref: pref}
}

type Iterator struct {
b *Bucket
pref []byte
c *bolt.Cursor
k, v []byte
}

func (it *Iterator) Next(ctx context.Context) bool {
if it.b == nil {
return false
}
c := b.Bucket.Cursor()
for k, v := c.Seek(pref); bytes.HasPrefix(k, pref); k, v = c.Next() {
if err := fnc(k, v); err != nil {
return err
if it.c == nil {
it.c = it.b.Bucket.Cursor()
if len(it.pref) == 0 {
it.k, it.v = it.c.First()
} else {
it.k, it.v = it.c.Seek(it.pref)
}
} else {
it.k, it.v = it.c.Next()
}
ok := it.k != nil && bytes.HasPrefix(it.k, it.pref)
if !ok {
it.b = nil
}
return nil
return ok
}
func (it *Iterator) Key() []byte { return it.k }
func (it *Iterator) Val() []byte { return it.v }
func (it *Iterator) Err() error { return nil }
func (it *Iterator) Close() error { return nil }

func (b *Bucket) SetFillPercent(v float64) {
b.Bucket.FillPercent = v
Expand Down
130 changes: 90 additions & 40 deletions graph/kv/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -57,12 +58,13 @@ func (qs *QuadStore) createBuckets(upfront bool) error {
err := Update(qs.db, func(tx BucketTx) error {
var err error
for _, index := range buckets {
_, err = tx.CreateBucket(index, false)
_, err = tx.Bucket(index, OpCreate)
if err != nil {
return fmt.Errorf("could not create bucket %s: %s", string(index), err)
}
}
if f, ok := tx.Bucket(logIndex).(FillBucket); ok {
b, _ := tx.Bucket(logIndex, OpGet)
if f, ok := b.(FillBucket); ok {
f.SetFillPercent(0.9)
}
//tx.Bucket(valueIndex).FillPercent = 0.4
Expand All @@ -78,7 +80,7 @@ func (qs *QuadStore) createBuckets(upfront bool) error {
err := Update(qs.db, func(tx BucketTx) error {
var err error
for j := 0; j < 256; j++ {
_, err = tx.CreateBucket(bucketFor(byte(i), byte(j)), false)
_, err = tx.Bucket(bucketFor(byte(i), byte(j)), OpCreate)
if err != nil {
return fmt.Errorf("could not create subbucket %d %d : %s", i, j, err)
}
Expand All @@ -102,11 +104,14 @@ func (qs *QuadStore) writeHorizonAndSize(tx BucketTx, horizon, size int64) error
if horizon < 0 {
horizon, size = qs.horizon, qs.size
}
b := tx.Bucket(metaBucket)
b, err := tx.Bucket(metaBucket, OpGet)
if err != nil {
return err
}

buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(size))
err := b.Put([]byte("size"), buf)
err = b.Put([]byte("size"), buf)

if err != nil {
clog.Errorf("Couldn't write size!")
Expand All @@ -127,12 +132,13 @@ func (qs *QuadStore) writeHorizonAndSize(tx BucketTx, horizon, size int64) error
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
qs.writer.Lock()
defer qs.writer.Unlock()
tx, err := qs.db.Update()
tx, err := qs.db.Tx(true)
if err != nil {
return err
}
defer tx.Rollback()
if f, ok := tx.Bucket(logIndex).(FillBucket); ok {
b, _ := tx.Bucket(logIndex, OpGet)
if f, ok := b.(FillBucket); ok {
f.SetFillPercent(0.9)
}
qs.mu.RLock()
Expand All @@ -149,7 +155,10 @@ nextDelta:
if val == nil {
continue
}
v := qs.resolveQuadValue(tx, val)
v, err := qs.resolveQuadValue(tx, val)
if err != nil {
return err
}
if v == 0 {
// Not found
if d.Action == graph.Delete {
Expand Down Expand Up @@ -248,7 +257,7 @@ func (qs *QuadStore) indexNode(tx BucketTx, p *proto.Primitive, val quad.Value)
qs.bufLock.Lock()
defer qs.bufLock.Unlock()
quad.HashTo(val, qs.hashBuf)
bucket, err := tx.CreateBucket(bucketFor(qs.hashBuf[0], qs.hashBuf[1]), false)
bucket, err := tx.Bucket(bucketFor(qs.hashBuf[0], qs.hashBuf[1]), OpUpsert)
if err != nil {
return err
}
Expand Down Expand Up @@ -292,9 +301,17 @@ func (qs *QuadStore) markAsDead(tx BucketTx, id uint64) error {
}

func (qs *QuadStore) getBucketIndex(tx BucketTx, bucket []byte, key uint64) ([]uint64, error) {
b := tx.Bucket([]byte(bucket))
b, err := tx.Bucket([]byte(bucket), OpGet)
if err != nil {
return nil, err
}
kbytes := uint64KeyBytes(key)
v := b.Get(kbytes)
v, err := b.Get(kbytes)
if err == ErrNotFound {
return nil, nil
} else if err != nil {
return nil, err
}
return decodeIndex(v)
}

Expand Down Expand Up @@ -397,11 +414,17 @@ func (qs *QuadStore) addToMapBucket(tx BucketTx, bucket string, key, value uint6
func (qs *QuadStore) flushMapBucket(tx BucketTx) error {
kbytes := make([]byte, 8)
for bucket, m := range qs.mapBucket {
var b Bucket
var bname []byte
if bucket == "sub" {
b = tx.Bucket(subjectIndex)
bname = subjectIndex
} else if bucket == "obj" {
b = tx.Bucket(objectIndex)
bname = objectIndex
} else {
return fmt.Errorf("unexpected bucket name: %q", bucket)
}
b, err := tx.Bucket(bname, OpGet)
if err != nil {
return err
}
keys := make(Int64Set, len(m))
i := 0
Expand All @@ -413,9 +436,14 @@ func (qs *QuadStore) flushMapBucket(tx BucketTx) error {
for _, k := range keys {
l := m[k]
binary.BigEndian.PutUint64(kbytes, k)
bytelist := b.Get(kbytes)
bytes := appendIndex(bytelist, l)
err := b.Put(kbytes, bytes)
bytelist, err := b.Get(kbytes)
if err == ErrNotFound {
err = nil
} else if err != nil {
return err
}
buf := appendIndex(bytelist, l)
err = b.Put(kbytes, buf)
if err != nil {
return err
}
Expand All @@ -430,11 +458,15 @@ func (qs *QuadStore) indexSchema(tx BucketTx, p *proto.Primitive) error {
}

func (qs *QuadStore) addToLog(tx BucketTx, p *proto.Primitive) error {
b, err := p.Marshal()
buf, err := p.Marshal()
if err != nil {
return err
}
return tx.Bucket(logIndex).Put(uint64KeyBytes(p.ID), b)
b, err := tx.Bucket(logIndex, OpGet)
if err != nil {
return err
}
return b.Put(uint64KeyBytes(p.ID), buf)
}

func (qs *QuadStore) createNodePrimitive(v quad.Value) (*proto.Primitive, error) {
Expand All @@ -448,31 +480,37 @@ func (qs *QuadStore) createNodePrimitive(v quad.Value) (*proto.Primitive, error)
return p, nil
}

func (qs *QuadStore) resolveQuadValue(tx BucketTx, v quad.Value) uint64 {
func (qs *QuadStore) resolveQuadValue(tx BucketTx, v quad.Value) (uint64, error) {
var isIRI bool
if iri, ok := v.(quad.IRI); ok {
isIRI = true
if x, ok := qs.valueLRU.Get(string(iri)); ok {
return x.(uint64)
return x.(uint64), nil
}
}

qs.bufLock.Lock()
defer qs.bufLock.Unlock()
quad.HashTo(v, qs.hashBuf)
buck := tx.Bucket(bucketFor(qs.hashBuf[0], qs.hashBuf[1]))
if buck == nil {
return 0
b, err := tx.Bucket(bucketFor(qs.hashBuf[0], qs.hashBuf[1]), OpGet)
if err == ErrNoBucket {
return 0, nil
} else if err != nil {
return 0, err
}
val := buck.Get(qs.hashBuf)
if val == nil {
return 0
val, err := b.Get(qs.hashBuf)
if err == ErrNotFound {
return 0, nil
} else if err != nil {
return 0, err
} else if val == nil {
return 0, nil
}
out, _ := binary.Uvarint(val)
if isIRI {
qs.valueLRU.Put(string(v.(quad.IRI)), out)
}
return out
return out, nil
}

func uint64toBytes(x uint64) []byte {
Expand All @@ -493,37 +531,49 @@ func uint64KeyBytes(x uint64) []byte {

func (qs *QuadStore) getPrimitiveFromLog(tx BucketTx, k uint64) (*proto.Primitive, error) {
p := &proto.Primitive{}
b := tx.Bucket(logIndex).Get(uint64KeyBytes(k))
if b == nil {
b, err := tx.Bucket(logIndex, OpGet)
if err != nil {
return nil, err
}
v, err := b.Get(uint64KeyBytes(k))
if err != nil && err != ErrNotFound {
return nil, err
} else if v == nil {
return p, fmt.Errorf("no such log entry")
}
err := p.Unmarshal(b)
err = p.Unmarshal(v)
return p, err
}

func (qs *QuadStore) initBloomFilter() error {
qs.exists = boom.NewDeletableBloomFilter(100*1000*1000, 120, 0.05)
ctx := context.TODO()
qs.bufLock.Lock()
defer qs.bufLock.Unlock()
return View(qs.db, func(tx BucketTx) error {
p := proto.Primitive{}
b := tx.Bucket(logIndex)
return b.ForEach(nil, func(k, v []byte) error {
b, err := tx.Bucket(logIndex, OpGet)
if err != nil {
return err
}
it := b.Scan(nil)
defer it.Close()
for it.Next(ctx) {
v := it.Key()
p = proto.Primitive{}
err := p.Unmarshal(v)
err = p.Unmarshal(v)
if err != nil {
return err
}
if p.IsNode() {
return nil
}
if p.Deleted {
return nil
continue
} else if p.Deleted {
continue
}
writePrimToBuf(&p, qs.bloomBuf)
qs.exists.Add(qs.bloomBuf)
return nil
})
}
return it.Err()
})
}

Expand Down
Loading

0 comments on commit 179c86e

Please sign in to comment.