diff --git a/client/collection.go b/client/collection.go index 328bad3c25..f59bf43d6b 100644 --- a/client/collection.go +++ b/client/collection.go @@ -100,26 +100,30 @@ type Collection interface { // Target can be a Filter statement, a single docKey, a single document, an array of docKeys, // or an array of documents. It is recommended to use the respective typed versions of Delete // (e.g. DeleteWithFilter or DeleteWithKey) over this function if you can. - // This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage. + // This operation will soft-delete documents related to the given DocKey and update the composite block + // with a status of `Deleted`. // // Returns an ErrInvalidDeleteTarget if the target type is not supported. - DeleteWith(ctx context.Context, target any, status DocumentStatus) (*DeleteResult, error) + DeleteWith(ctx context.Context, target any) (*DeleteResult, error) // DeleteWithFilter deletes documents matching the given filter. // - // This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage. - DeleteWithFilter(ctx context.Context, filter any, status DocumentStatus) (*DeleteResult, error) + // This operation will soft-delete documents related to the given filter and update the composite block + // with a status of `Deleted`. + DeleteWithFilter(ctx context.Context, filter any) (*DeleteResult, error) // DeleteWithKey deletes using a DocKey to target a single document for delete. // - // This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage. + // This operation will soft-delete documents related to the given DocKey and update the composite block + // with a status of `Deleted`. // // Returns an ErrDocumentNotFound if a document matching the given DocKey is not found. - DeleteWithKey(context.Context, DocKey, DocumentStatus) (*DeleteResult, error) + DeleteWithKey(context.Context, DocKey) (*DeleteResult, error) // DeleteWithKeys deletes documents matching the given DocKeys. // - // This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage. + // This operation will soft-delete documents related to the given DocKeys and update the composite block + // with a status of `Deleted`. // // Returns an ErrDocumentNotFound if a document is not found for any given DocKey. - DeleteWithKeys(context.Context, []DocKey, DocumentStatus) (*DeleteResult, error) + DeleteWithKeys(context.Context, []DocKey) (*DeleteResult, error) // Get returns the document with the given DocKey. // diff --git a/client/document.go b/client/document.go index 9712ef8e71..11d432d1fa 100644 --- a/client/document.go +++ b/client/document.go @@ -473,18 +473,30 @@ func (doc *Document) toMapWithKey() (map[string]any, error) { return docMap, nil } +// DocumentStatus represent the state of the document in the DAG store. +// It can either be `Active“ or `Deleted`. type DocumentStatus uint8 const ( - Active DocumentStatus = iota + 1 - Deleted - Purged + // Active is the default state of a document. + Active DocumentStatus = 1 + // Deleted represents a document that has been marked as deleted. This means that the document + // can still be in the datastore but a normal request won't return it. The DAG store will still have all + // the associated links. + Deleted DocumentStatus = 2 ) var DocumentStatusToString = map[DocumentStatus]string{ Active: "Active", Deleted: "Deleted", - Purged: "Purged", +} + +func (dStatus DocumentStatus) UInt8() uint8 { + return uint8(dStatus) +} + +func (dStatus DocumentStatus) IsDeleted() bool { + return dStatus > 1 } // loops through an object of the form map[string]any diff --git a/client/request/mutation.go b/client/request/mutation.go index d48a72a2a8..c7f0e07ee8 100644 --- a/client/request/mutation.go +++ b/client/request/mutation.go @@ -19,7 +19,6 @@ const ( CreateObjects UpdateObjects DeleteObjects - PurgeObjects ) // ObjectMutation is a field on the `mutation` operation of a graphql request. It includes diff --git a/client/request/status.go b/client/request/status.go deleted file mode 100644 index aea04f89b3..0000000000 --- a/client/request/status.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package request - -type StatusSelect struct { - Field -} diff --git a/core/crdt/composite.go b/core/crdt/composite.go index d65d81e062..bcd878abc1 100644 --- a/core/crdt/composite.go +++ b/core/crdt/composite.go @@ -42,7 +42,7 @@ type CompositeDAGDelta struct { DocKey []byte SubDAGs []core.DAGLink // Status represents the status of the document. By default it is `Active`. - // Alternatively, if can be set to `Deleted` or `Purged`. + // Alternatively, if can be set to `Deleted`. Status client.DocumentStatus } @@ -56,6 +56,16 @@ func (delta *CompositeDAGDelta) SetPriority(prio uint64) { delta.Priority = prio } +// GetStatus gets the current document status for this delta. +func (delta *CompositeDAGDelta) GetStatus() client.DocumentStatus { + return delta.Status +} + +// SetStatus will set the document status for this delta. +func (delta *CompositeDAGDelta) SetStatus(status client.DocumentStatus) { + delta.Status = status +} + // Marshal will serialize this delta to a byte array. func (delta *CompositeDAGDelta) Marshal() ([]byte, error) { h := &codec.CborHandle{} @@ -66,8 +76,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) { Priority uint64 Data []byte DocKey []byte - Status client.DocumentStatus - }{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status}) + Status uint8 + }{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey, delta.Status.UInt8()}) if err != nil { return nil, err } @@ -118,7 +128,7 @@ func (c CompositeDAG) Value(ctx context.Context) ([]byte, error) { } // Set applies a delta to the composite DAG CRDT. TBD -func (c CompositeDAG) Set(patch []byte, links []core.DAGLink, status client.DocumentStatus) *CompositeDAGDelta { +func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta { // make sure the links are sorted lexicographically by CID sort.Slice(links, func(i, j int) bool { return strings.Compare(links[i].Cid.String(), links[j].Cid.String()) < 0 @@ -128,7 +138,6 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink, status client.Docu DocKey: []byte(c.key.DocKey), SubDAGs: links, SchemaVersionID: c.schemaVersionKey.SchemaVersionId, - Status: status, } } @@ -136,6 +145,10 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink, status client.Docu // It ensures that the object marker exists for the given key. // If it doesn't, it adds it to the store. func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) error { + if delta.GetStatus().IsDeleted() { + return c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.DeletedObjectMarker}) + } + // ensure object marker exists exists, err := c.store.Has(ctx, c.key.ToPrimaryDataStoreKey().ToDS()) if err != nil { @@ -145,6 +158,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) er // write object marker return c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.ObjectMarker}) } + return nil } diff --git a/core/crdt/lwwreg.go b/core/crdt/lwwreg.go index 85505c60dd..455607f0fa 100644 --- a/core/crdt/lwwreg.go +++ b/core/crdt/lwwreg.go @@ -50,6 +50,20 @@ func (delta *LWWRegDelta) SetPriority(prio uint64) { delta.Priority = prio } +// GetStatus gets the current document status for this delta. +// +// Currently a no-op +func (delta *LWWRegDelta) GetStatus() client.DocumentStatus { + return 0 +} + +// SetStatus will set the document status for this delta. +// +// Currently a no-op +func (delta *LWWRegDelta) SetStatus(status client.DocumentStatus) { + // No-op +} + // Marshal encodes the delta using CBOR. // for now le'ts do cbor (quick to implement) func (delta *LWWRegDelta) Marshal() ([]byte, error) { diff --git a/core/delta.go b/core/delta.go index 185a0352e2..551975acc2 100644 --- a/core/delta.go +++ b/core/delta.go @@ -12,6 +12,8 @@ package core import ( cid "github.com/ipfs/go-cid" + + "github.com/sourcenetwork/defradb/client" ) // Delta represents a delta-state update to delta-CRDT. @@ -19,6 +21,8 @@ import ( type Delta interface { GetPriority() uint64 SetPriority(uint64) + GetStatus() client.DocumentStatus + SetStatus(client.DocumentStatus) Marshal() ([]byte, error) Value() any } diff --git a/core/doc.go b/core/doc.go index f1b188ebdc..6966a8db4d 100644 --- a/core/doc.go +++ b/core/doc.go @@ -16,6 +16,7 @@ package core import ( "github.com/sourcenetwork/immutable" + "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/client/request" ) @@ -32,6 +33,7 @@ type Doc struct { Hidden bool Fields DocFields + Status client.DocumentStatus } // GetKey returns the DocKey for this document. diff --git a/core/key.go b/core/key.go index 44c4a96a53..756290a607 100644 --- a/core/key.go +++ b/core/key.go @@ -124,13 +124,6 @@ type ReplicatorKey struct { var _ Key = (*ReplicatorKey)(nil) -type DeletedDataStoreKey struct { - CollectionId string - DocKey string -} - -var _ Key = (*DeletedDataStoreKey)(nil) - // Creates a new DataStoreKey from a string as best as it can, // splitting the input using '/' as a field deliminator. It assumes // that the input string is in the following format: @@ -233,6 +226,12 @@ func (k DataStoreKey) WithPriorityFlag() DataStoreKey { return newKey } +func (k DataStoreKey) WithDeletedFlag() DataStoreKey { + newKey := k + newKey.InstanceType = DeletedKey + return newKey +} + func (k DataStoreKey) WithDocKey(docKey string) DataStoreKey { newKey := k newKey.DocKey = docKey @@ -473,35 +472,6 @@ func (k ReplicatorKey) ToDS() ds.Key { return ds.NewKey(k.ToString()) } -func (k DataStoreKey) ToDeletedDataStoreKey() DeletedDataStoreKey { - return DeletedDataStoreKey{ - CollectionId: k.CollectionID, - DocKey: k.DocKey, - } -} - -func (k DeletedDataStoreKey) ToString() string { - result := "" - - if k.CollectionId != "" { - result = result + "/" + k.CollectionId - } - result = result + "/" + string(DeletedKey) - if k.DocKey != "" { - result = result + "/" + k.DocKey - } - - return result -} - -func (k DeletedDataStoreKey) Bytes() []byte { - return []byte(k.ToString()) -} - -func (k DeletedDataStoreKey) ToDS() ds.Key { - return ds.NewKey(k.ToString()) -} - func (k HeadStoreKey) ToString() string { var result string diff --git a/db/base/descriptions.go b/db/base/descriptions.go index 69e1c8f3ba..9ef991a4ff 100644 --- a/db/base/descriptions.go +++ b/db/base/descriptions.go @@ -11,5 +11,6 @@ package base const ( - ObjectMarker = byte(0xff) // @todo: Investigate object marker values + ObjectMarker = byte(0xff) // @todo: Investigate object marker values + DeletedObjectMarker = byte(0xfe) ) diff --git a/db/collection.go b/db/collection.go index a35e862674..a9b05c4984 100644 --- a/db/collection.go +++ b/db/collection.go @@ -11,6 +11,7 @@ package db import ( + "bytes" "context" "encoding/json" "fmt" @@ -878,14 +879,13 @@ func (c *collection) Delete(ctx context.Context, key client.DocKey) (bool, error return deleted, c.commitImplicitTxn(ctx, txn) } -// at the moment, delete only does data storage delete. -// Dag, and head store will soon follow. +// delete put the document in a deleted state func (c *collection) delete( ctx context.Context, txn datastore.Txn, key core.PrimaryDataStoreKey, ) (bool, error) { - err := txn.Datastore().Delete(ctx, key.ToDS()) + err := txn.Datastore().Put(ctx, key.ToDS(), []byte{base.DeletedObjectMarker}) if err != nil { return false, err } @@ -902,18 +902,28 @@ func (c *collection) delete( return c.deleteWithPrefix(ctx, txn, keyDS) } -// deleteWithPrefix will delete all the keys using a prefix query set as the given key. +// deleteWithPrefix will convert value instances to deleted instances +// using a prefix query set as the given key. func (c *collection) deleteWithPrefix(ctx context.Context, txn datastore.Txn, key core.DataStoreKey) (bool, error) { q := query.Query{ - Prefix: key.ToString(), - KeysOnly: true, + Prefix: key.ToString(), } if key.InstanceType == core.ValueKey { - err := txn.Datastore().Delete(ctx, key.ToDS()) - if err != nil { + val, err := txn.Datastore().Get(ctx, key.ToDS()) + if err != nil && !errors.Is(err, ds.ErrNotFound) { return false, err } + if !errors.Is(err, ds.ErrNotFound) { + err = txn.Datastore().Put(ctx, key.WithDeletedFlag().ToDS(), val) + if err != nil { + return false, err + } + err = txn.Datastore().Delete(ctx, key.ToDS()) + if err != nil { + return false, err + } + } } res, err := txn.Datastore().Query(ctx, q) @@ -928,6 +938,13 @@ func (c *collection) deleteWithPrefix(ctx context.Context, txn datastore.Txn, ke return false, err } + if dsKey.InstanceType == core.ValueKey { + err = txn.Datastore().Put(ctx, dsKey.WithDeletedFlag().ToDS(), e.Value) + if err != nil { + return false, err + } + } + err = txn.Datastore().Delete(ctx, dsKey.ToDS()) if err != nil { return false, err @@ -959,12 +976,17 @@ func (c *collection) exists( txn datastore.Txn, key core.PrimaryDataStoreKey, ) (exists bool, isDeleted bool, err error) { - exists, err = txn.Datastore().Has(ctx, key.ToDS()) - if err != nil { - return exists, isDeleted, err + val, err := txn.Datastore().Get(ctx, key.ToDS()) + if err != nil && errors.Is(err, ds.ErrNotFound) { + return false, false, nil + } else if err != nil { + return false, false, err + } + if bytes.Equal(val, []byte{base.DeletedObjectMarker}) { + return true, true, nil } - isDeleted, err = txn.Datastore().Has(ctx, key.ToDataStoreKey().ToDeletedDataStoreKey().ToDS()) - return exists, isDeleted, err + + return true, false, nil } func (c *collection) saveDocValue( diff --git a/db/collection_delete.go b/db/collection_delete.go index c509ab51e6..fde841708c 100644 --- a/db/collection_delete.go +++ b/db/collection_delete.go @@ -20,14 +20,11 @@ import ( ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-libipfs/blocks" dag "github.com/ipfs/go-merkledag" - "github.com/ugorji/go/codec" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/client/request" "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/core/crdt" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/merkle/clock" @@ -43,15 +40,14 @@ import ( func (c *collection) DeleteWith( ctx context.Context, target any, - status client.DocumentStatus, ) (*client.DeleteResult, error) { switch t := target.(type) { case string, map[string]any, *request.Filter: - return c.DeleteWithFilter(ctx, t, status) + return c.DeleteWithFilter(ctx, t) case client.DocKey: - return c.DeleteWithKey(ctx, t, status) + return c.DeleteWithKey(ctx, t) case []client.DocKey: - return c.DeleteWithKeys(ctx, t, status) + return c.DeleteWithKeys(ctx, t) default: return nil, client.ErrInvalidDeleteTarget } @@ -61,7 +57,6 @@ func (c *collection) DeleteWith( func (c *collection) DeleteWithKey( ctx context.Context, key client.DocKey, - status client.DocumentStatus, ) (*client.DeleteResult, error) { txn, err := c.getTxn(ctx, false) if err != nil { @@ -71,7 +66,7 @@ func (c *collection) DeleteWithKey( defer c.discardImplicitTxn(ctx, txn) dsKey := c.getPrimaryKeyFromDocKey(key) - res, err := c.deleteWithKey(ctx, txn, dsKey, status) + res, err := c.deleteWithKey(ctx, txn, dsKey, client.Deleted) if err != nil { return nil, err } @@ -83,7 +78,6 @@ func (c *collection) DeleteWithKey( func (c *collection) DeleteWithKeys( ctx context.Context, keys []client.DocKey, - status client.DocumentStatus, ) (*client.DeleteResult, error) { txn, err := c.getTxn(ctx, false) if err != nil { @@ -92,7 +86,7 @@ func (c *collection) DeleteWithKeys( defer c.discardImplicitTxn(ctx, txn) - res, err := c.deleteWithKeys(ctx, txn, keys, status) + res, err := c.deleteWithKeys(ctx, txn, keys, client.Deleted) if err != nil { return nil, err } @@ -104,7 +98,6 @@ func (c *collection) DeleteWithKeys( func (c *collection) DeleteWithFilter( ctx context.Context, filter any, - status client.DocumentStatus, ) (*client.DeleteResult, error) { txn, err := c.getTxn(ctx, false) if err != nil { @@ -113,7 +106,7 @@ func (c *collection) DeleteWithFilter( defer c.discardImplicitTxn(ctx, txn) - res, err := c.deleteWithFilter(ctx, txn, filter, status) + res, err := c.deleteWithFilter(ctx, txn, filter, client.Deleted) if err != nil { return nil, err } @@ -247,7 +240,7 @@ func newDagDeleter(bstore datastore.DAGStore) dagDeleter { func isDeleteStatus(status client.DocumentStatus) bool { switch status { - case client.Deleted, client.Purged: + case client.Deleted: return true default: return false @@ -267,19 +260,17 @@ func (c *collection) applyDelete( if err != nil { return err } - if !found { + if !found || isDeleted { return client.ErrDocumentNotFound } - dsKey := key.ToDataStoreKey() - - if !isDeleted { - err = c.txn.Datastore().Put(ctx, dsKey.ToDeletedDataStoreKey().ToDS(), []byte{base.ObjectMarker}) - if err != nil { - return err - } + _, err = c.delete(ctx, txn, key) + if err != nil { + return err } + dsKey := key.ToDataStoreKey() + headset := clock.NewHeadSet( txn.Headstore(), dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(), @@ -289,52 +280,41 @@ func (c *collection) applyDelete( return err } - b, err := txn.DAGstore().Get(ctx, cids[0]) - if err != nil { - return err - } - nd, err := dag.DecodeProtobuf(b.RawData()) - if err != nil { - return err + dagLinks := make([]core.DAGLink, len(cids)) + for i, cid := range cids { + dagLinks[i] = core.DAGLink{ + Name: core.HEAD, + Cid: cid, + } } - delta := &crdt.CompositeDAGDelta{} - h := &codec.CborHandle{} - dec := codec.NewDecoderBytes(nd.Data(), h) - err = dec.Decode(delta) + + headNode, priority, err := c.saveValueToMerkleCRDT( + ctx, + txn, + dsKey, + client.COMPOSITE, + []byte{}, + dagLinks, + status, + ) if err != nil { return err } - if delta.Status != status { - delta.Status = client.Deleted - headNode, priority, err := c.saveValueToMerkleCRDT( - ctx, - txn, - dsKey, - client.COMPOSITE, - delta.Data, - delta.SubDAGs, - status, + if c.db.events.Updates.HasValue() { + txn.OnSuccess( + func() { + c.db.events.Updates.Value().Publish( + events.Update{ + DocKey: key.DocKey, + Cid: headNode.Cid(), + SchemaID: c.schemaID, + Block: headNode, + Priority: priority, + }, + ) + }, ) - if err != nil { - return err - } - - if c.db.events.Updates.HasValue() { - txn.OnSuccess( - func() { - c.db.events.Updates.Value().Publish( - events.Update{ - DocKey: key.DocKey, - Cid: headNode.Cid(), - SchemaID: c.schemaID, - Block: headNode, - Priority: priority, - }, - ) - }, - ) - } } return nil diff --git a/db/collection_get.go b/db/collection_get.go index 3735cb5030..678a154598 100644 --- a/db/collection_get.go +++ b/db/collection_get.go @@ -37,7 +37,7 @@ func (c *collection) Get(ctx context.Context, key client.DocKey, showDeleted boo return nil, client.ErrDocumentNotFound } - doc, err := c.get(ctx, txn, dsKey) + doc, err := c.get(ctx, txn, dsKey, showDeleted) if err != nil { return nil, err } @@ -48,12 +48,13 @@ func (c *collection) get( ctx context.Context, txn datastore.Txn, key core.PrimaryDataStoreKey, + showDeleted bool, ) (*client.Document, error) { // create a new document fetcher df := new(fetcher.DocumentFetcher) desc := &c.desc // initialize it with the primary index - err := df.Init(&c.desc, nil, false) + err := df.Init(&c.desc, nil, false, showDeleted) if err != nil { _ = df.Close() return nil, err diff --git a/db/errors.go b/db/errors.go index aebd97d22f..5c2900ec35 100644 --- a/db/errors.go +++ b/db/errors.go @@ -218,7 +218,7 @@ func NewErrCannotDeleteField(name string, id client.FieldID) error { func NewErrInvalidDeleteStatus(status client.DocumentStatus) error { return errors.New( errInvalidDeleteStatus, - errors.NewKV("Expected", "Deleted or Purged"), + errors.NewKV("Expected", "Deleted"), errors.NewKV("Actual", client.DocumentStatusToString[status]), ) } diff --git a/db/fetcher/encoded_doc.go b/db/fetcher/encoded_doc.go index 29e0a5c56d..a141c50652 100644 --- a/db/fetcher/encoded_doc.go +++ b/db/fetcher/encoded_doc.go @@ -213,7 +213,6 @@ func (encdoc *encodedDocument) Decode() (*client.Document, error) { // map of field/value pairs func (encdoc *encodedDocument) DecodeToDoc(mapping *core.DocumentMapping) (core.Doc, error) { doc := mapping.NewDoc() - doc.SetKey(string(encdoc.Key)) for fieldDesc, prop := range encdoc.Properties { _, val, err := prop.Decode() diff --git a/db/fetcher/fetcher.go b/db/fetcher/fetcher.go index 4aacda86bb..a0e14fac7e 100644 --- a/db/fetcher/fetcher.go +++ b/db/fetcher/fetcher.go @@ -26,7 +26,7 @@ import ( // Fetcher is the interface for collecting documents from the underlying data store. // It handles all the key/value scanning, aggregation, and document encoding. type Fetcher interface { - Init(col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool) error + Init(col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool, showDeleted bool) error Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error FetchNext(ctx context.Context) (*encodedDocument, error) FetchNextDecoded(ctx context.Context) (*client.Document, error) @@ -60,6 +60,8 @@ type DocumentFetcher struct { kvResultsIter dsq.Results kvEnd bool isReadingDocument bool + + deletedDocFetcher *DocumentFetcher } // Init implements DocumentFetcher. @@ -67,6 +69,7 @@ func (df *DocumentFetcher) Init( col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool, + showDeleted bool, ) error { if col.Schema.IsEmpty() { return client.NewErrUninitializeProperty("DocumentFetcher", "Schema") @@ -96,11 +99,56 @@ func (df *DocumentFetcher) Init( for _, field := range col.Schema.Fields { df.schemaFields[uint32(field.ID)] = field } + if showDeleted { + ddf := df.deletedDocFetcher + if ddf == nil { + ddf = new(DocumentFetcher) + } + + ddf.col = col + ddf.fields = fields + ddf.reverse = reverse + ddf.initialized = true + ddf.isReadingDocument = false + ddf.doc = new(encodedDocument) + + if ddf.kvResultsIter != nil { + if err := ddf.kvResultsIter.Close(); err != nil { + return err + } + } + ddf.kvResultsIter = nil + if ddf.kvIter != nil { + if err := ddf.kvIter.Close(); err != nil { + return err + } + } + ddf.kvIter = nil + + ddf.schemaFields = make(map[uint32]client.FieldDescription) + for _, field := range col.Schema.Fields { + ddf.schemaFields[uint32(field.ID)] = field + } + df.deletedDocFetcher = ddf + } return nil } -// Start implements DocumentFetcher. func (df *DocumentFetcher) Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error { + err := df.start(ctx, txn, spans, false) + if err != nil { + return err + } + + if df.deletedDocFetcher != nil { + return df.deletedDocFetcher.start(ctx, txn, spans, true) + } + + return nil +} + +// Start implements DocumentFetcher. +func (df *DocumentFetcher) start(ctx context.Context, txn datastore.Txn, spans core.Spans, withDeleted bool) error { if df.col == nil { return client.NewErrUninitializeProperty("DocumentFetcher", "CollectionDescription") } @@ -109,13 +157,22 @@ func (df *DocumentFetcher) Start(ctx context.Context, txn datastore.Txn, spans c } if !spans.HasValue { // no specified spans so create a prefix scan key for the entire collection - start := base.MakeCollectionKey(*df.col).WithValueFlag() + start := base.MakeCollectionKey(*df.col) + if withDeleted { + start = start.WithDeletedFlag() + } else { + start = start.WithValueFlag() + } df.spans = core.NewSpans(core.NewSpan(start, start.PrefixEnd())) } else { valueSpans := make([]core.Span, len(spans.Value)) for i, span := range spans.Value { // We can only handle value keys, so here we ensure we only read value keys - valueSpans[i] = core.NewSpan(span.Start().WithValueFlag(), span.End().WithValueFlag()) + if withDeleted { + valueSpans[i] = core.NewSpan(span.Start().WithDeletedFlag(), span.End().WithDeletedFlag()) + } else { + valueSpans[i] = core.NewSpan(span.Start().WithValueFlag(), span.End().WithValueFlag()) + } } spans := core.MergeAscending(valueSpans) @@ -203,8 +260,7 @@ func (df *DocumentFetcher) nextKey(ctx context.Context) (spanDone bool, err erro if err != nil { return false, err } - - if df.kv != nil && df.kv.Key.InstanceType != core.ValueKey { + if df.kv != nil && (df.kv.Key.InstanceType != core.ValueKey && df.kv.Key.InstanceType != core.DeletedKey) { // We can only ready value values, if we escape the collection's value keys // then we must be done and can stop reading spanDone = true @@ -365,18 +421,38 @@ func (df *DocumentFetcher) FetchNextDoc( ctx context.Context, mapping *core.DocumentMapping, ) ([]byte, core.Doc, error) { - encdoc, err := df.FetchNext(ctx) - if err != nil { - return nil, core.Doc{}, err + var err error + var encdoc *encodedDocument + var status client.DocumentStatus + ddf := df.deletedDocFetcher + if ddf != nil { + if !ddf.kvEnd { + if df.kvEnd || ddf.kv.Key.DocKey < df.kv.Key.DocKey { + encdoc, err = ddf.FetchNext(ctx) + if err != nil { + return nil, core.Doc{}, err + } + status = client.Deleted + } + } } + if encdoc == nil { - return nil, core.Doc{}, nil + encdoc, err = df.FetchNext(ctx) + if err != nil { + return nil, core.Doc{}, err + } + if encdoc == nil { + return nil, core.Doc{}, nil + } + status = client.Active } doc, err := encdoc.DecodeToDoc(mapping) if err != nil { return nil, core.Doc{}, err } + doc.Status = status return encdoc.Key, doc, err } @@ -395,5 +471,14 @@ func (df *DocumentFetcher) Close() error { return nil } - return df.kvResultsIter.Close() + err = df.kvResultsIter.Close() + if err != nil { + return err + } + + if df.deletedDocFetcher != nil { + return df.deletedDocFetcher.Close() + } + + return nil } diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go index 7d7197c3a3..871b96ba80 100644 --- a/db/fetcher/versioned.go +++ b/db/fetcher/versioned.go @@ -101,6 +101,7 @@ func (vf *VersionedFetcher) Init( col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool, + showDeleted bool, ) error { vf.col = col vf.queuedCids = list.New() @@ -108,7 +109,7 @@ func (vf *VersionedFetcher) Init( // run the DF init, VersionedFetchers only supports the Primary (0) index vf.DocumentFetcher = new(DocumentFetcher) - return vf.DocumentFetcher.Init(col, fields, reverse) + return vf.DocumentFetcher.Init(col, fields, reverse, showDeleted) } // Start serializes the correct state according to the Key and CID. diff --git a/db/fetcher_test.go b/db/fetcher_test.go index 9122bb5237..80fc42b359 100644 --- a/db/fetcher_test.go +++ b/db/fetcher_test.go @@ -53,7 +53,7 @@ func newTestCollectionDescription() client.CollectionDescription { func newTestFetcher() (*fetcher.DocumentFetcher, error) { df := new(fetcher.DocumentFetcher) desc := newTestCollectionDescription() - err := df.Init(&desc, nil, false) + err := df.Init(&desc, nil, false, false) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocSingle(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false) + err = df.Init(&desc, nil, false, false) assert.NoError(t, err) err = df.Start(ctx, txn, core.Spans{}) @@ -178,7 +178,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocMultiple(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false) + err = df.Init(&desc, nil, false, false) assert.NoError(t, err) err = df.Start(ctx, txn, core.Spans{}) @@ -210,7 +210,7 @@ func TestFetcherGetAllPrimaryIndexDecodedSingle(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false) + err = df.Init(&desc, nil, false, false) assert.NoError(t, err) txn, err := db.NewTxn(ctx, true) @@ -262,7 +262,7 @@ func TestFetcherGetAllPrimaryIndexDecodedMultiple(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false) + err = df.Init(&desc, nil, false, false) assert.NoError(t, err) txn, err := db.NewTxn(ctx, true) @@ -319,7 +319,7 @@ func TestFetcherGetOnePrimaryIndexDecoded(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false) + err = df.Init(&desc, nil, false, false) assert.NoError(t, err) // create a span for our document we wish to find diff --git a/merkle/crdt/composite.go b/merkle/crdt/composite.go index 6ca540dc7c..1252ff4e63 100644 --- a/merkle/crdt/composite.go +++ b/merkle/crdt/composite.go @@ -96,7 +96,8 @@ func (m *MerkleCompositeDAG) Set( // Set() call on underlying CompositeDAG CRDT // persist/publish delta log.Debug(ctx, "Applying delta-mutator 'Set' on CompositeDAG") - delta := m.reg.Set(patch, links, status) + delta := m.reg.Set(patch, links) + delta.SetStatus(status) nd, err := m.Publish(ctx, delta) if err != nil { return nil, 0, err diff --git a/planner/delete.go b/planner/delete.go index b843042236..04dec166e9 100644 --- a/planner/delete.go +++ b/planner/delete.go @@ -45,7 +45,7 @@ func (n *deleteNode) Next() (bool, error) { if err != nil { return false, err } - _, err = n.collection.DeleteWithKey(n.p.ctx, key, n.status) + _, err = n.collection.DeleteWithKey(n.p.ctx, key) if err != nil { return false, err } diff --git a/planner/mapper/mapper.go b/planner/mapper/mapper.go index ab29a33670..72b723d441 100644 --- a/planner/mapper/mapper.go +++ b/planner/mapper/mapper.go @@ -536,20 +536,6 @@ func getRequestables( Key: getRenderKey(&f.Field), }) - mapping.Add(index, f.Name) - case *request.StatusSelect: - index := mapping.GetNextIndex() - - fields = append(fields, &Field{ - Index: index, - Name: f.Name, - }) - - mapping.RenderKeys = append(mapping.RenderKeys, core.RenderKey{ - Index: index, - Key: getRenderKey(&f.Field), - }) - mapping.Add(index, f.Name) default: return nil, nil, client.NewErrUnhandledType("field", field) @@ -655,6 +641,8 @@ func getTopLevelInfo( // the typeName index is dynamic, but the field indexes are not mapping.SetTypeName(collectionName) + mapping.Add(mapping.GetNextIndex(), request.StatusFieldName) + return mapping, &desc, nil } diff --git a/planner/mapper/mutation.go b/planner/mapper/mutation.go index 241d770882..c3c5829294 100644 --- a/planner/mapper/mutation.go +++ b/planner/mapper/mutation.go @@ -17,7 +17,6 @@ const ( CreateObjects UpdateObjects DeleteObjects - PurgeObjects ) // Mutation represents a request to mutate data stored in Defra. diff --git a/planner/planner.go b/planner/planner.go index a07afb4f06..a4202dc98a 100644 --- a/planner/planner.go +++ b/planner/planner.go @@ -160,9 +160,6 @@ func (p *Planner) newObjectMutationPlan(stmt *mapper.Mutation) (planNode, error) case mapper.DeleteObjects: return p.DeleteDocs(stmt, client.Deleted) - case mapper.PurgeObjects: - return p.DeleteDocs(stmt, client.Purged) - default: return nil, client.NewErrUnhandledType("mutation", stmt.Type) } @@ -494,7 +491,6 @@ func (p *Planner) executeRequest( return nil, err } } - return docs, err } diff --git a/planner/scan.go b/planner/scan.go index 45a07ddbe7..05b590addd 100644 --- a/planner/scan.go +++ b/planner/scan.go @@ -12,6 +12,7 @@ package planner import ( "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/client/request" "github.com/sourcenetwork/defradb/core" "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/db/fetcher" @@ -47,7 +48,7 @@ func (n *scanNode) Kind() string { func (n *scanNode) Init() error { // init the fetcher - if err := n.fetcher.Init(&n.desc, n.fields, n.reverse); err != nil { + if err := n.fetcher.Init(&n.desc, n.fields, n.reverse, n.showDeleted); err != nil { return err } return n.initScan() @@ -98,24 +99,11 @@ func (n *scanNode) Next() (bool, error) { if len(n.currentValue.Fields) == 0 { return false, nil } - - if !n.showDeleted { - dockey, err := client.NewDocKeyFromString(string(n.docKey)) - if err != nil { - return false, err - } - dsKey := core.DataStoreKeyFromDocKey(dockey) - dsKey.CollectionID = n.desc.IDString() - isDeleted, err := n.p.txn.Datastore().Has(n.p.ctx, dsKey.ToDeletedDataStoreKey().ToDS()) - if err != nil { - return false, err - } - - if isDeleted { - continue - } - } - + n.documentMapping.SetFirstOfName( + &n.currentValue, + request.StatusFieldName, + client.DocumentStatusToString[n.currentValue.Status], + ) passed, err := mapper.RunFilter(n.currentValue, n.filter) if err != nil { return false, err diff --git a/planner/select.go b/planner/select.go index 10d382f794..c0e78271e6 100644 --- a/planner/select.go +++ b/planner/select.go @@ -308,14 +308,6 @@ func (n *selectNode) initFields(selectReq *mapper.Select) ([]aggregateNode, erro //nolint:errcheck n.addTypeIndexJoin(f) // @TODO: ISSUE#158 } - case *mapper.Field: - switch f.Name { - case request.StatusFieldName: - statusScanPlan := n.planner.Status(n, n.origSource) - if err := n.addSubPlan(f.Index, statusScanPlan); err != nil { - return nil, err - } - } } } diff --git a/planner/status.go b/planner/status.go deleted file mode 100644 index 86c53c5c99..0000000000 --- a/planner/status.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package planner - -import ( - dag "github.com/ipfs/go-merkledag" - "github.com/ugorji/go/codec" - - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/client/request" - "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/core/crdt" - "github.com/sourcenetwork/defradb/merkle/clock" -) - -// A lazily loaded cache-node that allows retrieval of cached documents at arbitrary indexes. -// The node will start empty, and then load items as they are requested. Items that are -// requested more than once will not be re-loaded from source. -type statusScanNode struct { - documentIterator - docMapper - - planner *Planner - plan planNode -} - -func (p *Planner) Status(parent *selectNode, source planNode) *statusScanNode { - return &statusScanNode{ - planner: p, - plan: source, - docMapper: docMapper{parent.documentMapping}, - } -} - -func (n *statusScanNode) Kind() string { - return "statusScanNode" -} - -func (n *statusScanNode) Init() error { return n.plan.Init() } - -func (n *statusScanNode) Start() error { return n.plan.Start() } - -func (n *statusScanNode) Spans(spans core.Spans) { n.plan.Spans(spans) } - -func (n *statusScanNode) Close() error { return n.plan.Close() } - -func (n *statusScanNode) Source() planNode { return n.plan } - -func (n *statusScanNode) Value() core.Doc { return n.plan.Value() } - -func (n *statusScanNode) Next() (bool, error) { - hasNext, err := n.plan.Next() - if err != nil || !hasNext { - return hasNext, err - } - - doc := n.plan.Value() - - hsKey := core.HeadStoreKey{ - DocKey: doc.GetKey(), - FieldId: core.COMPOSITE_NAMESPACE, - } - - headset := clock.NewHeadSet( - n.planner.txn.Headstore(), - hsKey, - ) - cids, _, err := headset.List(n.planner.ctx) - if err != nil { - return false, err - } - - b, err := n.planner.txn.DAGstore().Get(n.planner.ctx, cids[0]) - if err != nil { - return false, err - } - nd, err := dag.DecodeProtobuf(b.RawData()) - if err != nil { - return false, err - } - delta := &crdt.CompositeDAGDelta{} - h := &codec.CborHandle{} - dec := codec.NewDecoderBytes(nd.Data(), h) - err = dec.Decode(delta) - if err != nil { - return false, err - } - - n.documentMapping.SetFirstOfName(&doc, request.StatusFieldName, client.DocumentStatusToString[delta.Status]) - n.currentValue = doc - return true, nil -} - -func (n *statusScanNode) Merge() bool { return true } diff --git a/request/graphql/parser/mutation.go b/request/graphql/parser/mutation.go index 9bda126599..c388fa1533 100644 --- a/request/graphql/parser/mutation.go +++ b/request/graphql/parser/mutation.go @@ -26,7 +26,6 @@ var ( "create": request.CreateObjects, "update": request.UpdateObjects, "delete": request.DeleteObjects, - "purge": request.PurgeObjects, } ) diff --git a/request/graphql/parser/request.go b/request/graphql/parser/request.go index 6a64999216..326b64e948 100644 --- a/request/graphql/parser/request.go +++ b/request/graphql/parser/request.go @@ -169,10 +169,6 @@ func parseSelectFields( return nil, err } selections[i] = s - } else if node.Name.Value == request.StatusFieldName { - selections[i] = &request.StatusSelect{ - Field: *parseField(node), - } } else if node.SelectionSet == nil { // regular field selections[i] = parseField(node) } else { // sub type with extra fields diff --git a/tests/integration/mutation/one_to_many/delete/simple_test.go b/tests/integration/mutation/one_to_many/delete/simple_test.go index 634bef9933..1319f4e194 100644 --- a/tests/integration/mutation/one_to_many/delete/simple_test.go +++ b/tests/integration/mutation/one_to_many/delete/simple_test.go @@ -90,6 +90,7 @@ func TestDeletionOfADocumentUsingSingleKeyWithShowDeletedDocumentQuery(t *testin // published to be empty. Request: `query { Author(showDeleted: true) { + _status name age published { @@ -101,14 +102,10 @@ func TestDeletionOfADocumentUsingSingleKeyWithShowDeletedDocumentQuery(t *testin }`, Results: []map[string]any{ { - "name": "John", - "age": uint64(30), + "_status": "Active", + "name": "John", + "age": uint64(30), "published": []map[string]any{ - { - "_status": "Deleted", - "name": "John and the philosopher are stoned", - "rating": 9.9, - }, { "_status": "Active", "name": "John has a chamber of secrets", diff --git a/tests/integration/net/state/simple/peer/with_delete_test.go b/tests/integration/net/state/simple/peer/with_delete_test.go new file mode 100644 index 0000000000..622dd9ace0 --- /dev/null +++ b/tests/integration/net/state/simple/peer/with_delete_test.go @@ -0,0 +1,141 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package peer_test + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/sourcenetwork/immutable" +) + +// The parent-child distinction in these tests is as much documentation and test +// of the test system as of production. See it as a santity check of sorts. +func TestP2PWithMultipleDocumentsSingleDelete(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + Name: String + Age: Int + } + `, + }, + testUtils.CreateDoc{ + // Create John on all nodes + Doc: `{ + "Name": "John", + "Age": 43 + }`, + }, + testUtils.CreateDoc{ + // Create Andy on all nodes + Doc: `{ + "Name": "Andy", + "Age": 74 + }`, + }, + testUtils.ConnectPeers{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.DeleteDoc{ + NodeID: immutable.Some(0), + DocID: 0, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users { + _status + Name + Age + } + }`, + Results: []map[string]any{ + { + "_status": "Active", + "Name": "Andy", + "Age": uint64(74), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, []string{"Users"}, test) +} + +func TestP2PWithMultipleDocumentsSingleDeleteWithShowDeleted(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + Name: String + Age: Int + } + `, + }, + testUtils.CreateDoc{ + // Create John on all nodes + Doc: `{ + "Name": "John", + "Age": 43 + }`, + }, + testUtils.CreateDoc{ + // Create Andy on all nodes + Doc: `{ + "Name": "Andy", + "Age": 74 + }`, + }, + testUtils.ConnectPeers{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.DeleteDoc{ + NodeID: immutable.Some(0), + DocID: 0, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users(showDeleted: true) { + _status + Name + Age + } + }`, + Results: []map[string]any{ + { + "_status": "Active", + "Name": "Andy", + "Age": uint64(74), + }, + { + "_status": "Deleted", + "Name": "John", + "Age": uint64(43), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, []string{"Users"}, test) +} diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 13a7e37270..475cd383a7 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -150,6 +150,15 @@ func connectPeers( sourceToTargetEvents[waitIndex] += 1 } + case DeleteDoc: + // Updates to existing docs should always sync (no-sub required) + if action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { + targetToSourceEvents[waitIndex] += 1 + } + if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID { + sourceToTargetEvents[waitIndex] += 1 + } + case UpdateDoc: // Updates to existing docs should always sync (no-sub required) if action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { @@ -241,6 +250,16 @@ func configureReplicator( currentdocID++ + case DeleteDoc: + if _, shouldSyncFromTarget := docIDsSyncedToSource[action.DocID]; shouldSyncFromTarget && + action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { + targetToSourceEvents[waitIndex] += 1 + } + + if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID { + sourceToTargetEvents[waitIndex] += 1 + } + case UpdateDoc: if _, shouldSyncFromTarget := docIDsSyncedToSource[action.DocID]; shouldSyncFromTarget && action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go index f019a06141..7bfd301b80 100644 --- a/tests/integration/test_case.go +++ b/tests/integration/test_case.go @@ -94,6 +94,29 @@ type CreateDoc struct { ExpectedError string } +// DeleteDoc will attempt to delete the given document in the given collection +// using the collection api. +type DeleteDoc struct { + // NodeID may hold the ID (index) of a node to apply this create to. + // + // If a value is not provided the document will be created in all nodes. + NodeID immutable.Option[int] + + // The collection in which this document should be deleted. + CollectionID int + + // The index-identifier of the document within the collection. This is based on + // the order in which it was created, not the ordering of the document within the + // database. + DocID int + + // Any error expected from the action. Optional. + // + // String can be a partial, and the test will pass if an error is returned that + // contains this string. + ExpectedError string +} + // UpdateDoc will attempt to update the given document in the given collection // using the collection api. type UpdateDoc struct { diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index c5a9a9d709..c59ad26e06 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -360,6 +360,9 @@ func executeTestCase( case CreateDoc: documents = createDoc(ctx, t, testCase, collections, documents, action) + case DeleteDoc: + deleteDoc(ctx, t, testCase, collections, documents, action) + case UpdateDoc: updateDoc(ctx, t, testCase, nodes, collections, documents, action) @@ -774,6 +777,27 @@ func createDoc( return documents } +// deleteDoc deletes a document using the collection api and caches it in the +// given documents slice. +func deleteDoc( + ctx context.Context, + t *testing.T, + testCase TestCase, + nodeCollections [][]client.Collection, + documents [][]*client.Document, + action DeleteDoc, +) { + doc := documents[action.CollectionID][action.DocID] + + var expectedErrorRaised bool + for _, collections := range getNodeCollections(action.NodeID, nodeCollections) { + _, err := collections[action.CollectionID].DeleteWithKey(ctx, doc.Key()) + expectedErrorRaised = AssertError(t, testCase.Description, err, action.ExpectedError) + } + + assertExpectedErrorRaised(t, testCase.Description, action.ExpectedError, expectedErrorRaised) +} + // updateDoc updates a document using the collection api. func updateDoc( ctx context.Context,