Skip to content

Commit

Permalink
feat: Persist schema version at time of commit (#1055)
Browse files Browse the repository at this point in the history
* Remove unused dag.GetSchemaID func

* Break up long lines

Will be added to shortly

* BREAKING - Remove unused SchemaId prop

Breaks some tests: although the property is always empty, it is accounted for in CID generation.

* BREAKING - Remove unused Schema.Key prop

Note: The empty, unused prop was used to construct some cids; removing it thus changes the cids even though it was always empty.

* Replace commit schemaId with versionKey

The primary driving change in this commit is the addition of SchemaVersionKey to CompositeDAGDelta in core/crdt/composite.go; this persists the schema state alongside the document change. All other changes in this commit are to facilitate this change.

This commit got a bit more involved than I intended, as getCollectionByVersionId was regenerating the schema id instead of taking it from the object it already had - possibly because that property on the object was never actually set. This got a bit awkward as the id lives on the object it is generated from - once the prop was populated, regenerating the id took the prop itself into account, causing the regenerated key to not match the originally saved key.

Cids in the tests have been corrected in this commit for the production code changes in this commit and the prior BREAKING commits (to save fixing them 3 times).
  • Loading branch information
AndrewSisley authored Jan 26, 2023
1 parent 93ea57a commit 825e749
Show file tree
Hide file tree
Showing 30 changed files with 172 additions and 141 deletions.
14 changes: 11 additions & 3 deletions client/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,17 @@ func (index IndexDescription) IDString() string {

// SchemaDescription describes a Schema and its associated metadata.
type SchemaDescription struct {
ID uint32
Name string
Key []byte // DocKey for versioned source schema
// SchemaID is the version agnostic identifier for this schema.
//
// It remains constant throughout the lifetime of this schema.
SchemaID string

// VersionID is the version-specific identifier for this schema.
//
// It is generated on mutation of this schema and can be used to uniquely
// identify a schema at a specific version.
VersionID string
Name string
// Schema schema.Schema
FieldIDs []uint32
Fields []FieldDescription
Expand Down
47 changes: 24 additions & 23 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ var (

// CompositeDAGDelta represents a delta-state update made of sub-MerkleCRDTs.
type CompositeDAGDelta struct {
SchemaID string
Priority uint64
Data []byte
SubDAGs []core.DAGLink
// SchemaVersionID is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection data structure state at the time of commit.
SchemaVersionID string
Priority uint64
Data []byte
SubDAGs []core.DAGLink
}

// GetPriority gets the current priority for this delta.
Expand All @@ -55,10 +58,10 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(nil)
enc := codec.NewEncoder(buf, h)
err := enc.Encode(struct {
SchemaID string
Priority uint64
Data []byte
}{delta.SchemaID, delta.Priority, delta.Data})
SchemaVersionID string
Priority uint64
Data []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data})
if err != nil {
return nil, err
}
Expand All @@ -75,28 +78,26 @@ func (delta *CompositeDAGDelta) Links() []core.DAGLink {
return delta.SubDAGs
}

// GetSchemaID returns the schema ID for this delta.
func (delta *CompositeDAGDelta) GetSchemaID() string {
return delta.SchemaID
}

// CompositeDAG is a CRDT structure that is used to track a collection of sub MerkleCRDTs.
type CompositeDAG struct {
store datastore.DSReaderWriter
key core.DataStoreKey
schemaID string
store datastore.DSReaderWriter
key core.DataStoreKey
// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection data structure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
}

func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaID string,
schemaVersionKey core.CollectionSchemaVersionKey,
namespace core.Key,
key core.DataStoreKey,
) CompositeDAG {
return CompositeDAG{
store: store,
key: key,
schemaID: schemaID,
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
}
}

Expand All @@ -117,9 +118,9 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta
return strings.Compare(links[i].Cid.String(), links[j].Cid.String()) < 0
})
return &CompositeDAGDelta{
Data: patch,
SubDAGs: links,
SchemaID: c.schemaID,
Data: patch,
SubDAGs: links,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
}
}

Expand Down
57 changes: 20 additions & 37 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ func (db *db) CreateCollection(
return nil, err
}

buf, err := json.Marshal(col.desc)
if err != nil {
return nil, err
}

