Skip to content

Commit

Permalink
graph: rewrite pk; remove horizon and ts from delta; rewrite memstore
Browse files Browse the repository at this point in the history
Primary key was broken in many ways, for example it carried mutex,
but all updates happened with Next that copied value (with mutex).
Most backends are able to provide their own pk for log entries, so it
better to use it instead of our own implementation.

Delta was changed in incompatible way: ID and Timestamp was removed.
As described above, ID shold be set by QS after TX, while TS should
not be sent from client. Both values are now set in backend code.

Memstore was rewritten to reflect changes. It was also updated to
use Primitives instead of nodes/quads.
  • Loading branch information
dennwc committed Aug 11, 2017
1 parent 179c86e commit 524c8e2
Show file tree
Hide file tree
Showing 29 changed files with 689 additions and 548 deletions.
9 changes: 5 additions & 4 deletions graph/bolt/bolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func makeBolt(t testing.TB) (graph.QuadStore, graph.Options, func()) {

func TestBoltAll(t *testing.T) {
graphtest.TestAll(t, makeBolt, &graphtest.Config{
NoPrimitives: true,
SkipNodeDelAfterQuadDel: true,
})
}
Expand Down Expand Up @@ -131,8 +132,8 @@ func TestLoadDatabase(t *testing.T) {

//Test horizon
horizon := qs.Horizon()
if horizon.Int() != 1 {
t.Errorf("Unexpected horizon value, got:%d expect:1", horizon.Int())
if v, _ := horizon.Int(); v != 1 {
t.Errorf("Unexpected horizon value, got:%d expect:1", v)
}

w.AddQuadSet(graphtest.MakeQuadSet())
Expand All @@ -143,8 +144,8 @@ func TestLoadDatabase(t *testing.T) {
t.Errorf("Unexpected quadstore size, got:%d expect:5", s)
}
horizon = qs.Horizon()
if horizon.Int() != 12 {
t.Errorf("Unexpected horizon value, got:%d expect:12", horizon.Int())
if v, _ := horizon.Int(); v != 12 {
t.Errorf("Unexpected horizon value, got:%d expect:12", v)
}

w.RemoveQuad(quad.MakeRaw(
Expand Down
5 changes: 4 additions & 1 deletion graph/bolt/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bolt
import (
"encoding/json"
"fmt"
"time"

"github.com/boltdb/bolt"
"github.com/cayleygraph/cayley/clog"
Expand Down Expand Up @@ -94,13 +95,15 @@ func upgrade1To2(db *bolt.DB) error {
fmt.Println("Upgrading bucket", string(logBucket))
lb := tx.Bucket(logBucket)
c := lb.Cursor()
id := int64(0)
for k, v := c.First(); k != nil; k, v = c.Next() {
var delta graph.Delta
err := json.Unmarshal(v, &delta)
if err != nil {
return err
}
newd := deltaToProto(delta)
id++
newd := deltaToProto(delta, id, time.Now())
data, err := newd.Marshal()
if err != nil {
return err
Expand Down
22 changes: 13 additions & 9 deletions graph/bolt/quadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/boltdb/bolt"
"github.com/cayleygraph/cayley/clog"
Expand Down Expand Up @@ -228,11 +229,11 @@ var (
metaBucket = []byte("meta")
)

func deltaToProto(delta graph.Delta) proto.LogDelta {
func deltaToProto(delta graph.Delta, id int64, t time.Time) proto.LogDelta {
var newd proto.LogDelta
newd.ID = uint64(delta.ID.Int())
newd.ID = uint64(id)
newd.Action = int32(delta.Action)
newd.Timestamp = delta.Timestamp.UnixNano()
newd.Timestamp = t.UnixNano()
newd.Quad = pquads.MakeQuad(delta.Quad)
return newd
}
Expand All @@ -243,26 +244,29 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp
oldSize := qs.size
oldHorizon := qs.horizon
err := qs.db.Update(func(tx *bolt.Tx) error {
id, t := oldHorizon+1, time.Now()
b := tx.Bucket(logBucket)
b.FillPercent = localFillPercent
resizeMap := make(map[quad.Value]int64)
sizeChange := int64(0)
for _, d := range deltas {
for i, d := range deltas {
if d.Action != graph.Add && d.Action != graph.Delete {
return &graph.DeltaError{Delta: d, Err: graph.ErrInvalidAction}
}
p := deltaToProto(d)
di := id + int64(i)
p := deltaToProto(d, di, t)
bytes, err := p.Marshal()
if err != nil {
return &graph.DeltaError{Delta: d, Err: err}
}
err = b.Put(qs.createDeltaKeyFor(d.ID.Int()), bytes)
err = b.Put(qs.createDeltaKeyFor(di), bytes)
if err != nil {
return &graph.DeltaError{Delta: d, Err: err}
}
}
for _, d := range deltas {
err := qs.buildQuadWrite(tx, d.Quad, d.ID.Int(), d.Action == graph.Add)
for i, d := range deltas {
di := id + int64(i)
err := qs.buildQuadWrite(tx, d.Quad, di, d.Action == graph.Add)
if err != nil {
if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup {
continue
Expand All @@ -283,7 +287,7 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp
resizeMap[d.Quad.Label] += delta
}
sizeChange += delta
qs.horizon = d.ID.Int()
qs.horizon = di
}
for k, v := range resizeMap {
if v != 0 {
Expand Down
52 changes: 28 additions & 24 deletions graph/gaedatastore/quadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"math"
"net/http"
"time"

"github.com/cayleygraph/cayley/clog"

Expand Down Expand Up @@ -64,12 +65,12 @@ func (t Token) Key() interface{} { return t }

type QuadEntry struct {
Hash string
Added []string `datastore:",noindex"`
Deleted []string `datastore:",noindex"`
Subject string `datastore:"subject"`
Predicate string `datastore:"predicate"`
Object string `datastore:"object"`
Label string `datastore:"label"`
Added []int64 `datastore:",noindex"`
Deleted []int64 `datastore:",noindex"`
Subject string `datastore:"subject"`
Predicate string `datastore:"predicate"`
Object string `datastore:"object"`
Label string `datastore:"label"`
}

type NodeEntry struct {
Expand All @@ -78,7 +79,6 @@ type NodeEntry struct {
}

type LogEntry struct {
LogID string
Action string
Key string
Timestamp int64
Expand Down Expand Up @@ -123,8 +123,8 @@ func (qs *QuadStore) createKeyForMetadata() *datastore.Key {
return qs.createKeyFromToken(&Token{"metadata", "metadataentry"})
}

func (qs *QuadStore) createKeyForLog(deltaID graph.PrimaryKey) *datastore.Key {
return datastore.NewKey(qs.context, "logentry", deltaID.String(), 0, nil)
func (qs *QuadStore) createKeyForLog() *datastore.Key {
return datastore.NewKey(qs.context, "logentry", "", 0, nil)
}

func (qs *QuadStore) createKeyFromToken(t *Token) *datastore.Key {
Expand Down Expand Up @@ -210,7 +210,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts)
if len(toKeep) == 0 {
return nil
}
err := qs.updateLog(toKeep)
ids, err := qs.updateLog(toKeep)
if err != nil {
clog.Errorf("Updating log failed %v", err)
return err
Expand All @@ -220,7 +220,7 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts)
clog.Infof("Existence verified. Proceeding.")
}

quadsAdded, err := qs.updateQuads(toKeep)
quadsAdded, err := qs.updateQuads(toKeep, ids)
if err != nil {
clog.Errorf("UpdateQuads failed %v", err)
return err
Expand Down Expand Up @@ -298,7 +298,7 @@ func (qs *QuadStore) updateNodes(in []graph.Delta) (int64, error) {
return nodesAdded, nil
}

func (qs *QuadStore) updateQuads(in []graph.Delta) (int64, error) {
func (qs *QuadStore) updateQuads(in []graph.Delta, ids []int64) (int64, error) {
keys := make([]*datastore.Key, 0, len(in))
for _, d := range in {
keys = append(keys, qs.createKeyForQuad(d.Quad))
Expand All @@ -322,10 +322,10 @@ func (qs *QuadStore) updateQuads(in []graph.Delta) (int64, error) {

// If the quad exists the Added[] will be non-empty
if in[x].Action == graph.Add {
foundQuads[k].Added = append(foundQuads[k].Added, in[x].ID.String())
foundQuads[k].Added = append(foundQuads[k].Added, ids[x])
quadCount += 1
} else {
foundQuads[k].Deleted = append(foundQuads[k].Deleted, in[x].ID.String())
foundQuads[k].Deleted = append(foundQuads[k].Deleted, ids[x])
quadCount -= 1
}
}
Expand Down Expand Up @@ -359,13 +359,13 @@ func (qs *QuadStore) updateMetadata(quadsAdded int64, nodesAdded int64) error {
return err
}

func (qs *QuadStore) updateLog(in []graph.Delta) error {
func (qs *QuadStore) updateLog(in []graph.Delta) ([]int64, error) {
if qs.context == nil {
err := errors.New("Error updating log, context is nil, graph not correctly initialised")
return err
return nil, err
}
if len(in) == 0 {
return errors.New("Nothing to log")
return nil, errors.New("Nothing to log")
}
logEntries := make([]LogEntry, 0, len(in))
logKeys := make([]*datastore.Key, 0, len(in))
Expand All @@ -378,20 +378,24 @@ func (qs *QuadStore) updateLog(in []graph.Delta) error {
}

entry := LogEntry{
LogID: d.ID.String(),
Action: action,
Key: qs.createKeyForQuad(d.Quad).String(),
Timestamp: d.Timestamp.UnixNano(),
Timestamp: time.Now().UnixNano(),
}
logEntries = append(logEntries, entry)
logKeys = append(logKeys, qs.createKeyForLog(d.ID))
logKeys = append(logKeys, qs.createKeyForLog())
}

_, err := datastore.PutMulti(qs.context, logKeys, logEntries)
ids, err := datastore.PutMulti(qs.context, logKeys, logEntries)
if err != nil {
clog.Errorf("Error updating log: %v", err)
return nil, err
}
return err
out := make([]int64, 0, len(ids))
for _, id := range ids {
out = append(out, id.IntID())
}
return out, nil
}

func (qs *QuadStore) QuadIterator(dir quad.Direction, v graph.Value) graph.Iterator {
Expand Down Expand Up @@ -504,12 +508,12 @@ func (qs *QuadStore) Horizon() graph.PrimaryKey {
// Query log for last entry...
q := datastore.NewQuery("logentry").Order("-Timestamp").Limit(1)
var logEntries []LogEntry
_, err := q.GetAll(qs.context, &logEntries)
keys, err := q.GetAll(qs.context, &logEntries)
if err != nil || len(logEntries) == 0 {
// Error fetching horizon, probably graph is empty
return graph.NewUniqueKey("")
}
return graph.NewUniqueKey(logEntries[0].LogID)
return graph.NewSequentialKey(keys[0].IntID())
}

func compareTokens(a, b graph.Value) bool {
Expand Down
1 change: 1 addition & 0 deletions graph/gaedatastore/quadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func makeGAE(t testing.TB) (graph.QuadStore, graph.Options, func()) {

func TestGAEAll(t *testing.T) {
graphtest.TestAll(t, makeGAE, &graphtest.Config{
NoPrimitives: true,
SkipIntHorizon: true,
UnTyped: true,
})
Expand Down
Loading

0 comments on commit 524c8e2

Please sign in to comment.