Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Defra-Lens support for branching schema #2421

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lens/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (f *lensedFetcher) Init(
f.fieldDescriptionsByName[defFields[i].Name] = defFields[i]
}

history, err := getTargetedSchemaHistory(ctx, txn, f.col.Schema().Root, f.col.Schema().VersionID)
history, err := getTargetedCollectionHistory(ctx, txn, f.col.Schema().Root, f.col.Schema().VersionID)
if err != nil {
return err
}
Expand Down
206 changes: 90 additions & 116 deletions lens/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,185 +20,159 @@ import (
"github.com/sourcenetwork/defradb/db/description"
)

// schemaHistoryLink represents an item in a particular schema's history, it
// collectionHistoryLink represents an item in a particular collection's schema history, it
// links to the previous and next version items if they exist.
type schemaHistoryLink struct {
type collectionHistoryLink struct {
// The collection as this point in history.
collection *client.CollectionDescription

// The history link to the next schema versions, if there are some
// The history link to the next collection versions, if there are some
// (for the most recent schema version this will be empty).
next []*schemaHistoryLink
next []*collectionHistoryLink

// The history link to the previous schema versions, if there are
// some (for the initial schema version this will be empty).
previous []*schemaHistoryLink
// The history link to the previous collection versions, if there are
// some (for the initial collection version this will be empty).
previous []*collectionHistoryLink
}

// targetedSchemaHistoryLink represents an item in a particular schema's history, it
// links to the previous and next version items if they exist.
//
// It also contains a vector which describes the distance and direction to the
// target schema version (given as an input param on construction).
type targetedSchemaHistoryLink struct {
// targetedCollectionHistoryLink represents an item in a particular collection's schema history, it
// links to the previous and next version items if they exist and are on the path to
// the target schema version.
type targetedCollectionHistoryLink struct {
// The collection as this point in history.
collection *client.CollectionDescription

// The link to next schema version, if there is one
// (for the most recent schema version this will be None).
next immutable.Option[*targetedSchemaHistoryLink]

// The link to the previous schema version, if there is
// one (for the initial schema version this will be None).
previous immutable.Option[*targetedSchemaHistoryLink]
// The link to next collection version, if there is one
// (for the most recent collection version this will be None).
next immutable.Option[*targetedCollectionHistoryLink]

// The distance and direction from this history item to the target.
//
// A zero value indicates that this is the target item. A positive value
// indicates that the target is more recent. A negative value indicates
// that the target predates this history item.
targetVector int
// The link to the previous collection version, if there is
// one (for the initial collection version this will be None).
previous immutable.Option[*targetedCollectionHistoryLink]
}

// getTargetedSchemaHistory returns the history of the schema of the given id, relative
// getTargetedCollectionHistory returns the history of the schema of the given id, relative
// to the given target schema version id.
//
// This includes any history items that are only known via registered
// schema migrations.
func getTargetedSchemaHistory(
// This includes any history items that are only known via registered schema migrations.
func getTargetedCollectionHistory(
ctx context.Context,
txn datastore.Txn,
schemaRoot string,
targetSchemaVersionID string,
) (map[schemaVersionID]*targetedSchemaHistoryLink, error) {
history, err := getSchemaHistory(ctx, txn, schemaRoot)
) (map[schemaVersionID]*targetedCollectionHistoryLink, error) {
history, err := getCollectionHistory(ctx, txn, schemaRoot)
if err != nil {
return nil, err
}

result := map[schemaVersionID]*targetedSchemaHistoryLink{}

for _, item := range history {
result[item.collection.SchemaVersionID] = &targetedSchemaHistoryLink{
collection: item.collection,
}
targetHistoryItem, ok := history[targetSchemaVersionID]
if !ok {
// If the target schema version is unknown then there are no possible migrations
// that we can do.
return nil, nil
}

for _, item := range result {
schemaHistoryLink := history[item.collection.ID]
nextHistoryItems := schemaHistoryLink.next
if len(nextHistoryItems) == 0 {
continue
}
result := map[schemaVersionID]*targetedCollectionHistoryLink{}

// WARNING: This line assumes that each collection can only have a single source, and so
// just takes the first item. If/when collections can have multiple sources we will need to change
// this slightly.
nextItem := result[nextHistoryItems[0].collection.SchemaVersionID]
item.next = immutable.Some(nextItem)
nextItem.previous = immutable.Some(item)
targetLink := &targetedCollectionHistoryLink{
collection: targetHistoryItem.collection,
}
result[targetLink.collection.SchemaVersionID] = targetLink

orphanSchemaVersions := map[string]struct{}{}
linkForwards(targetLink, targetHistoryItem, result)
linkBackwards(targetLink, targetHistoryItem, result)

for schemaVersion, item := range result {
if item.collection.SchemaVersionID == targetSchemaVersionID {
continue
}
if item.targetVector != 0 {
return result, nil
}

// linkForwards traverses and links the history forwards from the given starting point.
//
// Forward collection versions found will in turn be linked both forwards and backwards, allowing
// branches to be correctly mapped to the target schema version.
func linkForwards(
currentLink *targetedCollectionHistoryLink,
currentHistoryItem *collectionHistoryLink,
result map[schemaVersionID]*targetedCollectionHistoryLink,
) {
for _, nextHistoryItem := range currentHistoryItem.next {
if _, ok := result[nextHistoryItem.collection.SchemaVersionID]; ok {
// As the history forms a DAG, this should only ever happen when
// iterating through the item we were at immediately before the current.
continue
}

distanceTravelled := 0
currentItem := item
wasFound := false
for {
if !currentItem.next.HasValue() {
break
}

currentItem = currentItem.next.Value()
distanceTravelled++
if currentItem.targetVector != 0 {
distanceTravelled += currentItem.targetVector
wasFound = true
break
}
if currentItem.collection.SchemaVersionID == targetSchemaVersionID {
wasFound = true
break
}
nextLink := &targetedCollectionHistoryLink{
collection: nextHistoryItem.collection,
previous: immutable.Some(currentLink),
}
result[nextLink.collection.SchemaVersionID] = nextLink

if !wasFound {
// The target was not found going up the chain, try looking back.
// This is important for downgrading schema versions.
for {
if !currentItem.previous.HasValue() {
break
}

currentItem = currentItem.previous.Value()
distanceTravelled--
if currentItem.targetVector != 0 {
distanceTravelled += currentItem.targetVector
wasFound = true
break
}
if currentItem.collection.SchemaVersionID == targetSchemaVersionID {
wasFound = true
break
}
}
}
linkForwards(nextLink, nextHistoryItem, result)
linkBackwards(nextLink, nextHistoryItem, result)
}
}

if !wasFound {
// This may happen if users define schema migrations to unknown schema versions
// with no migration path to known schema versions, esentially creating orphan
// migrations. These may become linked later and should remain persisted in the
// database, but we can drop them from the history here/now.
orphanSchemaVersions[schemaVersion] = struct{}{}
// linkBackwards traverses and links the history backwards from the given starting point.
//
// Backward collection versions found will in turn be linked both forwards and backwards, allowing
// branches to be correctly mapped to the target schema version.
func linkBackwards(
currentLink *targetedCollectionHistoryLink,
currentHistoryItem *collectionHistoryLink,
result map[schemaVersionID]*targetedCollectionHistoryLink,
) {
for _, prevHistoryItem := range currentHistoryItem.previous {
if _, ok := result[prevHistoryItem.collection.SchemaVersionID]; ok {
// As the history forms a DAG, this should only ever happen when
// iterating through the item we were at immediately before the current.
continue
}

item.targetVector = distanceTravelled
}
prevLink := &targetedCollectionHistoryLink{
collection: prevHistoryItem.collection,
next: immutable.Some(currentLink),
}
result[prevLink.collection.SchemaVersionID] = prevLink

for schemaVersion := range orphanSchemaVersions {
delete(result, schemaVersion)
linkForwards(prevLink, prevHistoryItem, result)
linkBackwards(prevLink, prevHistoryItem, result)
}

return result, nil
}

// getSchemaHistory returns the history of the schema of the given id as linked list
// getCollectionHistory returns the history of the collection of the given root id as linked list
// with each item mapped by schema version id.
//
// This includes any history items that are only known via registered
// schema migrations.
func getSchemaHistory(
// This includes any history items that are only known via registered schema migrations.
func getCollectionHistory(
ctx context.Context,
txn datastore.Txn,
schemaRoot string,
) (map[collectionID]*schemaHistoryLink, error) {
) (map[schemaVersionID]*collectionHistoryLink, error) {
cols, err := description.GetCollectionsBySchemaRoot(ctx, txn, schemaRoot)
if err != nil {
return nil, err
}

history := map[collectionID]*schemaHistoryLink{}
history := map[schemaVersionID]*collectionHistoryLink{}
schemaVersionsByColID := map[uint32]schemaVersionID{}

for _, c := range cols {
// Todo - this `col := c` can be removed with Go 1.22:
// https://github.com/sourcenetwork/defradb/issues/2431
col := c
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: (I know it's from a different PR but I might of missed it then) I don't think this necessary. Why did you decide to do this?

Copy link
Contributor Author

@AndrewSisley AndrewSisley Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea, might have made sense at one point, or it might have been a brain-fart.

I'll remove it shortly before merge.

  • Remove odd var copy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found out, it is because we want a pointer to it, and the way Go works means that without the copy col would always be the last col in cols.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding me of this. Could you add a TODO remove when Go 1.22 comment above it please.

Copy link
Contributor Author

@AndrewSisley AndrewSisley Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nice, glad that will fix it - will do

  • Add too comment


// Convert the temporary types to the cleaner return type:
history[col.ID] = &schemaHistoryLink{
history[col.SchemaVersionID] = &collectionHistoryLink{
collection: &col,
}
schemaVersionsByColID[col.ID] = col.SchemaVersionID
}

for _, historyItem := range history {
for _, source := range historyItem.collection.CollectionSources() {
src := history[source.SourceCollectionID]
srcSchemaVersion := schemaVersionsByColID[source.SourceCollectionID]
src := history[srcSchemaVersion]
historyItem.previous = append(
historyItem.next,
src,
Expand Down
23 changes: 11 additions & 12 deletions lens/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
)

type schemaVersionID = string
type collectionID = uint32

// LensDoc represents a document that will be sent to/from a Lens.
type LensDoc = map[string]any
Expand Down Expand Up @@ -57,7 +56,7 @@ type lens struct {
outputPipe enumerable.Concatenation[LensDoc]
unknownVersionPipe enumerable.Queue[LensDoc]

schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink
collectionHistory map[schemaVersionID]*targetedCollectionHistoryLink

source enumerable.Queue[lensInput]
}
Expand All @@ -68,18 +67,18 @@ func new(
ctx context.Context,
lensRegistry client.LensRegistry,
targetSchemaVersionID schemaVersionID,
schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink,
collectionHistory map[schemaVersionID]*targetedCollectionHistoryLink,
) Lens {
targetSource := enumerable.NewQueue[LensDoc]()
outputPipe := enumerable.Concat[LensDoc](targetSource)

return &lens{
lensRegistry: lensRegistry,
ctx: ctx,
source: enumerable.NewQueue[lensInput](),
outputPipe: outputPipe,
unknownVersionPipe: targetSource,
schemaVersionHistory: schemaVersionHistory,
lensRegistry: lensRegistry,
ctx: ctx,
source: enumerable.NewQueue[lensInput](),
outputPipe: outputPipe,
unknownVersionPipe: targetSource,
collectionHistory: collectionHistory,
lensInputPipesBySchemaVersionIDs: map[schemaVersionID]enumerable.Queue[LensDoc]{
targetSchemaVersionID: targetSource,
},
Expand Down Expand Up @@ -137,7 +136,7 @@ func (l *lens) Next() (bool, error) {
// up to the output via any intermediary pipes.
inputPipe = p
} else {
historyLocation, ok := l.schemaVersionHistory[doc.SchemaVersionID]
historyLocation, ok := l.collectionHistory[doc.SchemaVersionID]
if !ok {
// We may recieve documents of unknown schema versions, they should
// still be fed through the pipe system in order to preserve order.
Expand Down Expand Up @@ -178,7 +177,7 @@ func (l *lens) Next() (bool, error) {
break
}

if historyLocation.targetVector > 0 {
if historyLocation.next.HasValue() {
// Aquire a lens migration from the registery, using the junctionPipe as its source.
// The new pipeHead will then be connected as a source to the next migration-stage on
// the next loop.
Expand All @@ -188,7 +187,7 @@ func (l *lens) Next() (bool, error) {
}

historyLocation = historyLocation.next.Value()
} else {
} else if historyLocation.previous.HasValue() {
// Aquire a lens migration from the registery, using the junctionPipe as its source.
// The new pipeHead will then be connected as a source to the next migration-stage on
// the next loop.
Expand Down
Loading
Loading