Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add commits fieldName and fieldId fields #1451

Merged
13 changes: 13 additions & 0 deletions client/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func (col CollectionDescription) GetField(name string) (FieldDescription, bool)
return FieldDescription{}, false
}

// GetFieldByID searches for a field with the given ID. If such a field is found it
// will return it and true, if it is not found it will return false.
func (col CollectionDescription) GetFieldByID(id string) (FieldDescription, bool) {
if !col.Schema.IsEmpty() {
for _, field := range col.Schema.Fields {
if field.ID.String() == id {
return field, true
}
}
}
return FieldDescription{}, false
}

// GetRelation returns the field that supports the relation of the given name.
func (col CollectionDescription) GetRelation(name string) (FieldDescription, bool) {
if !col.Schema.IsEmpty() {
Expand Down
4 changes: 4 additions & 0 deletions client/request/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
DockeyFieldName = "dockey"
CollectionIDFieldName = "collectionID"
SchemaVersionIDFieldName = "schemaVersionId"
FieldNameFieldName = "fieldName"
FieldIDFieldName = "fieldId"
DeltaFieldName = "delta"

LinksNameFieldName = "name"
Expand Down Expand Up @@ -94,6 +96,8 @@ var (
DockeyFieldName,
CollectionIDFieldName,
SchemaVersionIDFieldName,
FieldNameFieldName,
FieldIDFieldName,
DeltaFieldName,
}

Expand Down
10 changes: 9 additions & 1 deletion core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type CompositeDAGDelta struct {
// Status represents the status of the document. By default it is `Active`.
// Alternatively, if can be set to `Deleted`.
Status client.DocumentStatus

FieldName string
}

// GetPriority gets the current priority for this delta.
Expand All @@ -70,7 +72,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
Data []byte
DocKey []byte
Status uint8
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8()})
FieldName string
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8(), delta.FieldName})
if err != nil {
return nil, err
}
Expand All @@ -95,18 +98,22 @@ type CompositeDAG struct {
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
}

func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
namespace core.Key,
key core.DataStoreKey,
fieldName string,
) CompositeDAG {
return CompositeDAG{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
}
}

Expand All @@ -131,6 +138,7 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta
DocKey: []byte(c.key.DocKey),
SubDAGs: links,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
FieldName: c.fieldName,
}
}

Expand Down
9 changes: 8 additions & 1 deletion core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type LWWRegDelta struct {
Priority uint64
Data []byte
DocKey []byte
FieldName string
AndrewSisley marked this conversation as resolved.
Show resolved Hide resolved
}

// GetPriority gets the current priority for this delta.
Expand All @@ -62,7 +63,8 @@ func (delta *LWWRegDelta) Marshal() ([]byte, error) {
Priority uint64
Data []byte
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
FieldName string
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.FieldName})
if err != nil {
return nil, err
}
Expand All @@ -82,17 +84,21 @@ type LWWRegister struct {
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
}

// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, key),
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
// id: id,
// data: data,
// ts: ts,
Expand Down Expand Up @@ -120,6 +126,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
return &LWWRegDelta{
Data: value,
DocKey: []byte(reg.key.DocKey),
FieldName: reg.fieldName,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newMockStore() datastore.DSReaderWriter {
func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocKey: "AAAA-BBBB"}
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key)
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
}

