Skip to content

Commit

Permalink
feat: Add commits fieldName and fieldId fields (#1451)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #851  #852

## Description

Adds commits fieldName and fieldId fields, and allows grouping by them.
Also fixes a bug in planner where errors in Init would result in
unhelpful iterator not closed errors.
  • Loading branch information
AndrewSisley authored May 5, 2023
1 parent 18221b9 commit 1dcfcae
Show file tree
Hide file tree
Showing 44 changed files with 695 additions and 165 deletions.
13 changes: 13 additions & 0 deletions client/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func (col CollectionDescription) GetField(name string) (FieldDescription, bool)
return FieldDescription{}, false
}

// GetFieldByID searches for a field with the given ID. If such a field is found it
// will return it and true, if it is not found it will return false.
func (col CollectionDescription) GetFieldByID(id string) (FieldDescription, bool) {
if !col.Schema.IsEmpty() {
for _, field := range col.Schema.Fields {
if field.ID.String() == id {
return field, true
}
}
}
return FieldDescription{}, false
}

// GetRelation returns the field that supports the relation of the given name.
func (col CollectionDescription) GetRelation(name string) (FieldDescription, bool) {
if !col.Schema.IsEmpty() {
Expand Down
4 changes: 4 additions & 0 deletions client/request/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
DockeyFieldName = "dockey"
CollectionIDFieldName = "collectionID"
SchemaVersionIDFieldName = "schemaVersionId"
FieldNameFieldName = "fieldName"
FieldIDFieldName = "fieldId"
DeltaFieldName = "delta"

LinksNameFieldName = "name"
Expand Down Expand Up @@ -94,6 +96,8 @@ var (
DockeyFieldName,
CollectionIDFieldName,
SchemaVersionIDFieldName,
FieldNameFieldName,
FieldIDFieldName,
DeltaFieldName,
}

Expand Down
10 changes: 9 additions & 1 deletion core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type CompositeDAGDelta struct {
// Status represents the status of the document. By default it is `Active`.
// Alternatively, if can be set to `Deleted`.
Status client.DocumentStatus

FieldName string
}

// GetPriority gets the current priority for this delta.
Expand All @@ -70,7 +72,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
Data []byte
DocKey []byte
Status uint8
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8()})
FieldName string
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8(), delta.FieldName})
if err != nil {
return nil, err
}
Expand All @@ -95,18 +98,22 @@ type CompositeDAG struct {
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
}

func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
namespace core.Key,
key core.DataStoreKey,
fieldName string,
) CompositeDAG {
return CompositeDAG{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
}
}

Expand All @@ -131,6 +138,7 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta
DocKey: []byte(c.key.DocKey),
SubDAGs: links,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
FieldName: c.fieldName,
}
}

Expand Down
9 changes: 8 additions & 1 deletion core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type LWWRegDelta struct {
Priority uint64
Data []byte
DocKey []byte
FieldName string
}

// GetPriority gets the current priority for this delta.
Expand All @@ -62,7 +63,8 @@ func (delta *LWWRegDelta) Marshal() ([]byte, error) {
Priority uint64
Data []byte
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
FieldName string
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.FieldName})
if err != nil {
return nil, err
}
Expand All @@ -82,17 +84,21 @@ type LWWRegister struct {
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
}

// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, key),
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
// id: id,
// data: data,
// ts: ts,
Expand Down Expand Up @@ -120,6 +126,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
return &LWWRegDelta{
Data: value,
DocKey: []byte(reg.key.DocKey),
FieldName: reg.fieldName,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newMockStore() datastore.DSReaderWriter {
func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocKey: "AAAA-BBBB"}
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key)
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
}

