Skip to content

Commit

Permalink
apply feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Mar 31, 2023
1 parent e228cb9 commit 40765a0
Show file tree
Hide file tree
Showing 33 changed files with 483 additions and 329 deletions.
20 changes: 12 additions & 8 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,30 @@ type Collection interface {
// Target can be a Filter statement, a single docKey, a single document, an array of docKeys,
// or an array of documents. It is recommended to use the respective typed versions of Delete
// (e.g. DeleteWithFilter or DeleteWithKey) over this function if you can.
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given DocKey and update the composite block
// with a status of `Deleted`.
//
// Returns an ErrInvalidDeleteTarget if the target type is not supported.
DeleteWith(ctx context.Context, target any, status DocumentStatus) (*DeleteResult, error)
DeleteWith(ctx context.Context, target any) (*DeleteResult, error)
// DeleteWithFilter deletes documents matching the given filter.
//
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
DeleteWithFilter(ctx context.Context, filter any, status DocumentStatus) (*DeleteResult, error)
// This operation will soft-delete documents related to the given filter and update the composite block
// with a status of `Deleted`.
DeleteWithFilter(ctx context.Context, filter any) (*DeleteResult, error)
// DeleteWithKey deletes using a DocKey to target a single document for delete.
//
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given DocKey and update the composite block
// with a status of `Deleted`.
//
// Returns an ErrDocumentNotFound if a document matching the given DocKey is not found.
DeleteWithKey(context.Context, DocKey, DocumentStatus) (*DeleteResult, error)
DeleteWithKey(context.Context, DocKey) (*DeleteResult, error)
// DeleteWithKeys deletes documents matching the given DocKeys.
//
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given DocKeys and update the composite block
// with a status of `Deleted`.
//
// Returns an ErrDocumentNotFound if a document is not found for any given DocKey.
DeleteWithKeys(context.Context, []DocKey, DocumentStatus) (*DeleteResult, error)
DeleteWithKeys(context.Context, []DocKey) (*DeleteResult, error)

// Get returns the document with the given DocKey.
//
Expand Down
20 changes: 16 additions & 4 deletions client/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,18 +473,30 @@ func (doc *Document) toMapWithKey() (map[string]any, error) {
return docMap, nil
}

// DocumentStatus represent the state of the document in the DAG store.
// It can either be `Active“ or `Deleted`.
type DocumentStatus uint8

const (
Active DocumentStatus = iota + 1
Deleted
Purged
// Active is the default state of a document.
Active DocumentStatus = 1
// Deleted represents a document that has been marked as deleted. This means that the document
// can still be in the datastore but a normal request won't return it. The DAG store will still have all
// the associated links.
Deleted DocumentStatus = 2
)

var DocumentStatusToString = map[DocumentStatus]string{
Active: "Active",
Deleted: "Deleted",
Purged: "Purged",
}

func (dStatus DocumentStatus) UInt8() uint8 {
return uint8(dStatus)
}

func (dStatus DocumentStatus) IsDeleted() bool {
return dStatus > 1
}

// loops through an object of the form map[string]any
Expand Down
1 change: 0 additions & 1 deletion client/request/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const (
CreateObjects
UpdateObjects
DeleteObjects
PurgeObjects
)

// ObjectMutation is a field on the `mutation` operation of a graphql request. It includes
Expand Down
15 changes: 0 additions & 15 deletions client/request/status.go

This file was deleted.

24 changes: 19 additions & 5 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type CompositeDAGDelta struct {
DocKey []byte
SubDAGs []core.DAGLink
// Status represents the status of the document. By default it is `Active`.
// Alternatively, if can be set to `Deleted` or `Purged`.
// Alternatively, if can be set to `Deleted`.
Status client.DocumentStatus
}

Expand All @@ -56,6 +56,16 @@ func (delta *CompositeDAGDelta) SetPriority(prio uint64) {
delta.Priority = prio
}

// GetStatus gets the current document status for this delta.
func (delta *CompositeDAGDelta) GetStatus() client.DocumentStatus {
return delta.Status
}

// SetStatus will set the document status for this delta.
func (delta *CompositeDAGDelta) SetStatus(status client.DocumentStatus) {
delta.Status = status
}

// Marshal will serialize this delta to a byte array.
func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
h := &codec.CborHandle{}
Expand All @@ -66,8 +76,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
Priority uint64
Data []byte
DocKey []byte
Status client.DocumentStatus
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status})
Status uint8
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8()})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -118,7 +128,7 @@ func (c CompositeDAG) Value(ctx context.Context) ([]byte, error) {
}