func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
Expand Down
3 changes: 3 additions & 0 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,12 +967,14 @@ func (c *collection) saveValueToMerkleCRDT(
args ...any) (ipld.Node, uint64, error) {
switch ctype {
case client.LWW_REGISTER:
field, _ := c.Description().GetFieldByID(key.FieldId)
merkleCRDT, err := c.db.crdtFactory.InstanceWithStores(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
ctype,
key,
field.Name,
)
if err != nil {
return nil, 0, err
Expand All @@ -998,6 +1000,7 @@ func (c *collection) saveValueToMerkleCRDT(
c.db.events.Updates,
ctype,
key,
"",
)
if err != nil {
return nil, 0, err
Expand Down
6 changes: 6 additions & 0 deletions db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (c *collection) deleteWithFilter(
if err != nil {
return nil, err
}

err = selectionPlan.Init()
if err != nil {
return nil, err
}

if err := selectionPlan.Start(); err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions db/collection_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func (c *collection) updateWithFilter(
if err != nil {
return nil, err
}

err = selectionPlan.Init()
if err != nil {
return nil, err
}

if err = selectionPlan.Start(); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func (vf *VersionedFetcher) processNode(
events.EmptyUpdateChannel,
ctype,
key,
fieldName,
)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i851-field-name-in-block.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Add support for querying fieldName and fieldID in commits queries

FieldName was added to the blocks, and this changes all the block CIDs and means that anything committed before this change will cause an error to be returned when using the commits queries.
4 changes: 2 additions & 2 deletions merkle/clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func newTestMerkleClock() *MerkleClock {

rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "")
return NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{DocKey: "dockey", FieldId: "1"}, reg).(*MerkleClock)
}

func TestNewMerkleClock(t *testing.T) {
s := newDS()
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "")
clk := NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg).(*MerkleClock)

if clk.headstore != multistore.Headstore() {
Expand Down
4 changes: 4 additions & 0 deletions merkle/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
mstore datastore.MultiStore,
schemaID core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
fieldName string,
) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
return NewMerkleCompositeDAG(
Expand All @@ -39,6 +40,7 @@ var (
uCh,
core.DataStoreKey{},
key,
fieldName,
)
}
},
Expand Down Expand Up @@ -69,12 +71,14 @@ func NewMerkleCompositeDAG(
uCh events.UpdateChannel,
ns,
key core.DataStoreKey,
fieldName string,
) *MerkleCompositeDAG {
compositeDag := corecrdt.NewCompositeDAG(
datastore,
schemaVersionKey,
ns,
key, /* stuff like namespace and ID */
fieldName,
)

clock := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), compositeDag)
Expand Down
7 changes: 5 additions & 2 deletions merkle/crdt/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type MerkleCRDTFactory func(
mstore datastore.MultiStore,
schemaVersionKey core.CollectionSchemaVersionKey,
uCh events.UpdateChannel,
fieldName string,
) MerkleCRDTInitFn

// Factory is a helper utility for instantiating new MerkleCRDTs.
Expand Down Expand Up @@ -66,14 +67,15 @@ func (factory Factory) Instance(
uCh events.UpdateChannel,
t client.CType,
key core.DataStoreKey,
fieldName string,
) (MerkleCRDT, error) {
// get the factory function for the given MerkleCRDT type
// and pass in the current factory state as a MultiStore parameter
fn, err := factory.getRegisteredFactory(t)
if err != nil {
return nil, err
}
return (*fn)(factory, schemaVersionKey, uCh)(key), nil
return (*fn)(factory, schemaVersionKey, uCh, fieldName)(key), nil
}

// InstanceWithStore executes the registered factory function for the given MerkleCRDT type
Expand All @@ -84,13 +86,14 @@ func (factory Factory) InstanceWithStores(
uCh events.UpdateChannel,
t client.CType,
key core.DataStoreKey,
fieldName string,
) (MerkleCRDT, error) {
fn, err := factory.getRegisteredFactory(t)
if err != nil {
return nil, err
}

return (*fn)(store, schemaVersionKey, uCh)(key), nil
return (*fn)(store, schemaVersionKey, uCh, fieldName)(key), nil
}

func (factory Factory) getRegisteredFactory(t client.CType) (*MerkleCRDTFactory, error) {
Expand Down
9 changes: 7 additions & 2 deletions merkle/crdt/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestFactoryInstanceMissing(t *testing.T) {
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.Equal(t, err, ErrFactoryTypeNoExist)
}
Expand All @@ -148,6 +149,7 @@ func TestBlankFactoryInstanceWithLWWRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -166,6 +168,7 @@ func TestBlankFactoryInstanceWithCompositeRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.COMPOSITE,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -183,6 +186,7 @@ func TestFullFactoryInstanceLWWRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.LWW_REGISTER,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -200,6 +204,7 @@ func TestFullFactoryInstanceCompositeRegister(t *testing.T) {
events.EmptyUpdateChannel,
client.COMPOSITE,
core.MustNewDataStoreKey("/1/0/MyKey"),
"",
)
assert.NoError(t, err)

Expand All @@ -211,7 +216,7 @@ func TestLWWRegisterFactoryFn(t *testing.T) {
ctx := context.Background()
m := newStores()
f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface
crdt := lwwFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))
crdt := lwwFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel, "")(core.MustNewDataStoreKey("/1/0/MyKey"))

lwwreg, ok := crdt.(*MerkleLWWRegister)
assert.True(t, ok)
Expand All @@ -224,7 +229,7 @@ func TestCompositeRegisterFactoryFn(t *testing.T) {
ctx := context.Background()
m := newStores()
f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface
crdt := compFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel)(core.MustNewDataStoreKey("/1/0/MyKey"))
crdt := compFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel, "")(core.MustNewDataStoreKey("/1/0/MyKey"))

merkleReg, ok := crdt.(*MerkleCompositeDAG)
assert.True(t, ok)
Expand Down
11 changes: 9 additions & 2 deletions merkle/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import (

var (
lwwFactoryFn = MerkleCRDTFactory(
func(mstore datastore.MultiStore, schemaID core.CollectionSchemaVersionKey, _ events.UpdateChannel) MerkleCRDTInitFn {
func(
mstore datastore.MultiStore,
schemaID core.CollectionSchemaVersionKey,
_ events.UpdateChannel,
fieldName string,
) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
return NewMerkleLWWRegister(
mstore.Datastore(),
Expand All @@ -34,6 +39,7 @@ var (
schemaID,
core.DataStoreKey{},
key,
fieldName,
)
}
},
Expand Down Expand Up @@ -63,8 +69,9 @@ func NewMerkleLWWRegister(
dagstore datastore.DAGStore,
schemaVersionKey core.CollectionSchemaVersionKey,
ns, key core.DataStoreKey,
fieldName string,
) *MerkleLWWRegister {
register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key /* stuff like namespace and ID */)
register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key, fieldName /* stuff like namespace and ID */)
clk := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), register)

// newBaseMerkleCRDT(clock, register)
Expand Down
2 changes: 1 addition & 1 deletion merkle/crdt/merklecrdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newTestBaseMerkleCRDT() (*baseMerkleCRDT, datastore.DSReaderWriter) {
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)

reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "")
clk := clock.NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg)
return &baseMerkleCRDT{clock: clk, crdt: reg}, rw
}
Expand Down
Loading