Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

On-the-fly migrations of persisted catalogs #21757

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,7 @@ public static void mutateSchemas(final Function<JsonNode, Boolean> matcher, fina
// additionalProperties
// else if oneof, allof, etc
// but that sounds really verbose for no real benefit
final List<JsonNode> subschemas = new ArrayList<>();

// array schemas
findSubschemas(subschemas, schema, "items");
findSubschemas(subschemas, schema, "additionalItems");
findSubschemas(subschemas, schema, "contains");

// object schemas
if (schema.hasNonNull("properties")) {
final ObjectNode propertiesNode = (ObjectNode) schema.get("properties");
final Iterator<Entry<String, JsonNode>> propertiesIterator = propertiesNode.fields();
while (propertiesIterator.hasNext()) {
final Entry<String, JsonNode> property = propertiesIterator.next();
subschemas.add(property.getValue());
}
}
if (schema.hasNonNull("patternProperties")) {
final ObjectNode propertiesNode = (ObjectNode) schema.get("patternProperties");
final Iterator<Entry<String, JsonNode>> propertiesIterator = propertiesNode.fields();
while (propertiesIterator.hasNext()) {
final Entry<String, JsonNode> property = propertiesIterator.next();
subschemas.add(property.getValue());
}
}
findSubschemas(subschemas, schema, "additionalProperties");

// combining restrictions - destinations have limited support for these, but we should handle the
// schemas correctly anyway
findSubschemas(subschemas, schema, "allOf");
findSubschemas(subschemas, schema, "oneOf");
findSubschemas(subschemas, schema, "anyOf");
findSubschemas(subschemas, schema, "not");
final List<JsonNode> subschemas = findSubschemas(schema);

// recurse into each subschema
for (final JsonNode subschema : subschemas) {
Expand All @@ -90,6 +59,49 @@ public static void mutateSchemas(final Function<JsonNode, Boolean> matcher, fina
}
}

/**
 * Collects the direct child schemas of a JsonSchema node.
 *
 * <p>
 * Children are gathered, in this order, from the array keywords ({@code items},
 * {@code additionalItems}, {@code contains}), the object keywords ({@code properties},
 * {@code patternProperties}, {@code additionalProperties}) and the combining keywords
 * ({@code allOf}, {@code oneOf}, {@code anyOf}, {@code not}). Only one level is explored; callers
 * recurse if they need deeper nodes.
 *
 * @param schema The JsonSchema node to start
 * @return a list of the JsonNodes to be considered
 */
public static List<JsonNode> findSubschemas(final JsonNode schema) {
  final List<JsonNode> children = new ArrayList<>();

  // array schemas
  for (final String arrayKeyword : List.of("items", "additionalItems", "contains")) {
    findSubschemas(children, schema, arrayKeyword);
  }

  // object schemas: every value of these keyword maps is itself a schema
  for (final String mapKeyword : List.of("properties", "patternProperties")) {
    if (schema.hasNonNull(mapKeyword)) {
      final ObjectNode namedSchemas = (ObjectNode) schema.get(mapKeyword);
      namedSchemas.elements().forEachRemaining(children::add);
    }
  }
  findSubschemas(children, schema, "additionalProperties");

  // combining restrictions - destinations have limited support for these, but we should handle the
  // schemas correctly anyway
  for (final String combiningKeyword : List.of("allOf", "oneOf", "anyOf", "not")) {
    findSubschemas(children, schema, combiningKeyword);
  }

  return children;
}

/**
* If schema contains key, then grab the subschema(s) at schema[key] and add them to the subschemas
* list.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.migrations.v1;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.protocol.migrations.util.SchemaMigrations;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;

/**
 * For the v0 to v1 migration, it appears that we are persisting some protocol objects without
 * version. Until this gets addressed more properly, this class contains the helper functions used
 * to handle this on-the-fly migration.
 *
 * Once persisted objects are versioned, this code should be deleted.
 */
public class CatalogMigrationV1Helper {

  /**
   * Performs an in-place migration of the schema from v0 to v1 if v0 data types are detected in any
   * stream of the catalog.
   *
   * @param configuredAirbyteCatalog to migrate; a null catalog is left untouched
   */
  public static void upgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (containsV0DataTypes(configuredAirbyteCatalog)) {
      upgradeSchema(configuredAirbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1 if v0 data types are detected in any
   * stream of the catalog.
   *
   * @param airbyteCatalog to migrate; a null catalog is left untouched
   */
  public static void upgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
    if (containsV0DataTypes(airbyteCatalog)) {
      upgradeSchema(airbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1
   *
   * @param configuredAirbyteCatalog to migrate
   */
  private static void upgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    for (final var stream : configuredAirbyteCatalog.getStreams()) {
      if (stream != null) {
        upgradeStreamSchema(stream.getStream());
      }
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1
   *
   * @param airbyteCatalog to migrate
   */
  private static void upgradeSchema(final AirbyteCatalog airbyteCatalog) {
    for (final var stream : airbyteCatalog.getStreams()) {
      upgradeStreamSchema(stream);
    }
  }

  /**
   * Upgrades the json schema of a single stream in place. Streams that are null or carry no schema
   * are skipped, mirroring the null handling used when detecting v0 data types.
   */
  private static void upgradeStreamSchema(final AirbyteStream airbyteStream) {
    if (airbyteStream != null && airbyteStream.getJsonSchema() != null) {
      SchemaMigrationV1.upgradeSchema(airbyteStream.getJsonSchema());
    }
  }

  /**
   * Returns true if any stream of the catalog contains v0 data types.
   *
   * Every stream is inspected (stopping at the first match) rather than only the first one, so a
   * catalog whose first stream has a missing or empty schema is still detected.
   */
  private static boolean containsV0DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (configuredAirbyteCatalog == null || configuredAirbyteCatalog.getStreams() == null) {
      return false;
    }

    return configuredAirbyteCatalog
        .getStreams()
        .stream()
        .filter(configuredStream -> configuredStream != null)
        .map(ConfiguredAirbyteStream::getStream)
        .anyMatch(CatalogMigrationV1Helper::streamContainsV0DataTypes);
  }

  /**
   * Returns true if any stream of the catalog contains v0 data types.
   *
   * Every stream is inspected (stopping at the first match) rather than only the first one, so a
   * catalog whose first stream has a missing or empty schema is still detected.
   */
  private static boolean containsV0DataTypes(final AirbyteCatalog airbyteCatalog) {
    if (airbyteCatalog == null || airbyteCatalog.getStreams() == null) {
      return false;
    }

    return airbyteCatalog
        .getStreams()
        .stream()
        .anyMatch(CatalogMigrationV1Helper::streamContainsV0DataTypes);
  }

  /**
   * Returns true if the stream has a json schema that contains a v0 data type. A null stream or a
   * stream without schema never matches.
   */
  private static boolean streamContainsV0DataTypes(final AirbyteStream airbyteStream) {
    if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
      return false;
    }
    return hasV0DataType(airbyteStream.getJsonSchema());
  }

  /**
   * Performs a depth-first search for a v0 data type node, returning true at the first node found.
   */
  private static boolean hasV0DataType(final JsonNode schema) {
    if (SchemaMigrationV1.isPrimitiveTypeDeclaration(schema)) {
      return true;
    }

    for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
      if (hasV0DataType(subSchema)) {
        return true;
      }
    }
    return false;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static void downgradeSchema(final JsonNode schema) {
* Detects any schema that looks like a primitive type declaration, e.g.: { "type": "string" } or {
* "type": ["string", "object"] }
*/
private static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
if (!schema.isObject() || !schema.hasNonNull(TYPE_KEY)) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorType;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.NormalizationDestinationDefinitionConfig;
import io.airbyte.config.ResourceRequirements;
Expand Down Expand Up @@ -79,6 +77,7 @@ class DestinationDefinitionsHandlerTest {
private AirbyteGithubStore githubStore;
private DestinationHandler destinationHandler;
private UUID workspaceId;
private AirbyteProtocolVersionRange protocolVersionRange;

@SuppressWarnings("unchecked")
@BeforeEach
Expand All @@ -91,14 +90,15 @@ void setUp() {
githubStore = mock(AirbyteGithubStore.class);
destinationHandler = mock(DestinationHandler.class);
workspaceId = UUID.randomUUID();
protocolVersionRange = new AirbyteProtocolVersionRange(new Version("0.0.0"), new Version("0.3.0"));

destinationDefinitionsHandler = new DestinationDefinitionsHandler(
configRepository,
uuidSupplier,
schedulerSynchronousClient,
githubStore,
destinationHandler,
new AirbyteProtocolVersionRange(new Version("0.0.0"), new Version("0.3.0")));
protocolVersionRange);
}

private StandardDestinationDefinition generateDestinationDefinition() {
Expand Down Expand Up @@ -540,9 +540,6 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa
verify(schedulerSynchronousClient).createGetSpecJob(newImageName, false);
verify(configRepository).writeStandardDestinationDefinition(updatedDestination);

final Configs configs = new EnvConfigs();
final AirbyteProtocolVersionRange protocolVersionRange =
new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
verify(configRepository).clearUnsupportedProtocolVersionFlag(updatedDestination.getDestinationDefinitionId(), ActorType.DESTINATION,
protocolVersionRange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorType;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSourceDefinition;
Expand Down Expand Up @@ -489,9 +487,6 @@ void testUpdateSourceDefinition() throws ConfigNotFoundException, IOException, J
verify(schedulerSynchronousClient).createGetSpecJob(newImageName, false);
verify(configRepository).writeStandardSourceDefinition(updatedSource);

final Configs configs = new EnvConfigs();
final AirbyteProtocolVersionRange protocolVersionRange =
new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
verify(configRepository).clearUnsupportedProtocolVersionFlag(updatedSource.getSourceDefinitionId(), ActorType.SOURCE, protocolVersionRange);
}

Expand Down
1 change: 1 addition & 0 deletions airbyte-config/config-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ configurations.all {
dependencies {
implementation project(':airbyte-commons')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-db:jooq')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,8 +1184,10 @@ private Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash, fi

final Map<UUID, AirbyteCatalog> result = new HashMap<>();
for (final Record record : records) {
final AirbyteCatalog catalog = Jsons.deserialize(
record.get(ACTOR_CATALOG.CATALOG).toString(), AirbyteCatalog.class);
// We do not apply the on-the-fly migration here because the only caller is getOrInsertActorCatalog
// which is using this to figure out if the catalog has already been inserted. Migrating on the fly
// here will cause us to add a duplicate each time we check for existence of a catalog.
Comment on lines +1209 to +1211
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems risky. Granted, this is a private method, so there is less risk, but I wonder if this code should be moved into the getOrInsertActorCatalog method to prevent future issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am open to inlining this in getOrInsertActorCatalog. One extra note is that this behavior (the insert only inserts when it's actually different) is currently tested.

final AirbyteCatalog catalog = Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString(), AirbyteCatalog.class);
result.put(record.get(ACTOR_CATALOG.ID), catalog);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.migrations.v1.CatalogMigrationV1Helper;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorCatalogWithUpdatedAt;
Expand All @@ -40,6 +41,7 @@
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -67,8 +69,7 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
.withSourceId(record.get(CONNECTION.SOURCE_ID))
.withDestinationId(record.get(CONNECTION.DESTINATION_ID))
.withName(record.get(CONNECTION.NAME))
.withCatalog(
Jsons.deserialize(record.get(CONNECTION.CATALOG).data(), ConfiguredAirbyteCatalog.class))
.withCatalog(parseConfiguredAirbyteCatalog(record.get(CONNECTION.CATALOG).data()))
.withFieldSelectionData(record.get(CONNECTION.FIELD_SELECTION_DATA) == null ? null
: Jsons.deserialize(record.get(CONNECTION.FIELD_SELECTION_DATA).data(), FieldSelectionData.class))
.withStatus(
Expand All @@ -92,6 +93,13 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
.withNotifySchemaChanges(record.get(CONNECTION.NOTIFY_SCHEMA_CHANGES));
}

/**
 * Deserializes a persisted configured catalog and upgrades it in place when it still uses protocol
 * v0 data types.
 *
 * @param configuredAirbyteCatalogString JSON representation of a {@link ConfiguredAirbyteCatalog}
 * @return the deserialized catalog, after the on-the-fly v0->v1 migration has been applied
 */
private static ConfiguredAirbyteCatalog parseConfiguredAirbyteCatalog(final String configuredAirbyteCatalogString) {
  final ConfiguredAirbyteCatalog parsedCatalog =
      Jsons.deserialize(configuredAirbyteCatalogString, ConfiguredAirbyteCatalog.class);

  // Persisted catalogs are not versioned, so data types are migrated (protocol v0->v1) at read time.
  CatalogMigrationV1Helper.upgradeSchemaIfNeeded(parsedCatalog);

  return parsedCatalog;
}

public static StandardWorkspace buildStandardWorkspace(final Record record) {
final List<Notification> notificationList = new ArrayList<>();
final List fetchedNotifications = Jsons.deserialize(record.get(WORKSPACE.NOTIFICATIONS).data(), List.class);
Expand Down Expand Up @@ -215,18 +223,25 @@ public static SourceOAuthParameter buildSourceOAuthParameter(final Record record
/**
 * Builds an {@link ActorCatalog} from an ACTOR_CATALOG record.
 *
 * The persisted catalog is read through {@code parseAirbyteCatalog}, which applies the on-the-fly
 * v0->v1 migration, and is then converted back to a JSON node. The catalog is set exactly once;
 * setting the raw deserialized JSON first would only be overwritten by the migrated value.
 *
 * @param record database record exposing the ACTOR_CATALOG id, catalog and catalog hash columns
 * @return the populated ActorCatalog
 */
public static ActorCatalog buildActorCatalog(final Record record) {
  return new ActorCatalog()
      .withId(record.get(ACTOR_CATALOG.ID))
      .withCatalog(Jsons.jsonNode(parseAirbyteCatalog(record.get(ACTOR_CATALOG.CATALOG).toString())))
      .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
}

/**
 * Builds an {@link ActorCatalogWithUpdatedAt} from a record joining ACTOR_CATALOG with
 * ACTOR_CATALOG_FETCH_EVENT.
 *
 * The persisted catalog is read through {@code parseAirbyteCatalog}, which applies the on-the-fly
 * v0->v1 migration, and is then converted back to a JSON node. The catalog is set exactly once;
 * setting the raw deserialized JSON first would only be overwritten by the migrated value.
 *
 * @param record database record exposing the ACTOR_CATALOG id, catalog and catalog hash columns
 *        plus the fetch event creation time
 * @return the populated ActorCatalogWithUpdatedAt, with updatedAt expressed in epoch seconds
 *         computed at the UTC offset
 */
public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) {
  return new ActorCatalogWithUpdatedAt()
      .withId(record.get(ACTOR_CATALOG.ID))
      .withCatalog(Jsons.jsonNode(parseAirbyteCatalog(record.get(ACTOR_CATALOG.CATALOG).toString())))
      .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH))
      .withUpdatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC));
}

/**
 * Deserializes a persisted catalog and upgrades it in place when it still uses protocol v0 data
 * types.
 *
 * @param airbyteCatalogString JSON representation of an {@link AirbyteCatalog}
 * @return the deserialized catalog, after the on-the-fly v0->v1 migration has been applied
 */
public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) {
  final AirbyteCatalog parsedCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class);

  // Persisted catalogs are not versioned, so data types are migrated (protocol v0->v1) at read time.
  CatalogMigrationV1Helper.upgradeSchemaIfNeeded(parsedCatalog);

  return parsedCatalog;
}

public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
return new ActorCatalogFetchEvent()
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID))
Expand Down
Loading