Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data types update: Implement protocol message migrations #19240

Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
e2146b9
Extract MigrationContainer from AirbyteMessageMigrator
gosusnp Dec 20, 2022
caa1692
Add ConfiguredAirbyteCatalogMigrations
gosusnp Dec 20, 2022
a374a62
Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations
gosusnp Dec 20, 2022
7adbe36
Enable ConfiguredAirbyteCatalog migration
gosusnp Dec 21, 2022
7746f87
set up scaffolding
edgao Nov 8, 2022
06b144a
[wip] more scaffolding, basic unit test
edgao Nov 8, 2022
66c1830
minimal green code
edgao Nov 8, 2022
b9275bd
[wip] add failing test for other primitive types
edgao Nov 8, 2022
a333017
correct version number
edgao Nov 8, 2022
ecbcda9
handle basic primitive type decls
edgao Nov 9, 2022
683e3bf
add implicit cases
edgao Nov 9, 2022
9333641
add recursive schema
edgao Nov 9, 2022
4d71da8
formatting
edgao Nov 9, 2022
e0b95a9
comment
edgao Nov 9, 2022
8ae40ad
support not
edgao Nov 9, 2022
8079601
fix indentation
edgao Nov 9, 2022
8ee6c74
handle all nested schema cases
edgao Nov 9, 2022
916349b
handle boolean schemas
edgao Nov 9, 2022
43fd5df
verify empty schema handling
edgao Nov 9, 2022
82f7a11
cleanup
edgao Nov 9, 2022
8301826
extract map
edgao Nov 9, 2022
66b5141
code organization
edgao Nov 9, 2022
3bd864d
extract method
edgao Nov 9, 2022
0378326
reformat
edgao Nov 9, 2022
d691fe4
[wip] more tests, minor fix type array handling
edgao Nov 9, 2022
06c4ac8
corrected test
edgao Nov 9, 2022
e584a61
cleanup
edgao Nov 9, 2022
3ea8135
reformat
edgao Nov 9, 2022
4c3d960
switch to v1
edgao Nov 10, 2022
a80aa76
add support for multityped fields
edgao Nov 10, 2022
4f242c2
missed test case
edgao Nov 10, 2022
bb5afd5
nested test class
edgao Nov 10, 2022
137ed33
basic record upgrade
edgao Nov 10, 2022
238098b
implement record upgrades
edgao Nov 10, 2022
98cdd89
slight refactor
edgao Nov 10, 2022
2609d88
comments+clarificationso
edgao Nov 11, 2022
c008185
extract constants
edgao Nov 15, 2022
a6c9727
(partly) correct model classes
edgao Nov 16, 2022
32d4c56
add de/ser
edgao Nov 16, 2022
9f868e0
formatting
edgao Nov 16, 2022
ceb4175
extract constants
edgao Nov 16, 2022
71e0eed
fix json reference
edgao Nov 19, 2022
a8e5cce
update docs
edgao Nov 19, 2022
8c79cbd
switch to v1 models
edgao Nov 30, 2022
4475794
fix compile+test
edgao Dec 17, 2022
8e896e5
add base64 handling
edgao Dec 17, 2022
3844a9b
use vnull
edgao Dec 20, 2022
13afb38
Data types update: Implement protocol message downgrade path (#19909)
edgao Dec 21, 2022
5fb55af
move migrations into v1 package
edgao Dec 21, 2022
ed8874a
extract schema mutation code
edgao Dec 21, 2022
9f78cb5
comment
edgao Dec 21, 2022
d8f2465
extract schema migration to new class
edgao Dec 21, 2022
81587a2
extract record downgrade logic for future use
edgao Dec 22, 2022
0c64b7f
format
edgao Dec 22, 2022
b63cce9
fix build after rebase
edgao Dec 23, 2022
433c536
rename private method for consistency
edgao Dec 23, 2022
9478e50
also implement configuredcatalog migrations >.>
edgao Dec 23, 2022
e3a049c
quick and dirty tests
edgao Dec 23, 2022
282f9a1
slight cleanup
edgao Dec 23, 2022
862fa44
fix tests
edgao Dec 23, 2022
eb7b77c
pmd
edgao Dec 23, 2022
60352b7
pmd test
edgao Dec 23, 2022
4ec3ccb
null check on message objects
edgao Dec 23, 2022
7a9ff52
maybe fix acceptance tests?
edgao Dec 23, 2022
a5d49bd
fix name
edgao Jan 2, 2023
7fe61f9
extract constants
edgao Jan 2, 2023
2c3cbbb
more fixes
edgao Jan 4, 2023
774d31c
tmp
edgao Jan 5, 2023
b0270f8
meh
edgao Jan 5, 2023
1300082
fix cdc acc tests
edgao Jan 5, 2023
7f393c7
revert to master source-postgres
edgao Jan 6, 2023
7c5a6dc
remove log messages
edgao Jan 6, 2023
dbad5bb
Merge branch 'gosusnp/platform-use-protocol-v1-the-quick-way' into go…
edgao Jan 6, 2023
f66be38
Merge branch 'gosusnp/20695-update-airbyte-protocol-migration' into l…
edgao Jan 6, 2023
c2a8886
revert other misc hacks
edgao Jan 6, 2023
99d5e64
integers are valid cursors
edgao Jan 6, 2023
7b12861
remove unrelated change
edgao Jan 6, 2023
de287e2
fix build
edgao Jan 6, 2023
5342166
fix build more?
edgao Jan 6, 2023
f703609
[MUST REVERT] use dev normalization
edgao Jan 9, 2023
a7609ca
capture kube logs
edgao Jan 9, 2023
1c91ef7
also here?
edgao Jan 10, 2023
d3d34ea
no debug logs?
edgao Jan 10, 2023
6764a79
Merge branch 'gosusnp/platform-use-protocol-v1-the-quick-way' into ed…
edgao Jan 10, 2023
9c2cf19
delete dup from merging
edgao Jan 10, 2023
f968c6a
add final everywhere
edgao Jan 11, 2023
58f9cd5
revert test changes
edgao Jan 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-commons-protocol/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies {
testImplementation libs.bundles.micronaut.test

implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-json-validation')
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* AirbyteProtocol Message Migrator
Expand All @@ -25,104 +24,59 @@
@Singleton
public class AirbyteMessageMigrator {

private final List<AirbyteMessageMigration<?, ?>> migrationsToRegister;
private final SortedMap<String, AirbyteMessageMigration<?, ?>> migrations = new TreeMap<>();
private String mostRecentMajorVersion = "";
private final MigrationContainer<AirbyteMessageMigration<?, ?>> migrationContainer;

public AirbyteMessageMigrator(List<AirbyteMessageMigration<?, ?>> migrations) {
migrationsToRegister = migrations;
}

public AirbyteMessageMigrator() {
this(Collections.emptyList());
public AirbyteMessageMigrator(final List<AirbyteMessageMigration<?, ?>> migrations) {
migrationContainer = new MigrationContainer<>(migrations);
}

@PostConstruct
public void initialize() {
migrationsToRegister.forEach(this::registerMigration);
migrationContainer.initialize();
}

/**
* Downgrade a message from the most recent version to the target version by chaining all the
* required migrations
*/
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
if (target.getMajorVersion().equals(mostRecentMajorVersion)) {
return (PreviousVersion) message;
}

Object result = message;
Object[] selectedMigrations = selectMigrations(target).toArray();
for (int i = selectedMigrations.length; i > 0; --i) {
result = applyDowngrade((AirbyteMessageMigration<?, ?>) selectedMigrations[i - 1], result);
}
return (PreviousVersion) result;
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message,
final Version target,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrationContainer.downgrade(message, target, (migration, msg) -> applyDowngrade(migration, msg, configuredAirbyteCatalog));
}

/**
* Upgrade a message from the source version to the most recent version by chaining all the required
* migrations
*/
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
if (source.getMajorVersion().equals(mostRecentMajorVersion)) {
return (CurrentVersion) message;
}

Object result = message;
for (var migration : selectMigrations(source)) {
result = applyUpgrade(migration, result);
}
return (CurrentVersion) result;
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message,
final Version source,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrationContainer.upgrade(message, source, (migration, msg) -> applyUpgrade(migration, msg, configuredAirbyteCatalog));
}

public Version getMostRecentVersion() {
return new Version(mostRecentMajorVersion, "0", "0");
}

private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) {
final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values();
if (results.isEmpty()) {
throw new RuntimeException("Unsupported migration version " + version.serialize());
}
return results;
return migrationContainer.getMostRecentVersion();
}

// Helper function to work around type casting
private <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.downgrade((CurrentVersion) message);
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migration.downgrade((CurrentVersion) message, configuredAirbyteCatalog);
}

