Skip to content

Commit

Permalink
Remove DatastoreKey-is-actually-headstore hacks
Browse files Browse the repository at this point in the history
This is the primary goal of the PR, as otherwise I'd have to extend the hacks even further in order to query collection commits.
  • Loading branch information
AndrewSisley committed Nov 5, 2024
1 parent 26d1d48 commit 6496449
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 125 deletions.
8 changes: 4 additions & 4 deletions internal/core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
// Span is a range of keys from [Start, End).
type Span struct {
// Start represents the starting key of the Span.
Start keys.DataStoreKey
Start keys.Walkable

// End represents the ending key of the Span.
End keys.DataStoreKey
End keys.Walkable
}

// NewSpan creates a new Span from the provided start and end keys.
func NewSpan(start, end keys.DataStoreKey) Span {
func NewSpan(start, end keys.Walkable) Span {
return Span{
Start: start,
End: end,
Expand Down Expand Up @@ -122,7 +122,7 @@ func (this Span) Compare(other Span) SpanComparisonResult {
return After
}

func isAdjacent(this keys.DataStoreKey, other keys.DataStoreKey) bool {
func isAdjacent(this keys.Walkable, other keys.Walkable) bool {
return len(this.ToString()) == len(other.ToString()) &&
(this.PrefixEnd().ToString() == other.ToString() ||
this.ToString() == other.PrefixEnd().ToString())
Expand Down
28 changes: 2 additions & 26 deletions internal/db/fetcher/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,17 @@ package fetcher

import (
"context"
"sort"
"strings"

"github.com/ipfs/go-cid"
dsq "github.com/ipfs/go-datastore/query"
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

// HeadFetcher is a utility to incrementally fetch all the MerkleCRDT heads of a given doc/field.
type HeadFetcher struct {
spans []core.Span
fieldId immutable.Option[string]

kvIter dsq.Results
Expand All @@ -35,33 +31,13 @@ type HeadFetcher struct {
func (hf *HeadFetcher) Start(
ctx context.Context,
txn datastore.Txn,
spans []core.Span,
prefix keys.HeadStoreKey,
fieldId immutable.Option[string],
) error {
if len(spans) == 0 {
spans = []core.Span{
core.NewSpan(
keys.DataStoreKey{},
keys.DataStoreKey{}.PrefixEnd(),
),
}
}

if len(spans) > 1 {
// if we have multiple spans, we need to sort them by their start position
// so we can do a single iterative sweep
sort.Slice(spans, func(i, j int) bool {
// compare by strings if i < j.
// apply the '!= df.reverse' to reverse the sort
// if we need to
return (strings.Compare(spans[i].Start.ToString(), spans[j].Start.ToString()) < 0)
})
}
hf.spans = spans
hf.fieldId = fieldId

q := dsq.Query{
Prefix: hf.spans[0].Start.ToString(),
Prefix: prefix.ToString(),
Orders: []dsq.Order{dsq.OrderByKey{}},
}

Expand Down
63 changes: 28 additions & 35 deletions internal/db/fetcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,44 @@ import (
)

const (
errFieldIdNotFound string = "unable to find SchemaFieldDescription for given FieldId"
errFailedToDecodeCIDForVFetcher string = "failed to decode CID for VersionedFetcher"
errFailedToSeek string = "seek failed"
errFailedToMergeState string = "failed merging state"
errVFetcherFailedToFindBlock string = "(version fetcher) failed to find block in blockstore"
errVFetcherFailedToGetBlock string = "(version fetcher) failed to get block in blockstore"
errVFetcherFailedToWriteBlock string = "(version fetcher) failed to write block to blockstore"
errVFetcherFailedToDecodeNode string = "(version fetcher) failed to decode protobuf"
errVFetcherFailedToGetDagLink string = "(version fetcher) failed to get node link from DAG"
errFailedToGetDagNode string = "failed to get DAG Node"
errMissingMapper string = "missing document mapper"
errInvalidInOperatorValue string = "invalid _in/_nin value"
errInvalidFilterOperator string = "invalid filter operator is provided"
errNotSupportedKindByIndex string = "kind is not supported by index"
errUnexpectedTypeValue string = "unexpected type value"
errFieldIdNotFound string = "unable to find SchemaFieldDescription for given FieldId"
errFailedToSeek string = "seek failed"
errFailedToMergeState string = "failed merging state"
errVFetcherFailedToFindBlock string = "(version fetcher) failed to find block in blockstore"
errVFetcherFailedToGetBlock string = "(version fetcher) failed to get block in blockstore"
errVFetcherFailedToWriteBlock string = "(version fetcher) failed to write block to blockstore"
errVFetcherFailedToDecodeNode string = "(version fetcher) failed to decode protobuf"
errVFetcherFailedToGetDagLink string = "(version fetcher) failed to get node link from DAG"
errFailedToGetDagNode string = "failed to get DAG Node"
errMissingMapper string = "missing document mapper"
errInvalidInOperatorValue string = "invalid _in/_nin value"
errInvalidFilterOperator string = "invalid filter operator is provided"
errNotSupportedKindByIndex string = "kind is not supported by index"
errUnexpectedTypeValue string = "unexpected type value"
)

var (
ErrFieldIdNotFound = errors.New(errFieldIdNotFound)
ErrFailedToDecodeCIDForVFetcher = errors.New(errFailedToDecodeCIDForVFetcher)
ErrFailedToSeek = errors.New(errFailedToSeek)
ErrFailedToMergeState = errors.New(errFailedToMergeState)
ErrVFetcherFailedToFindBlock = errors.New(errVFetcherFailedToFindBlock)
ErrVFetcherFailedToGetBlock = errors.New(errVFetcherFailedToGetBlock)
ErrVFetcherFailedToWriteBlock = errors.New(errVFetcherFailedToWriteBlock)
ErrVFetcherFailedToDecodeNode = errors.New(errVFetcherFailedToDecodeNode)
ErrVFetcherFailedToGetDagLink = errors.New(errVFetcherFailedToGetDagLink)
ErrFailedToGetDagNode = errors.New(errFailedToGetDagNode)
ErrMissingMapper = errors.New(errMissingMapper)
ErrSingleSpanOnly = errors.New("spans must contain only a single entry")
ErrInvalidInOperatorValue = errors.New(errInvalidInOperatorValue)
ErrInvalidFilterOperator = errors.New(errInvalidFilterOperator)
ErrUnexpectedTypeValue = errors.New(errUnexpectedTypeValue)
ErrFieldIdNotFound = errors.New(errFieldIdNotFound)
ErrFailedToSeek = errors.New(errFailedToSeek)
ErrFailedToMergeState = errors.New(errFailedToMergeState)
ErrVFetcherFailedToFindBlock = errors.New(errVFetcherFailedToFindBlock)
ErrVFetcherFailedToGetBlock = errors.New(errVFetcherFailedToGetBlock)
ErrVFetcherFailedToWriteBlock = errors.New(errVFetcherFailedToWriteBlock)
ErrVFetcherFailedToDecodeNode = errors.New(errVFetcherFailedToDecodeNode)
ErrVFetcherFailedToGetDagLink = errors.New(errVFetcherFailedToGetDagLink)
ErrFailedToGetDagNode = errors.New(errFailedToGetDagNode)
ErrMissingMapper = errors.New(errMissingMapper)
ErrSingleSpanOnly = errors.New("spans must contain only a single entry")
ErrInvalidInOperatorValue = errors.New(errInvalidInOperatorValue)
ErrInvalidFilterOperator = errors.New(errInvalidFilterOperator)
ErrUnexpectedTypeValue = errors.New(errUnexpectedTypeValue)
)

// NewErrFieldIdNotFound returns an error indicating that the given FieldId was not found.
func NewErrFieldIdNotFound(fieldId uint32) error {
	kv := errors.NewKV("FieldId", fieldId)
	return errors.New(errFieldIdNotFound, kv)
}

// NewErrFailedToDecodeCIDForVFetcher returns an error indicating that the given CID could not be decoded.
func NewErrFailedToDecodeCIDForVFetcher(inner error) error {
return errors.Wrap(errFailedToDecodeCIDForVFetcher, inner)
}

// NewErrFailedToSeek returns an error indicating that the given target could not be seeked to.
func NewErrFailedToSeek(target any, inner error) error {
return errors.Wrap(errFailedToSeek, inner, errors.NewKV("Target", target))
Expand Down
15 changes: 12 additions & 3 deletions internal/db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,20 @@ func (df *DocumentFetcher) start(ctx context.Context, spans []core.Span, withDel
} else {
valueSpans := make([]core.Span, len(spans))
for i, span := range spans {
// We can only handle value keys, so here we ensure we only read value keys
if withDeleted {
valueSpans[i] = core.NewSpan(span.Start.WithDeletedFlag(), span.End.WithDeletedFlag())
// DocumentFetcher only ever receives document keys
//nolint:forcetypeassert
valueSpans[i] = core.NewSpan(
span.Start.(keys.DataStoreKey).WithDeletedFlag(),
span.End.(keys.DataStoreKey).WithDeletedFlag(),
)
} else {
valueSpans[i] = core.NewSpan(span.Start.WithValueFlag(), span.End.WithValueFlag())
// DocumentFetcher only ever receives document keys
//nolint:forcetypeassert
valueSpans[i] = core.NewSpan(
span.Start.(keys.DataStoreKey).WithValueFlag(),
span.End.(keys.DataStoreKey).WithValueFlag(),
)
}
}

Expand Down
40 changes: 14 additions & 26 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ type VersionedFetcher struct {
root datastore.Rootstore
store datastore.Txn

dsKey keys.DataStoreKey
version cid.Cid
dsKey keys.DataStoreKey

queuedCids *list.List

Expand Down Expand Up @@ -162,30 +161,25 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans ...core.Span) error
return ErrSingleSpanOnly
}

// For the VersionedFetcher, the spans needs to be in the format
// Span{Start: DocID, End: CID}
dk := spans[0].Start
cidRaw := spans[0].End
if dk.DocID == "" {
// VersionedFetcher only ever receives a headstore key
//nolint:forcetypeassert
prefix := spans[0].Start.(keys.HeadStoreKey)
dk := prefix.DocID
cid := prefix.Cid
if dk == "" {
return client.NewErrUninitializeProperty("Spans", "DocID")
} else if cidRaw.DocID == "" { // todo: dont abuse DataStoreKey/Span like this!
} else if !cid.Defined() {
return client.NewErrUninitializeProperty("Spans", "CID")
}

// decode cidRaw from core.Key to cid.Cid
// need to remove '/' prefix from the core.Key

c, err := cid.Decode(cidRaw.DocID)
if err != nil {
return NewErrFailedToDecodeCIDForVFetcher(err)
}

vf.ctx = ctx
vf.dsKey = dk.WithCollectionRoot(vf.col.Description().RootID)
vf.version = c
vf.dsKey = keys.DataStoreKey{
CollectionRootID: vf.col.Description().RootID,
DocID: dk,
}

if err := vf.seekTo(vf.version); err != nil {
return NewErrFailedToSeek(c, err)
if err := vf.seekTo(cid); err != nil {
return NewErrFailedToSeek(cid, err)
}

return vf.DocumentFetcher.Start(ctx)
Expand Down Expand Up @@ -421,9 +415,3 @@ func (vf *VersionedFetcher) Close() error {

return vf.DocumentFetcher.Close()
}

// NewVersionedSpan creates a new VersionedSpan from a DataStoreKey and a version CID.
func NewVersionedSpan(dsKey keys.DataStoreKey, version cid.Cid) core.Span {
// Todo: Dont abuse DataStoreKey for version cid!
return core.NewSpan(dsKey, keys.DataStoreKey{DocID: version.String()})
}
4 changes: 2 additions & 2 deletions internal/keys/datastore_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type DataStoreKey struct {
FieldID string
}

var _ Key = (*DataStoreKey)(nil)
var _ Walkable = (*DataStoreKey)(nil)

// Creates a new DataStoreKey from a string as best as it can,
splitting the input using '/' as a field delimiter. It assumes
Expand Down Expand Up @@ -167,7 +167,7 @@ func (k DataStoreKey) ToPrimaryDataStoreKey() PrimaryDataStoreKey {
// PrefixEnd determines the end key given key as a prefix, that is the key that sorts precisely
// behind all keys starting with prefix: "1" is added to the final byte and the carry propagated.
// The special cases of nil and KeyMin always returns KeyMax.
func (k DataStoreKey) PrefixEnd() DataStoreKey {
func (k DataStoreKey) PrefixEnd() Walkable {
newKey := k

if k.FieldID != "" {
Expand Down
21 changes: 20 additions & 1 deletion internal/keys/headstore_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type HeadStoreKey struct {
Cid cid.Cid
}

var _ Key = (*HeadStoreKey)(nil)
var _ Walkable = (*HeadStoreKey)(nil)

// Creates a new HeadStoreKey from a string as best as it can,
splitting the input using '/' as a field delimiter. It assumes
Expand Down Expand Up @@ -92,3 +92,22 @@ func (k HeadStoreKey) Bytes() []byte {
func (k HeadStoreKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k HeadStoreKey) PrefixEnd() Walkable {
newKey := k

if k.FieldID != "" {
newKey.FieldID = string(bytesPrefixEnd([]byte(k.FieldID)))
return newKey
}
if k.DocID != "" {
newKey.DocID = string(bytesPrefixEnd([]byte(k.DocID)))
return newKey
}
if k.Cid.Defined() {
newKey.Cid = cid.MustParse(bytesPrefixEnd(k.Cid.Bytes()))
return newKey
}

Check warning on line 110 in internal/keys/headstore_doc.go

View check run for this annotation

Codecov / codecov/patch

internal/keys/headstore_doc.go#L107-L110

Added lines #L107 - L110 were not covered by tests

return newKey

Check warning on line 112 in internal/keys/headstore_doc.go

View check run for this annotation

Codecov / codecov/patch

internal/keys/headstore_doc.go#L112

Added line #L112 was not covered by tests
}
17 changes: 17 additions & 0 deletions internal/keys/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,20 @@ type Key interface {
Bytes() []byte
ToDS() ds.Key
}

// Walkable represents a key in the database that can be 'walked along'
// by prefixing the end of the key.
//
// PrefixEnd yields the key that sorts immediately after every key
// sharing this key's prefix, allowing range scans over that prefix.
type Walkable interface {
	Key
	PrefixEnd() Walkable
}

// PrettyPrint returns the human readable version of the given key.
func PrettyPrint(k Key) string {
switch typed := k.(type) {
case DataStoreKey:
return typed.PrettyPrint()
default:
return typed.ToString()

Check warning on line 37 in internal/keys/key.go

View check run for this annotation

Codecov / codecov/patch

internal/keys/key.go#L36-L37

Added lines #L36 - L37 were not covered by tests
}
}
Loading

0 comments on commit 6496449

Please sign in to comment.