// Local elements such as secondary indexes should be excluded
// from the (global) schemaId.
globalSchemaBuf, err := json.Marshal(struct {
Expand All @@ -154,26 +149,36 @@ func (db *db) CreateCollection(
if err != nil {
return nil, err
}
schemaId := cid.String()
col.schemaID = schemaId
schemaID := cid.String()
col.schemaID = schemaID

// For new schemas the initial version id will match the schema id
schemaVersionId := schemaId
collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId)
schemaVersionID := schemaID

col.desc.Schema.VersionID = schemaVersionID
col.desc.Schema.SchemaID = schemaID

// buffer must include all the ids, as it is saved and loaded from the store later.
buf, err := json.Marshal(col.desc)
if err != nil {
return nil, err
}

collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionID)
// Whilst the schemaVersionKey is global, the data persisted at the key's location
// is local to the node (the global only elements are not useful beyond key generation).
err = db.systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf)
if err != nil {
return nil, err
}

collectionSchemaKey := core.NewCollectionSchemaKey(schemaId)
err = db.systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionId))
collectionSchemaKey := core.NewCollectionSchemaKey(schemaID)
err = db.systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return nil, err
}

err = db.systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionId))
err = db.systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionID))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,33 +212,11 @@ func (db *db) getCollectionByVersionId(ctx context.Context, schemaVersionId stri
return nil, err
}

buf, err = json.Marshal(struct {
Name string
Schema client.SchemaDescription
}{desc.Name, desc.Schema})
if err != nil {
return nil, err
}

// add a reference to this DB by desc hash
cid, err := core.NewSHA256CidV1(buf)
if err != nil {
return nil, err
}

sid := cid.String()
log.Debug(
ctx,
"Retrieved collection",
logging.NewKV("Name", desc.Name),
logging.NewKV("ID", sid),
)

return &collection{
db: db,
desc: desc,
colID: desc.ID,
schemaID: sid,
schemaID: desc.Schema.SchemaID,
}, nil
}