// Helper function to work around type casting
private <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.upgrade((PreviousVersion) message);
}

/**
* Store migration in a sorted map key by the major of the lower version of the migration.
*
* The goal is to be able to retrieve the list of migrations to apply to get to/from a given
* version. We are only keying on the lower version because the right side (most recent version of
* the migration range) is always current version.
*/
@VisibleForTesting
void registerMigration(final AirbyteMessageMigration<?, ?> migration) {
final String key = migration.getPreviousVersion().getMajorVersion();
if (!migrations.containsKey(key)) {
migrations.put(key, migration);
if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) {
mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion();
}
} else {
throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName());
}
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migration.upgrade((PreviousVersion) message, configuredAirbyteCatalog);
}

// Used for inspection of the injection
@VisibleForTesting
Set<String> getMigrationKeys() {
return migrations.keySet();
return migrationContainer.getMigrationKeys();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Optional;

/**
* Wraps message migration from a fixed version to the most recent version
Expand All @@ -20,12 +22,12 @@ public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, fi
this.version = version;
}

public OriginalMessageType downgrade(final AirbyteMessage message) {
return migrator.downgrade(message, version);
public OriginalMessageType downgrade(final AirbyteMessage message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrator.downgrade(message, version, configuredAirbyteCatalog);
}

public AirbyteMessage upgrade(final OriginalMessageType message) {
return migrator.upgrade(message, version);
public AirbyteMessage upgrade(final OriginalMessageType message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrator.upgrade(message, version, configuredAirbyteCatalog);
}

public Version getVersion() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.version.Version;
import jakarta.inject.Singleton;

/**
* Factory to build AirbyteMessageVersionedMigrator
*/
@Singleton
public class AirbyteProtocolVersionedMigratorFactory {

private final AirbyteMessageMigrator airbyteMessageMigrator;
private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;

public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airbyteMessageMigrator,
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator) {
this.airbyteMessageMigrator = airbyteMessageMigrator;
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
}

public <T> AirbyteMessageVersionedMigrator<T> getAirbyteMessageMigrator(final Version version) {
return new AirbyteMessageVersionedMigrator<>(this.airbyteMessageMigrator, version);
}

public final VersionedProtocolSerializer getProtocolSerializer(final Version version) {
return new VersionedProtocolSerializer(configuredAirbyteCatalogMigrator, version);
}

public Version getMostRecentVersion() {
return airbyteMessageMigrator.getMostRecentVersion();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.Set;

@Singleton
public class ConfiguredAirbyteCatalogMigrator {

private final MigrationContainer<ConfiguredAirbyteCatalogMigration<?, ?>> migrationContainer;

public ConfiguredAirbyteCatalogMigrator(final List<ConfiguredAirbyteCatalogMigration<?, ?>> migrations) {
migrationContainer = new MigrationContainer<>(migrations);
}

@PostConstruct
public void initialize() {
migrationContainer.initialize();
}

/**
* Downgrade a message from the most recent version to the target version by chaining all the
* required migrations
*/
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
return migrationContainer.downgrade(message, target, ConfiguredAirbyteCatalogMigrator::applyDowngrade);
}

/**
* Upgrade a message from the source version to the most recent version by chaining all the required
* migrations
*/
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
return migrationContainer.upgrade(message, source, ConfiguredAirbyteCatalogMigrator::applyUpgrade);
}

public Version getMostRecentVersion() {
return migrationContainer.getMostRecentVersion();
}

// Helper function to work around type casting
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.downgrade((CurrentVersion) message);
}

// Helper function to work around type casting
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.upgrade((PreviousVersion) message);
}

// Used for inspection of the injection
@VisibleForTesting
Set<String> getMigrationKeys() {
return migrationContainer.getMigrationKeys();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public class DefaultProtocolSerializer implements ProtocolSerializer {

@Override
public String serialize(ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
return Jsons.serialize(configuredAirbyteCatalog);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public interface ProtocolSerializer {

String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public class VersionedProtocolSerializer implements ProtocolSerializer {

private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;
private final Version protocolVersion;

public VersionedProtocolSerializer(final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator, final Version protocolVersion) {
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
this.protocolVersion = protocolVersion;
}

@Override
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
return Jsons.serialize(configuredAirbyteCatalogMigrator.downgrade(configuredAirbyteCatalog, protocolVersion));
}

}
Loading