From 36b6cd62fff0d7bad1506c24bb0f650f1467f74c Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Tue, 20 Feb 2024 10:38:49 -0500 Subject: [PATCH 1/7] Add Lens support for branching schema --- lens/history.go | 155 ++++++++---------- lens/lens.go | 5 +- .../query/with_p2p_schema_branch_test.go | 138 ++++++++++++++++ .../query/with_schema_branch_test.go | 107 ++++++++++++ 4 files changed, 311 insertions(+), 94 deletions(-) create mode 100644 tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go create mode 100644 tests/integration/schema/migrations/query/with_schema_branch_test.go diff --git a/lens/history.go b/lens/history.go index a7a5ee57d8..23f2132f66 100644 --- a/lens/history.go +++ b/lens/history.go @@ -36,10 +36,8 @@ type schemaHistoryLink struct { } // targetedSchemaHistoryLink represents an item in a particular schema's history, it -// links to the previous and next version items if they exist. -// -// It also contains a vector which describes the distance and direction to the -// target schema version (given as an input param on construction). +// links to the previous and next version items if they exist and are on the path to +// the target schema version. type targetedSchemaHistoryLink struct { // The collection as this point in history. collection *client.CollectionDescription @@ -51,13 +49,6 @@ type targetedSchemaHistoryLink struct { // The link to the previous schema version, if there is // one (for the initial schema version this will be None). previous immutable.Option[*targetedSchemaHistoryLink] - - // The distance and direction from this history item to the target. - // - // A zero value indicates that this is the target item. A positive value - // indicates that the target is more recent. A negative value indicates - // that the target predates this history item. - targetVector int } // getTargetedSchemaHistory returns the history of the schema of the given id, relative @@ -76,99 +67,78 @@ func getTargetedSchemaHistory( return nil, err } - result := map[schemaVersionID]*targetedSchemaHistoryLink{} - - for _, item := range history { - result[item.collection.SchemaVersionID] = &targetedSchemaHistoryLink{ - collection: item.collection, - } + targetHistoryItem, ok := history[targetSchemaVersionID] + if !ok { + // If the target schema version is unknown then there are possible no migrations + // that we can do. + return nil, nil } - for _, item := range result { - schemaHistoryLink := history[item.collection.ID] - nextHistoryItems := schemaHistoryLink.next - if len(nextHistoryItems) == 0 { - continue - } + result := map[schemaVersionID]*targetedSchemaHistoryLink{} - // WARNING: This line assumes that each collection can only have a single source, and so - // just takes the first item. If/when collections can have multiple sources we will need to change - // this slightly. - nextItem := result[nextHistoryItems[0].collection.SchemaVersionID] - item.next = immutable.Some(nextItem) - nextItem.previous = immutable.Some(item) + targetLink := &targetedSchemaHistoryLink{ + collection: targetHistoryItem.collection, } + result[targetLink.collection.SchemaVersionID] = targetLink - orphanSchemaVersions := map[string]struct{}{} + linkForwards(targetLink, targetHistoryItem, result) + linkBackwards(targetLink, targetHistoryItem, result) - for schemaVersion, item := range result { - if item.collection.SchemaVersionID == targetSchemaVersionID { - continue - } - if item.targetVector != 0 { + return result, nil +} + +// linkForwards traverses and links the history forwards from the given starting point. +// +// Forward schema versions found will in turn be linked both forwards and backwards, allowing +// schema branches to be correctly mapped to the target schema version. +func linkForwards( + currentLink *targetedSchemaHistoryLink, + currentHistoryItem *schemaHistoryLink, + result map[schemaVersionID]*targetedSchemaHistoryLink, +) { + for _, nextHistoryItem := range currentHistoryItem.next { + if _, ok := result[nextHistoryItem.collection.SchemaVersionID]; ok { + // As the history forms a DAG, this should only ever happen when + // iterating through the item we were at immediately before the current. continue } - distanceTravelled := 0 - currentItem := item - wasFound := false - for { - if !currentItem.next.HasValue() { - break - } - - currentItem = currentItem.next.Value() - distanceTravelled++ - if currentItem.targetVector != 0 { - distanceTravelled += currentItem.targetVector - wasFound = true - break - } - if currentItem.collection.SchemaVersionID == targetSchemaVersionID { - wasFound = true - break - } + nextLink := &targetedSchemaHistoryLink{ + collection: nextHistoryItem.collection, + previous: immutable.Some(currentLink), } + result[nextLink.collection.SchemaVersionID] = nextLink - if !wasFound { - // The target was not found going up the chain, try looking back. - // This is important for downgrading schema versions. - for { - if !currentItem.previous.HasValue() { - break - } - - currentItem = currentItem.previous.Value() - distanceTravelled-- - if currentItem.targetVector != 0 { - distanceTravelled += currentItem.targetVector - wasFound = true - break - } - if currentItem.collection.SchemaVersionID == targetSchemaVersionID { - wasFound = true - break - } - } - } + linkForwards(nextLink, nextHistoryItem, result) + linkBackwards(nextLink, nextHistoryItem, result) + } +} - if !wasFound { - // This may happen if users define schema migrations to unknown schema versions - // with no migration path to known schema versions, esentially creating orphan - // migrations. These may become linked later and should remain persisted in the - // database, but we can drop them from the history here/now. - orphanSchemaVersions[schemaVersion] = struct{}{} +// linkBackwards traverses and links the history backwards from the given starting point. +// +// Forward schema versions found will in turn be linked both forwards and backwards, allowing +// schema branches to be correctly mapped to the target schema version. +func linkBackwards( + currentLink *targetedSchemaHistoryLink, + currentHistoryItem *schemaHistoryLink, + result map[schemaVersionID]*targetedSchemaHistoryLink, +) { + for _, prevHistoryItem := range currentHistoryItem.previous { + if _, ok := result[prevHistoryItem.collection.SchemaVersionID]; ok { + // As the history forms a DAG, this should only ever happen when + // iterating through the item we were at immediately before the current. continue } - item.targetVector = distanceTravelled - } + prevLink := &targetedSchemaHistoryLink{ + collection: prevHistoryItem.collection, + next: immutable.Some(currentLink), + } + result[prevLink.collection.SchemaVersionID] = prevLink - for schemaVersion := range orphanSchemaVersions { - delete(result, schemaVersion) + linkForwards(prevLink, prevHistoryItem, result) + linkBackwards(prevLink, prevHistoryItem, result) } - - return result, nil } // getSchemaHistory returns the history of the schema of the given id as linked list @@ -180,25 +150,28 @@ func getSchemaHistory( ctx context.Context, txn datastore.Txn, schemaRoot string, -) (map[collectionID]*schemaHistoryLink, error) { +) (map[schemaVersionID]*schemaHistoryLink, error) { cols, err := description.GetCollectionsBySchemaRoot(ctx, txn, schemaRoot) if err != nil { return nil, err } - history := map[collectionID]*schemaHistoryLink{} + history := map[schemaVersionID]*schemaHistoryLink{} + schemaVersionsByColID := map[uint32]schemaVersionID{} for _, c := range cols { col := c // Convert the temporary types to the cleaner return type: - history[col.ID] = &schemaHistoryLink{ + history[col.SchemaVersionID] = &schemaHistoryLink{ collection: &col, } + schemaVersionsByColID[col.ID] = col.SchemaVersionID } for _, historyItem := range history { for _, source := range historyItem.collection.CollectionSources() { - src := history[source.SourceCollectionID] + srcSchemaVersion := schemaVersionsByColID[source.SourceCollectionID] + src := history[srcSchemaVersion] historyItem.previous = append( historyItem.next, src, diff --git a/lens/lens.go b/lens/lens.go index 4e700d7324..3e12e3cfd4 100644 --- a/lens/lens.go +++ b/lens/lens.go @@ -19,7 +19,6 @@ import ( ) type schemaVersionID = string -type collectionID = uint32 // LensDoc represents a document that will be sent to/from a Lens. type LensDoc = map[string]any @@ -178,7 +177,7 @@ func (l *lens) Next() (bool, error) { break } - if historyLocation.targetVector > 0 { + if historyLocation.next.HasValue() { // Aquire a lens migration from the registery, using the junctionPipe as its source. // The new pipeHead will then be connected as a source to the next migration-stage on // the next loop. @@ -188,7 +187,7 @@ func (l *lens) Next() (bool, error) { } historyLocation = historyLocation.next.Value() - } else { + } else if historyLocation.previous.HasValue() { // Aquire a lens migration from the registery, using the junctionPipe as its source. // The new pipeHead will then be connected as a source to the next migration-stage on // the next loop. diff --git a/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go b/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go new file mode 100644 index 0000000000..74bfce2323 --- /dev/null +++ b/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go @@ -0,0 +1,138 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package query + +import ( + "testing" + + "github.com/lens-vm/lens/host-go/config/model" + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + testUtils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/sourcenetwork/defradb/tests/lenses" +) + +func TestSchemaMigrationQueryWithP2PReplicatedDocOnOtherSchemaBranch(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + verified: Boolean + } + `, + }, + testUtils.SchemaPatch{ + // Patch first node only + NodeID: immutable.Some(0), + Patch: ` + [ + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "email", "Kind": "String"} } + ] + `, + }, + testUtils.ConfigureMigration{ + // Register the migration on both nodes. + LensConfig: client.LensConfig{ + SourceSchemaVersionID: "bafkreihax57fohcdupqr2l4heoqxdsiggjfeaubr44tgrz4xqdgvnid4xy", + DestinationSchemaVersionID: "bafkreifer354qmdrwdtae5n3k7sbl2oauis3mz24fk46p3otub7ojznobq", + Lens: model.Lens{ + Lenses: []model.LensModule{ + { + Path: lenses.SetDefaultModulePath, + Arguments: map[string]any{ + "dst": "name", + "value": "Fred", + }, + }, + }, + }, + }, + }, + testUtils.SchemaPatch{ + // Patch second node with different patch + NodeID: immutable.Some(1), + Patch: ` + [ + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "phone", "Kind": 11} } + ] + `, + Lens: immutable.Some(model.Lens{ + Lenses: []model.LensModule{ + { + Path: lenses.SetDefaultModulePath, + Arguments: map[string]any{ + "dst": "phone", + "value": "1234567890", + }, + }, + }, + }), + }, + testUtils.ConfigureReplicator{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.CreateDoc{ + // Create John on the first (source) node only, and allow the value to sync + NodeID: immutable.Some(0), + Doc: `{ + "name": "John", + "verified": true + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + // Node 0 should yield results as they were defined, as the newer schema version is + // unknown to this node. + NodeID: immutable.Some(0), + Request: `query { + Users { + name + } + }`, + Results: []map[string]any{ + { + "name": "John", + "verified": true, + }, + }, + }, + testUtils.Request{ + // Node 1 should yield results migrated down to schema version 1, then up to schema version 3. + NodeID: immutable.Some(1), + Request: ` + query { + Users { + name + phone + } + } + `, + Results: []map[string]any{ + { + // name has been cleared by the inverse of the migration from version 1 to 2 + "name": nil, + // phone has been set by the migration from version 1 to 3 + "phone": "1234567890", + "verified": true, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/schema/migrations/query/with_schema_branch_test.go b/tests/integration/schema/migrations/query/with_schema_branch_test.go new file mode 100644 index 0000000000..fe882944ee --- /dev/null +++ b/tests/integration/schema/migrations/query/with_schema_branch_test.go @@ -0,0 +1,107 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package query + +import ( + "testing" + + "github.com/lens-vm/lens/host-go/config/model" + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/sourcenetwork/defradb/tests/lenses" +) + +func TestSchemaMigrationQuery_WithBranchingSchema(t *testing.T) { + schemaVersion1ID := "bafkreiht46o4lakri2py2zw57ed3pdeib6ud6ojlsomgjlrgwh53wl3q4a" + + test := testUtils.TestCase{ + Description: "Test schema update, with branching schema migrations", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.SchemaPatch{ + SetAsDefaultVersion: immutable.Some(true), + Patch: ` + [ + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "email", "Kind": 11} } + ] + `, + Lens: immutable.Some(model.Lens{ + Lenses: []model.LensModule{ + { + Path: lenses.SetDefaultModulePath, + Arguments: map[string]any{ + "dst": "name", + "value": "Fred", + }, + }, + }, + }), + }, + testUtils.CreateDoc{ + // Create a document on the second schema version, with an email field value + Doc: `{ + "name": "John", + "email": "john@source.hub" + }`, + }, + testUtils.SetActiveSchemaVersion{ + // Set the active schema version back to the first + SchemaVersionID: schemaVersion1ID, + }, + testUtils.SchemaPatch{ + // The third schema version will be set as the active version, going from version 1 to 3 + SetAsDefaultVersion: immutable.Some(true), + Patch: ` + [ + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "phone", "Kind": 11} } + ] + `, + Lens: immutable.Some(model.Lens{ + Lenses: []model.LensModule{ + { + Path: lenses.SetDefaultModulePath, + Arguments: map[string]any{ + "dst": "phone", + "value": "1234567890", + }, + }, + }, + }), + }, + testUtils.Request{ + Request: ` + query { + Users { + name + phone + } + } + `, + Results: []map[string]any{ + { + // name has been cleared by the inverse of the migration from version 1 to 2 + "name": nil, + // phone has been set by the migration from version 1 to 3 + "phone": "1234567890", + }, + }, + }, + }, + } + testUtils.ExecuteTestCase(t, test) +} From e054b00efbe61e4ce88f69113042f2075a155c90 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Tue, 19 Mar 2024 18:13:35 -0400 Subject: [PATCH 2/7] PR FIXUP - Fix comment wording --- lens/history.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lens/history.go b/lens/history.go index 23f2132f66..00196288ed 100644 --- a/lens/history.go +++ b/lens/history.go @@ -69,7 +69,7 @@ func getTargetedSchemaHistory( targetHistoryItem, ok := history[targetSchemaVersionID] if !ok { - // If the target schema version is unknown then there are possible no migrations + // If the target schema version is unknown then there are no possible migrations // that we can do. return nil, nil } From ecedd00fccfc87a15892a04a49c7bbe28e9fd98f Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Tue, 19 Mar 2024 18:18:45 -0400 Subject: [PATCH 3/7] PR FIXUP - Remove debugs statement I think I accidentally commited this in my last PR --- tests/integration/utils2.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 830a76ba2d..930b429119 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -746,7 +746,6 @@ func refreshCollections( for _, collection := range allCollections { if collection.Name().Value() == collectionName { s.collections[nodeID][i] = collection - println(collection.Description().SchemaVersionID) break } } From 07c31673390f737cfd5cb551317316c6c92b210f Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Wed, 20 Mar 2024 10:46:24 -0400 Subject: [PATCH 4/7] PR FIXUP - Add Go 1.22 todo --- lens/history.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lens/history.go b/lens/history.go index 00196288ed..6455bbf44b 100644 --- a/lens/history.go +++ b/lens/history.go @@ -160,7 +160,10 @@ func getSchemaHistory( schemaVersionsByColID := map[uint32]schemaVersionID{} for _, c := range cols { + // Todo - this `col := c` can be removed with Go 1.22: + // https://github.com/sourcenetwork/defradb/issues/2431 col := c + // Convert the temporary types to the cleaner return type: history[col.SchemaVersionID] = &schemaHistoryLink{ collection: &col, From 1376acf0381d90f124948c8f2b977596f7e461d3 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Wed, 20 Mar 2024 16:21:48 -0400 Subject: [PATCH 5/7] PR FIXUP - Delete test comment --- .../schema/migrations/query/with_p2p_schema_branch_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go b/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go index 74bfce2323..76f47db7e0 100644 --- a/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go +++ b/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go @@ -95,8 +95,6 @@ func TestSchemaMigrationQueryWithP2PReplicatedDocOnOtherSchemaBranch(t *testing. }, testUtils.WaitForSync{}, testUtils.Request{ - // Node 0 should yield results as they were defined, as the newer schema version is - // unknown to this node. NodeID: immutable.Some(0), Request: `query { Users { From 915761b07808533d864e6b6e61f7c1a730dd832f Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Wed, 20 Mar 2024 16:22:52 -0400 Subject: [PATCH 6/7] PR FIXUP - Fix test query --- .../schema/migrations/query/with_p2p_schema_branch_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go b/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go index 76f47db7e0..9aba1698e1 100644 --- a/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go +++ b/tests/integration/schema/migrations/query/with_p2p_schema_branch_test.go @@ -99,6 +99,7 @@ func TestSchemaMigrationQueryWithP2PReplicatedDocOnOtherSchemaBranch(t *testing. Request: `query { Users { name + verified } }`, Results: []map[string]any{ From 9270daf72b9db7b7e7376c7784f058664cd9f639 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Wed, 20 Mar 2024 16:46:59 -0400 Subject: [PATCH 7/7] PR FIXUP - Rename schema history to collection history --- lens/fetcher.go | 2 +- lens/history.go | 82 ++++++++++++++++++++++++------------------------- lens/lens.go | 18 +++++------ 3 files changed, 50 insertions(+), 52 deletions(-) diff --git a/lens/fetcher.go b/lens/fetcher.go index 1e093f3966..90c80c78fb 100644 --- a/lens/fetcher.go +++ b/lens/fetcher.go @@ -76,7 +76,7 @@ func (f *lensedFetcher) Init( f.fieldDescriptionsByName[defFields[i].Name] = defFields[i] } - history, err := getTargetedSchemaHistory(ctx, txn, f.col.Schema().Root, f.col.Schema().VersionID) + history, err := getTargetedCollectionHistory(ctx, txn, f.col.Schema().Root, f.col.Schema().VersionID) if err != nil { return err } diff --git a/lens/history.go b/lens/history.go index 6455bbf44b..3bf1a28ac8 100644 --- a/lens/history.go +++ b/lens/history.go @@ -20,49 +20,48 @@ import ( "github.com/sourcenetwork/defradb/db/description" ) -// schemaHistoryLink represents an item in a particular schema's history, it +// collectionHistoryLink represents an item in a particular collection's schema history, it // links to the previous and next version items if they exist. -type schemaHistoryLink struct { +type collectionHistoryLink struct { // The collection as this point in history. collection *client.CollectionDescription - // The history link to the next schema versions, if there are some + // The history link to the next collection versions, if there are some // (for the most recent schema version this will be empty). - next []*schemaHistoryLink + next []*collectionHistoryLink - // The history link to the previous schema versions, if there are - // some (for the initial schema version this will be empty). - previous []*schemaHistoryLink + // The history link to the previous collection versions, if there are + // some (for the initial collection version this will be empty). + previous []*collectionHistoryLink } -// targetedSchemaHistoryLink represents an item in a particular schema's history, it +// targetedCollectionHistoryLink represents an item in a particular collection's schema history, it // links to the previous and next version items if they exist and are on the path to // the target schema version. -type targetedSchemaHistoryLink struct { +type targetedCollectionHistoryLink struct { // The collection as this point in history. collection *client.CollectionDescription - // The link to next schema version, if there is one - // (for the most recent schema version this will be None). - next immutable.Option[*targetedSchemaHistoryLink] + // The link to next collection version, if there is one + // (for the most recent collection version this will be None). + next immutable.Option[*targetedCollectionHistoryLink] - // The link to the previous schema version, if there is - // one (for the initial schema version this will be None). - previous immutable.Option[*targetedSchemaHistoryLink] + // The link to the previous collection version, if there is + // one (for the initial collection version this will be None). + previous immutable.Option[*targetedCollectionHistoryLink] } -// getTargetedSchemaHistory returns the history of the schema of the given id, relative +// getTargetedCollectionHistory returns the history of the schema of the given id, relative // to the given target schema version id. // -// This includes any history items that are only known via registered -// schema migrations. -func getTargetedSchemaHistory( +// This includes any history items that are only known via registered schema migrations. +func getTargetedCollectionHistory( ctx context.Context, txn datastore.Txn, schemaRoot string, targetSchemaVersionID string, -) (map[schemaVersionID]*targetedSchemaHistoryLink, error) { - history, err := getSchemaHistory(ctx, txn, schemaRoot) +) (map[schemaVersionID]*targetedCollectionHistoryLink, error) { + history, err := getCollectionHistory(ctx, txn, schemaRoot) if err != nil { return nil, err } @@ -74,9 +73,9 @@ func getTargetedSchemaHistory( return nil, nil } - result := map[schemaVersionID]*targetedSchemaHistoryLink{} + result := map[schemaVersionID]*targetedCollectionHistoryLink{} - targetLink := &targetedSchemaHistoryLink{ + targetLink := &targetedCollectionHistoryLink{ collection: targetHistoryItem.collection, } result[targetLink.collection.SchemaVersionID] = targetLink @@ -89,12 +88,12 @@ func getTargetedSchemaHistory( // linkForwards traverses and links the history forwards from the given starting point. // -// Forward schema versions found will in turn be linked both forwards and backwards, allowing -// schema branches to be correctly mapped to the target schema version. +// Forward collection versions found will in turn be linked both forwards and backwards, allowing +// branches to be correctly mapped to the target schema version. func linkForwards( - currentLink *targetedSchemaHistoryLink, - currentHistoryItem *schemaHistoryLink, - result map[schemaVersionID]*targetedSchemaHistoryLink, + currentLink *targetedCollectionHistoryLink, + currentHistoryItem *collectionHistoryLink, + result map[schemaVersionID]*targetedCollectionHistoryLink, ) { for _, nextHistoryItem := range currentHistoryItem.next { if _, ok := result[nextHistoryItem.collection.SchemaVersionID]; ok { @@ -103,7 +102,7 @@ func linkForwards( continue } - nextLink := &targetedSchemaHistoryLink{ + nextLink := &targetedCollectionHistoryLink{ collection: nextHistoryItem.collection, previous: immutable.Some(currentLink), } @@ -116,12 +115,12 @@ func linkForwards( // linkBackwards traverses and links the history backwards from the given starting point. // -// Forward schema versions found will in turn be linked both forwards and backwards, allowing -// schema branches to be correctly mapped to the target schema version. +// Backward collection versions found will in turn be linked both forwards and backwards, allowing +// branches to be correctly mapped to the target schema version. func linkBackwards( - currentLink *targetedSchemaHistoryLink, - currentHistoryItem *schemaHistoryLink, - result map[schemaVersionID]*targetedSchemaHistoryLink, + currentLink *targetedCollectionHistoryLink, + currentHistoryItem *collectionHistoryLink, + result map[schemaVersionID]*targetedCollectionHistoryLink, ) { for _, prevHistoryItem := range currentHistoryItem.previous { if _, ok := result[prevHistoryItem.collection.SchemaVersionID]; ok { @@ -130,7 +129,7 @@ func linkBackwards( continue } - prevLink := &targetedSchemaHistoryLink{ + prevLink := &targetedCollectionHistoryLink{ collection: prevHistoryItem.collection, next: immutable.Some(currentLink), } @@ -141,22 +140,21 @@ func linkBackwards( } } -// getSchemaHistory returns the history of the schema of the given id as linked list +// getCollectionHistory returns the history of the collection of the given root id as linked list // with each item mapped by schema version id. // -// This includes any history items that are only known via registered -// schema migrations. -func getSchemaHistory( +// This includes any history items that are only known via registered schema migrations. +func getCollectionHistory( ctx context.Context, txn datastore.Txn, schemaRoot string, -) (map[schemaVersionID]*schemaHistoryLink, error) { +) (map[schemaVersionID]*collectionHistoryLink, error) { cols, err := description.GetCollectionsBySchemaRoot(ctx, txn, schemaRoot) if err != nil { return nil, err } - history := map[schemaVersionID]*schemaHistoryLink{} + history := map[schemaVersionID]*collectionHistoryLink{} schemaVersionsByColID := map[uint32]schemaVersionID{} for _, c := range cols { @@ -165,7 +163,7 @@ func getSchemaHistory( col := c // Convert the temporary types to the cleaner return type: - history[col.SchemaVersionID] = &schemaHistoryLink{ + history[col.SchemaVersionID] = &collectionHistoryLink{ collection: &col, } schemaVersionsByColID[col.ID] = col.SchemaVersionID diff --git a/lens/lens.go b/lens/lens.go index 3e12e3cfd4..1a42bdf972 100644 --- a/lens/lens.go +++ b/lens/lens.go @@ -56,7 +56,7 @@ type lens struct { outputPipe enumerable.Concatenation[LensDoc] unknownVersionPipe enumerable.Queue[LensDoc] - schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink + collectionHistory map[schemaVersionID]*targetedCollectionHistoryLink source enumerable.Queue[lensInput] } @@ -67,18 +67,18 @@ func new( ctx context.Context, lensRegistry client.LensRegistry, targetSchemaVersionID schemaVersionID, - schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink, + collectionHistory map[schemaVersionID]*targetedCollectionHistoryLink, ) Lens { targetSource := enumerable.NewQueue[LensDoc]() outputPipe := enumerable.Concat[LensDoc](targetSource) return &lens{ - lensRegistry: lensRegistry, - ctx: ctx, - source: enumerable.NewQueue[lensInput](), - outputPipe: outputPipe, - unknownVersionPipe: targetSource, - schemaVersionHistory: schemaVersionHistory, + lensRegistry: lensRegistry, + ctx: ctx, + source: enumerable.NewQueue[lensInput](), + outputPipe: outputPipe, + unknownVersionPipe: targetSource, + collectionHistory: collectionHistory, lensInputPipesBySchemaVersionIDs: map[schemaVersionID]enumerable.Queue[LensDoc]{ targetSchemaVersionID: targetSource, }, @@ -136,7 +136,7 @@ func (l *lens) Next() (bool, error) { // up to the output via any intermediary pipes. inputPipe = p } else { - historyLocation, ok := l.schemaVersionHistory[doc.SchemaVersionID] + historyLocation, ok := l.collectionHistory[doc.SchemaVersionID] if !ok { // We may recieve documents of unknown schema versions, they should // still be fed through the pipe system in order to preserve order.