Skip to content

Commit

Permalink
feat: Add document delete mechanics (#1263)
Browse files Browse the repository at this point in the history
This PR covers deleting (not purging) of documents. It adds a status value to the composite block and allows the merkle DAG to keep track of deleted documents. It also moves the fields from value instances to deleted instances to easily filter deleted documents and avoid extra overhead from the deleted documents on normal queries. We can thus select to show deleted documents when querying and are able to show the status of the document (_isDeleted: true/false) in the response.
  • Loading branch information
fredcarle authored Apr 2, 2023
1 parent b9d2e32 commit 5cd9d18
Show file tree
Hide file tree
Showing 64 changed files with 1,522 additions and 388 deletions.
14 changes: 9 additions & 5 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,35 @@ type Collection interface {
// Target can be a Filter statement, a single docKey, a single document, an array of docKeys,
// or an array of documents. It is recommended to use the respective typed versions of Delete
// (e.g. DeleteWithFilter or DeleteWithKey) over this function if you can.
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given DocKey and update the composite block
// with a status of `Deleted`.
//
// Returns an ErrInvalidDeleteTarget if the target type is not supported.
DeleteWith(ctx context.Context, target any) (*DeleteResult, error)
// DeleteWithFilter deletes documents matching the given filter.
//
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given filter and update the composite block
// with a status of `Deleted`.
DeleteWithFilter(ctx context.Context, filter any) (*DeleteResult, error)
// DeleteWithKey deletes using a DocKey to target a single document for delete.
//
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given DocKey and update the composite block
// with a status of `Deleted`.
//
// Returns an ErrDocumentNotFound if a document matching the given DocKey is not found.
DeleteWithKey(context.Context, DocKey) (*DeleteResult, error)
// DeleteWithKeys deletes documents matching the given DocKeys.
//
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
// This operation will soft-delete documents related to the given DocKeys and update the composite block
// with a status of `Deleted`.
//
// Returns an ErrDocumentNotFound if a document is not found for any given DocKey.
DeleteWithKeys(context.Context, []DocKey) (*DeleteResult, error)

// Get returns the document with the given DocKey.
//
// Returns an ErrDocumentNotFound if a document matching the given DocKey is not found.
Get(context.Context, DocKey) (*Document, error)
Get(ctx context.Context, key DocKey, showDeleted bool) (*Document, error)

// WithTxn returns a new instance of the collection, with a transaction
// handle instead of a raw DB handle.
Expand Down
26 changes: 26 additions & 0 deletions client/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,32 @@ func (doc *Document) toMapWithKey() (map[string]any, error) {
return docMap, nil
}

// DocumentStatus represents the state of the document in the DAG store.
// It can either be `Active` or `Deleted`.
//
// Note that the zero value (0) is neither `Active` nor `Deleted`.
type DocumentStatus uint8

const (
	// Active is the default state of a document.
	Active DocumentStatus = 1
	// Deleted represents a document that has been marked as deleted. This means that the document
	// can still be in the datastore but a normal request won't return it. The DAG store will still have all
	// the associated links.
	Deleted DocumentStatus = 2
)

// DocumentStatusToString maps each defined DocumentStatus to its human-readable name.
var DocumentStatusToString = map[DocumentStatus]string{
	Active:  "Active",
	Deleted: "Deleted",
}

// UInt8 returns the status as its underlying uint8 value.
func (s DocumentStatus) UInt8() uint8 {
	return uint8(s)
}

// IsDeleted reports whether the status marks the document as deleted.
//
// Any value greater than or equal to Deleted is treated as deleted; Active and
// the zero value are not.
func (s DocumentStatus) IsDeleted() bool {
	return s >= Deleted
}

// loops through an object of the form map[string]any
// and fills in the Document with each field it finds in the object.
// Automatically handles sub objects and arrays.
Expand Down
17 changes: 10 additions & 7 deletions client/request/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ const (
// https://spec.graphql.org/October2021/#sec-Type-Name-Introspection
TypeNameFieldName = "__typename"

Cid = "cid"
Data = "data"
DocKey = "dockey"
DocKeys = "dockeys"
FieldName = "field"
Id = "id"
Ids = "ids"
Cid = "cid"
Data = "data"
DocKey = "dockey"
DocKeys = "dockeys"
FieldName = "field"
Id = "id"
Ids = "ids"
ShowDeleted = "showDeleted"

FilterClause = "filter"
GroupByClause = "groupBy"
Expand All @@ -34,6 +35,7 @@ const (
CountFieldName = "_count"
KeyFieldName = "_key"
GroupFieldName = "_group"
DeletedFieldName = "_deleted"
SumFieldName = "_sum"
VersionFieldName = "_version"

Expand Down Expand Up @@ -72,6 +74,7 @@ var (
SumFieldName: true,
AverageFieldName: true,
KeyFieldName: true,
DeletedFieldName: true,
}

Aggregates = map[string]struct{}{
Expand Down
2 changes: 2 additions & 0 deletions client/request/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Select struct {
Filter immutable.Option[Filter]

Fields []Selection

ShowDeleted bool
}

// Validate validates the Select.
Expand Down
62 changes: 61 additions & 1 deletion core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"sort"
"strings"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/ugorji/go/codec"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/errors"
)

var (
Expand All @@ -41,6 +44,9 @@ type CompositeDAGDelta struct {
Data []byte
DocKey []byte
SubDAGs []core.DAGLink
// Status represents the status of the document. By default it is `Active`.
// Alternatively, it can be set to `Deleted`.
Status client.DocumentStatus
}

// GetPriority gets the current priority for this delta.
Expand All @@ -63,7 +69,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
Priority uint64
Data []byte
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
Status uint8
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8()})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -131,6 +138,14 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta
// It ensures that the object marker exists for the given key.
// If it doesn't, it adds it to the store.
func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) error {
if dagDelta, ok := delta.(*CompositeDAGDelta); ok && dagDelta.Status.IsDeleted() {
err := c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.DeletedObjectMarker})
if err != nil {
return err
}
return c.deleteWithPrefix(ctx, c.key.WithValueFlag().WithFieldId(""))
}

// ensure object marker exists
exists, err := c.store.Has(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
if err != nil {
Expand All @@ -140,6 +155,51 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) er
// write object marker
return c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.ObjectMarker})
}