// Set applies a delta to the composite DAG CRDT. TBD
func (c CompositeDAG) Set(patch []byte, links []core.DAGLink, status client.DocumentStatus) *CompositeDAGDelta {
func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta {
// make sure the links are sorted lexicographically by CID
sort.Slice(links, func(i, j int) bool {
return strings.Compare(links[i].Cid.String(), links[j].Cid.String()) < 0
Expand All @@ -128,14 +138,17 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink, status client.Docu
DocKey: []byte(c.key.DocKey),
SubDAGs: links,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
Status: status,
}
}

// Merge implements ReplicatedData interface.
// It ensures that the object marker exists for the given key.
// If it doesn't, it adds it to the store.
func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) error {
if delta.GetStatus().IsDeleted() {
return c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.DeletedObjectMarker})
}

// ensure object marker exists
exists, err := c.store.Has(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
if err != nil {
Expand All @@ -145,6 +158,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) er
// write object marker
return c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.ObjectMarker})
}

return nil
}

Expand Down
14 changes: 14 additions & 0 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ func (delta *LWWRegDelta) SetPriority(prio uint64) {
delta.Priority = prio
}

// GetStatus gets the current document status for this delta.
//
// Currently a no-op
func (delta *LWWRegDelta) GetStatus() client.DocumentStatus {
return 0
}

// SetStatus will set the document status for this delta.
//
// Currently a no-op
func (delta *LWWRegDelta) SetStatus(status client.DocumentStatus) {
// No-op
}

// Marshal encodes the delta using CBOR.
// for now le'ts do cbor (quick to implement)
func (delta *LWWRegDelta) Marshal() ([]byte, error) {
Expand Down
4 changes: 4 additions & 0 deletions core/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ package core

import (
cid "github.com/ipfs/go-cid"

"github.com/sourcenetwork/defradb/client"
)

// Delta represents a delta-state update to delta-CRDT.
// They are serialized to and from Protobuf (or CBOR).
type Delta interface {
GetPriority() uint64
SetPriority(uint64)
GetStatus() client.DocumentStatus
SetStatus(client.DocumentStatus)
Marshal() ([]byte, error)
Value() any
}
Expand Down
2 changes: 2 additions & 0 deletions core/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/client/request"
)

Expand All @@ -32,6 +33,7 @@ type Doc struct {
Hidden bool

Fields DocFields
Status client.DocumentStatus
}