Expand Down Expand Up @@ -838,7 +821,7 @@ func (c *collection) saveValueToMerkleCRDT(
case client.LWW_REGISTER:
datatype, err := c.db.crdtFactory.InstanceWithStores(
txn,
c.schemaID,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
ctype,
key,
Expand All @@ -863,7 +846,7 @@ func (c *collection) saveValueToMerkleCRDT(
key = key.WithFieldId(core.COMPOSITE_NAMESPACE)
datatype, err := c.db.crdtFactory.InstanceWithStores(
txn,
c.SchemaID(),
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
ctype,
key,
Expand Down
8 changes: 7 additions & 1 deletion db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,13 @@ func (vf *VersionedFetcher) processNode(
if err != nil {
return err
}
mcrdt, err = crdt.DefaultFactory.InstanceWithStores(vf.store, "", events.EmptyUpdateChannel, ctype, key)
mcrdt, err = crdt.DefaultFactory.InstanceWithStores(
vf.store,
core.CollectionSchemaVersionKey{},
events.EmptyUpdateChannel,
ctype,
key,
)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion db/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func newTestCollectionDescription() client.CollectionDescription {
Name: "users",
ID: uint32(1),
Schema: client.SchemaDescription{
ID: uint32(1),
FieldIDs: []uint32{1, 2, 3},
Fields: []client.FieldDescription{
{
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i1026-persist-schema-state.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Persist schema version at time of commit

A handful of changes were made to the client.SchemaDescription object which caused cid calculations to change.
6 changes: 3 additions & 3 deletions merkle/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
compFactoryFn = MerkleCRDTFactory(
func(
mstore datastore.MultiStore,
schemaID string,
schemaID core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
Expand Down Expand Up @@ -65,14 +65,14 @@ func NewMerkleCompositeDAG(
datastore datastore.DSReaderWriter,
headstore datastore.DSReaderWriter,
dagstore datastore.DAGStore,
schemaID string,
schemaVersionKey core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
ns,
key core.DataStoreKey,
) *MerkleCompositeDAG {
compositeDag := corecrdt.NewCompositeDAG(
datastore,
schemaID,
schemaVersionKey,
ns,
key, /* stuff like namespace and ID */
)
Expand Down
10 changes: 5 additions & 5 deletions merkle/crdt/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type MerkleCRDTInitFn func(core.DataStoreKey) MerkleCRDT
// Returns a MerkleCRDTInitFn with all the necessary stores set.
type MerkleCRDTFactory func(
mstore datastore.MultiStore,
schemaID string,
schemaVersionKey core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
) MerkleCRDTInitFn

Expand Down Expand Up @@ -62,7 +62,7 @@ func (factory *Factory) Register(t client.CType, fn *MerkleCRDTFactory) error {
// Instance and execute the registered factory function for a given MerkleCRDT type
// supplied with all the current stores (passed in as a datastore.MultiStore object).
func (factory Factory) Instance(
schemaID string,
schemaVersionKey core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
t client.CType,
key core.DataStoreKey,
Expand All @@ -73,14 +73,14 @@ func (factory Factory) Instance(
if err != nil {
return nil, err
}
return (*fn)(factory, schemaID, uCh)(key), nil
return (*fn)(factory, schemaVersionKey, uCh)(key), nil
}

// InstanceWithStores executes the registered factory function for the given MerkleCRDT type
// with the additional supplied datastore.MultiStore instead of the saved one on the main Factory.
func (factory Factory) InstanceWithStores(
store datastore.MultiStore,
schemaID string,
schemaVersionKey core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
t client.CType,
key core.DataStoreKey,
Expand All @@ -90,7 +90,7 @@ func (factory Factory) InstanceWithStores(
return nil, err
}

return (*fn)(store, schemaID, uCh)(key), nil
return (*fn)(store, schemaVersionKey, uCh)(key), nil
}

func (factory Factory) getRegisteredFactory(t client.CType) (*MerkleCRDTFactory, error) {
Expand Down
39 changes: 32 additions & 7 deletions merkle/crdt/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ func TestFactoryInstanceMissing(t *testing.T) {
m := newStores()
f := NewFactory(m)

_, err := f.Instance("", events.EmptyUpdateChannel, client.LWW_REGISTER, core.MustNewDataStoreKey("/1/0/MyKey"))
_, err := f.Instance(
core.CollectionSchemaVersionKey{},
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
)
assert.Equal(t, err, ErrFactoryTypeNoExist)
}

Expand All @@ -138,7 +143,12 @@ func TestBlankFactoryInstanceWithLWWRegister(t *testing.T) {
f1.Register(client.LWW_REGISTER, &lwwFactoryFn)
f := f1.WithStores(m)

crdt, err := f.Instance("", events.EmptyUpdateChannel, client.LWW_REGISTER, core.MustNewDataStoreKey("/1/0/MyKey"))
crdt, err := f.Instance(
core.CollectionSchemaVersionKey{},
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
)
assert.NoError(t, err)

_, ok := crdt.(*MerkleLWWRegister)
Expand All @@ -151,7 +161,12 @@ func TestBlankFactoryInstanceWithCompositeRegister(t *testing.T) {
f1.Register(client.COMPOSITE, &compFactoryFn)
f := f1.WithStores(m)

crdt, err := f.Instance("", events.EmptyUpdateChannel, client.COMPOSITE, core.MustNewDataStoreKey("/1/0/MyKey"))
crdt, err := f.Instance(
core.CollectionSchemaVersionKey{},
events.EmptyUpdateChannel,
client.COMPOSITE,
core.MustNewDataStoreKey("/1/0/MyKey"),
)
assert.NoError(t, err)

_, ok := crdt.(*MerkleCompositeDAG)
Expand All @@ -163,7 +178,12 @@ func TestFullFactoryInstanceLWWRegister(t *testing.T) {
f := NewFactory(m)
f.Register(client.LWW_REGISTER, &lwwFactoryFn)

crdt, err := f.Instance("", events.EmptyUpdateChannel, client.LWW_REGISTER, core.MustNewDataStoreKey("/1/0/MyKey"))
crdt, err := f.Instance(
core.CollectionSchemaVersionKey{},
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
)
assert.NoError(t, err)

_, ok := crdt.(*MerkleLWWRegister)
Expand All @@ -175,7 +195,12 @@ func TestFullFactoryInstanceCompositeRegister(t *testing.T) {
f := NewFactory(m)
f.Register(client.COMPOSITE, &compFactoryFn)

crdt, err := f.Instance("", events.EmptyUpdateChannel, client.COMPOSITE, core.MustNewDataStoreKey("/1/0/MyKey"))
crdt, err := f.Instance(
core.CollectionSchemaVersionKey{},
events.EmptyUpdateChannel,
client.COMPOSITE,
core.MustNewDataStoreKey("/1/0/MyKey"),
)
assert.NoError(t, err)

_, ok := crdt.(*MerkleCompositeDAG)
Expand All @@ -186,7 +211,7 @@ func TestLWWRegisterFactoryFn(t *testing.T) {
ctx := context.Background()
m := newStores()
f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface
crdt := lwwFactoryFn(f, "", events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))
crdt := lwwFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))

lwwreg, ok := crdt.(*MerkleLWWRegister)
assert.True(t, ok)
Expand All @@ -199,7 +224,7 @@ func TestCompositeRegisterFactoryFn(t *testing.T) {
ctx := context.Background()
m := newStores()
f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface
crdt := compFactoryFn(f, "", events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))
crdt := compFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))

merkleReg, ok := crdt.(*MerkleCompositeDAG)
assert.True(t, ok)
Expand Down
Loading

0 comments on commit 825e749

Please sign in to comment.