return nil
}

// deleteWithPrefix moves the data stored at key, and under the key's prefix, out of the
// value instances and into deleted instances.
//
// The value stored directly at key (if any) is written under c.key with the deleted flag and
// then removed. Every entry returned by a prefix query on key is then removed; entries whose
// instance type is ValueKey are first copied to the same key with the deleted flag.
//
// Returns the first error reported by the store, by the query, or by key parsing.
func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
	val, err := c.store.Get(ctx, key.ToDS())
	if err != nil && !errors.Is(err, ds.ErrNotFound) {
		return err
	}
	// A nil error here means the key exists: anything other than "not found" returned above.
	if err == nil {
		// NOTE(review): the copy is written under c.key rather than key — confirm this is intended.
		err = c.store.Put(ctx, c.key.WithDeletedFlag().ToDS(), val)
		if err != nil {
			return err
		}
		err = c.store.Delete(ctx, key.ToDS())
		if err != nil {
			return err
		}
	}

	q := query.Query{
		Prefix: key.ToString(),
	}
	res, err := c.store.Query(ctx, q)
	if err != nil {
		// Without this check a failed query would be iterated below.
		return err
	}
	defer func() {
		// Best-effort release of the query results, including on early return.
		// The close error is deliberately dropped: any iteration failure has
		// already been returned to the caller.
		_ = res.Close()
	}()

	for e := range res.Next() {
		if e.Error != nil {
			// Report the entry's own error, not the (nil) error from Query.
			return e.Error
		}
		dsKey, err := core.NewDataStoreKey(e.Key)
		if err != nil {
			return err
		}

		// Only value instances are preserved under the deleted flag.
		if dsKey.InstanceType == core.ValueKey {
			err = c.store.Put(ctx, dsKey.WithDeletedFlag().ToDS(), e.Value)
			if err != nil {
				return err
			}
		}

		err = c.store.Delete(ctx, dsKey.ToDS())
		if err != nil {
			return err
		}
	}

	return nil
}

Expand Down
16 changes: 13 additions & 3 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ import (
"bytes"
"context"

ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/ugorji/go/codec"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/errors"
)

var (
Expand Down Expand Up @@ -157,11 +160,18 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
// if the current priority is higher ignore put
// else if the current value is lexicographically
// greater than the new then ignore
valueK := reg.key.WithValueFlag()
key := reg.key.WithValueFlag()
marker, err := reg.store.Get(ctx, reg.key.ToPrimaryDataStoreKey().ToDS())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return err
}
if bytes.Equal(marker, []byte{base.DeletedObjectMarker}) {
key = key.WithDeletedFlag()
}
if priority < curPrio {
return nil
} else if priority == curPrio {
curValue, _ := reg.store.Get(ctx, valueK.ToDS())
curValue, _ := reg.store.Get(ctx, key.ToDS())
// Do not use the first byte of the current value in the comparison.
// It's metadata that will falsify the result.
if len(curValue) > 0 {
Expand All @@ -174,7 +184,7 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64

// prepend the value byte array with a single byte indicator for the CRDT Type.
buf := append([]byte{byte(client.LWW_REGISTER)}, val...)
err = reg.store.Put(ctx, valueK.ToDS(), buf)
err = reg.store.Put(ctx, key.ToDS(), buf)
if err != nil {
return NewErrFailedToStoreValue(err)
}
Expand Down
2 changes: 2 additions & 0 deletions core/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/client/request"
)

Expand All @@ -32,6 +33,7 @@ type Doc struct {
Hidden bool

Fields DocFields
Status client.DocumentStatus
}

// GetKey returns the DocKey for this document.
Expand Down
8 changes: 8 additions & 0 deletions core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
ValueKey = InstanceType("v")
// PriorityKey is a type that represents a priority instance.
PriorityKey = InstanceType("p")
// DeletedKey is a type that represents a deleted document.
DeletedKey = InstanceType("d")
)

const (
Expand Down Expand Up @@ -224,6 +226,12 @@ func (k DataStoreKey) WithPriorityFlag() DataStoreKey {
return newKey
}

// WithDeletedFlag returns a copy of the key with its instance type set to DeletedKey.
// The receiver is passed by value, so the caller's key is left untouched.
func (k DataStoreKey) WithDeletedFlag() DataStoreKey {
	k.InstanceType = DeletedKey
	return k
}

func (k DataStoreKey) WithDocKey(docKey string) DataStoreKey {
newKey := k
newKey.DocKey = docKey
Expand Down
3 changes: 2 additions & 1 deletion db/base/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
package base

const (
ObjectMarker = byte(0xff) // @todo: Investigate object marker values
ObjectMarker = byte(0xff) // @todo: Investigate object marker values
DeletedObjectMarker = byte(0xfe)
)
Loading

0 comments on commit 5cd9d18

Please sign in to comment.