// GetKey returns the DocKey for this document.
Expand Down
42 changes: 6 additions & 36 deletions core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ type ReplicatorKey struct {

var _ Key = (*ReplicatorKey)(nil)

type DeletedDataStoreKey struct {
CollectionId string
DocKey string
}

var _ Key = (*DeletedDataStoreKey)(nil)

// Creates a new DataStoreKey from a string as best as it can,
// splitting the input using '/' as a field deliminator. It assumes
// that the input string is in the following format:
Expand Down Expand Up @@ -233,6 +226,12 @@ func (k DataStoreKey) WithPriorityFlag() DataStoreKey {
return newKey
}

func (k DataStoreKey) WithDeletedFlag() DataStoreKey {
newKey := k
newKey.InstanceType = DeletedKey
return newKey
}

func (k DataStoreKey) WithDocKey(docKey string) DataStoreKey {
newKey := k
newKey.DocKey = docKey
Expand Down Expand Up @@ -473,35 +472,6 @@ func (k ReplicatorKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k DataStoreKey) ToDeletedDataStoreKey() DeletedDataStoreKey {
return DeletedDataStoreKey{
CollectionId: k.CollectionID,
DocKey: k.DocKey,
}
}

func (k DeletedDataStoreKey) ToString() string {
result := ""

if k.CollectionId != "" {
result = result + "/" + k.CollectionId
}
result = result + "/" + string(DeletedKey)
if k.DocKey != "" {
result = result + "/" + k.DocKey
}

return result
}

func (k DeletedDataStoreKey) Bytes() []byte {
return []byte(k.ToString())
}

func (k DeletedDataStoreKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k HeadStoreKey) ToString() string {
var result string

Expand Down
3 changes: 2 additions & 1 deletion db/base/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
package base

const (
ObjectMarker = byte(0xff) // @todo: Investigate object marker values
ObjectMarker = byte(0xff) // @todo: Investigate object marker values
DeletedObjectMarker = byte(0xfe)
)
48 changes: 35 additions & 13 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package db

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -878,14 +879,13 @@ func (c *collection) Delete(ctx context.Context, key client.DocKey) (bool, error
return deleted, c.commitImplicitTxn(ctx, txn)
}

// at the moment, delete only does data storage delete.
// Dag, and head store will soon follow.
// delete put the document in a deleted state
func (c *collection) delete(
ctx context.Context,
txn datastore.Txn,
key core.PrimaryDataStoreKey,
) (bool, error) {
err := txn.Datastore().Delete(ctx, key.ToDS())
err := txn.Datastore().Put(ctx, key.ToDS(), []byte{base.DeletedObjectMarker})
if err != nil {
return false, err
}
Expand All @@ -902,18 +902,28 @@ func (c *collection) delete(
return c.deleteWithPrefix(ctx, txn, keyDS)
}

// deleteWithPrefix will delete all the keys using a prefix query set as the given key.
// deleteWithPrefix will convert value instances to deleted instances
// using a prefix query set as the given key.
func (c *collection) deleteWithPrefix(ctx context.Context, txn datastore.Txn, key core.DataStoreKey) (bool, error) {
q := query.Query{
Prefix: key.ToString(),
KeysOnly: true,
Prefix: key.ToString(),
}

if key.InstanceType == core.ValueKey {
err := txn.Datastore().Delete(ctx, key.ToDS())
if err != nil {
val, err := txn.Datastore().Get(ctx, key.ToDS())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return false, err
}
if !errors.Is(err, ds.ErrNotFound) {
err = txn.Datastore().Put(ctx, key.WithDeletedFlag().ToDS(), val)
if err != nil {
return false, err
}
err = txn.Datastore().Delete(ctx, key.ToDS())
if err != nil {
return false, err
}
}
}

res, err := txn.Datastore().Query(ctx, q)
Expand All @@ -928,6 +938,13 @@ func (c *collection) deleteWithPrefix(ctx context.Context, txn datastore.Txn, ke
return false, err
}

if dsKey.InstanceType == core.ValueKey {
err = txn.Datastore().Put(ctx, dsKey.WithDeletedFlag().ToDS(), e.Value)
if err != nil {
return false, err
}
}

err = txn.Datastore().Delete(ctx, dsKey.ToDS())
if err != nil {
return false, err
Expand Down Expand Up @@ -959,12 +976,17 @@ func (c *collection) exists(
txn datastore.Txn,
key core.PrimaryDataStoreKey,
) (exists bool, isDeleted bool, err error) {
exists, err = txn.Datastore().Has(ctx, key.ToDS())
if err != nil {
return exists, isDeleted, err
val, err := txn.Datastore().Get(ctx, key.ToDS())
if err != nil && errors.Is(err, ds.ErrNotFound) {
return false, false, nil
} else if err != nil {
return false, false, err
}
if bytes.Equal(val, []byte{base.DeletedObjectMarker}) {
return true, true, nil
}
isDeleted, err = txn.Datastore().Has(ctx, key.ToDataStoreKey().ToDeletedDataStoreKey().ToDS())
return exists, isDeleted, err

return true, false, nil
}

func (c *collection) saveDocValue(
Expand Down
Loading

0 comments on commit 40765a0

Please sign in to comment.