func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
Expand Down
3 changes: 3 additions & 0 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,12 +967,14 @@ func (c *collection) saveValueToMerkleCRDT(
args ...any) (ipld.Node, uint64, error) {
switch ctype {
case client.LWW_REGISTER:
field, _ := c.Description().GetFieldByID(key.FieldId)
merkleCRDT, err := c.db.crdtFactory.InstanceWithStores(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
ctype,
key,
field.Name,
)
if err != nil {
return nil, 0, err
Expand All @@ -998,6 +1000,7 @@ func (c *collection) saveValueToMerkleCRDT(
c.db.events.Updates,
ctype,
key,
"",
)
if err != nil {
return nil, 0, err
Expand Down
6 changes: 6 additions & 0 deletions db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (c *collection) deleteWithFilter(
if err != nil {
return nil, err
}

err = selectionPlan.Init()
if err != nil {
return nil, err
}

if err := selectionPlan.Start(); err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions db/collection_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func (c *collection) updateWithFilter(
if err != nil {
return nil, err
}

err = selectionPlan.Init()
if err != nil {
return nil, err
}

if err = selectionPlan.Start(); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func (vf *VersionedFetcher) processNode(
events.EmptyUpdateChannel,
ctype,
key,
fieldName,
)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i851-field-name-in-block.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Add support for querying fieldName and fieldID in commits queries

FieldName was added to the blocks, and this changes all the block CIDs and means that anything committed before this change will cause an error to be returned when using the commits queries.
4 changes: 2 additions & 2 deletions merkle/clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func newTestMerkleClock() *MerkleClock {

rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "")
return NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{DocKey: "dockey", FieldId: "1"}, reg).(*MerkleClock)
}

func TestNewMerkleClock(t *testing.T) {
s := newDS()
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "")
clk := NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg).(*MerkleClock)

if clk.headstore != multistore.Headstore() {
Expand Down
4 changes: 4 additions & 0 deletions merkle/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
mstore datastore.MultiStore,
schemaID core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
fieldName string,
) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
return NewMerkleCompositeDAG(
Expand All @@ -39,6 +40,7 @@ var (
uCh,
core.DataStoreKey{},
key,
fieldName,
)
}
},
Expand Down Expand Up @@ -69,12 +71,14 @@ func NewMerkleCompositeDAG(
uCh events.UpdateChannel,
ns,
key core.DataStoreKey,
fieldName string,
) *MerkleCompositeDAG {
compositeDag := corecrdt.NewCompositeDAG(
datastore,
schemaVersionKey,
ns,
key, /* stuff like namespace and ID */
fieldName,
)

clock := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), compositeDag)
Expand Down
7 changes: 5 additions & 2 deletions merkle/crdt/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type MerkleCRDTFactory func(
mstore datastore.MultiStore,
schemaVersionKey core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
fieldName string,
) MerkleCRDTInitFn

// Factory is a helper utility for instantiating new MerkleCRDTs.
Expand Down Expand Up @@ -66,14 +67,15 @@ func (factory Factory) Instance(
uCh events.UpdateChannel,
t client.CType,
key core.DataStoreKey,
fieldName string,
) (MerkleCRDT, error) {
// get the factory function for the given MerkleCRDT type
// and pass in the current factory state as a MultiStore parameter
fn, err := factory.getRegisteredFactory(t)
if err != nil {
return nil, err
}
return (*fn)(factory, schemaVersionKey, uCh)(key), nil
return (*fn)(factory, schemaVersionKey, uCh, fieldName)(key), nil
}

// InstanceWithStore executes the registered factory function for the given MerkleCRDT type
Expand All @@ -84,13 +86,14 @@ func (factory Factory) InstanceWithStores(
uCh events.UpdateChannel,
t client.CType,
key core.DataStoreKey,
fieldName string,
) (MerkleCRDT, error) {
fn, err := factory.getRegisteredFactory(t)
if err != nil {
return nil, err
}

return (*fn)(store, schemaVersionKey, uCh)(key), nil
return (*fn)(store, schemaVersionKey, uCh, fieldName)(key), nil
}

func (factory Factory) getRegisteredFactory(t client.CType) (*MerkleCRDTFactory, error) {
Expand Down
9 changes: 7 additions & 2 deletions merkle/crdt/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestFactoryInstanceMissing(t *testing.T) {
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.Equal(t, err, ErrFactoryTypeNoExist)
}
Expand All @@ -148,6 +149,7 @@ func TestBlankFactoryInstanceWithLWWRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -166,6 +168,7 @@ func TestBlankFactoryInstanceWithCompositeRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.COMPOSITE,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -183,6 +186,7 @@ func TestFullFactoryInstanceLWWRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -200,6 +204,7 @@ func TestFullFactoryInstanceCompositeRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.COMPOSITE,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -211,7 +216,7 @@ func TestLWWRegisterFactoryFn(t *testing.T) {
ctx := context.Background()
m := newStores()
f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface
crdt := lwwFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))
crdt := lwwFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel, "")(core.MustNewDataStoreKey("/1/0/MyKey"))

lwwreg, ok := crdt.(*MerkleLWWRegister)
assert.True(t, ok)
Expand All @@ -224,7 +229,7 @@ func TestCompositeRegisterFactoryFn(t *testing.T) {
ctx := context.Background()
m := newStores()
f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface
crdt := compFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))
crdt := compFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel, "")(core.MustNewDataStoreKey("/1/0/MyKey"))

merkleReg, ok := crdt.(*MerkleCompositeDAG)
assert.True(t, ok)
Expand Down
11 changes: 9 additions & 2 deletions merkle/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import (

var (
lwwFactoryFn = MerkleCRDTFactory(
func(mstore datastore.MultiStore, schemaID core.CollectionSchemaVersionKey, _ events.UpdateChannel) MerkleCRDTInitFn {
func(
mstore datastore.MultiStore,
schemaID core.CollectionSchemaVersionKey,
_ events.UpdateChannel,
fieldName string,
) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
return NewMerkleLWWRegister(
mstore.Datastore(),
Expand All @@ -34,6 +39,7 @@ var (
schemaID,
core.DataStoreKey{},
key,
fieldName,
)
}
},
Expand Down Expand Up @@ -63,8 +69,9 @@ func NewMerkleLWWRegister(
dagstore datastore.DAGStore,
schemaVersionKey core.CollectionSchemaVersionKey,
ns, key core.DataStoreKey,
fieldName string,
) *MerkleLWWRegister {
register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key /* stuff like namespace and ID */)
register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key, fieldName /* stuff like namespace and ID */)
clk := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), register)

// newBaseMerkleCRDT(clock, register)
Expand Down
2 changes: 1 addition & 1 deletion merkle/crdt/merklecrdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newTestBaseMerkleCRDT() (*baseMerkleCRDT, datastore.DSReaderWriter) {
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)

reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "")
clk := clock.NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg)
return &baseMerkleCRDT{clock: clk, crdt: reg}, rw
}
Expand Down
Loading

0 comments on commit 1dcfcae

Please sign in to comment.