-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data types update: Implement protocol message migrations #19240
Data types update: Implement protocol message migrations #19240
Conversation
@benmoriceau / @gosusnp - is the rough structure here correct? I.e.
And I don't need to write a new serializer, because that's only needed for major version bumps? |
@edgao, not exactly, we should only write migrations between Major versions, the assumption is that everything is backward compatible with the same major. |
import java.util.function.Function; | ||
import java.util.stream.StreamSupport; | ||
|
||
public class AirbyteMessageMigrationV1_1_0 implements AirbyteMessageMigration<AirbyteMessage, io.airbyte.protocol.models.v0.AirbyteMessage> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a AirbyteMessageMigrationV1
since we will apply this migration for this major, and not specifically 1.1.0
Since we need to build the v1, the implement signature should be AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, io.airbyte.protocol.models.v1.AirbyteMessage>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since v1 isn't fully defined yet, I'll just switch this to io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage
for now. Will write an issue to fix this later.
...ocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1_1_0.java
Outdated
Show resolved
Hide resolved
hm, I think there might have been some mixup about what constitutes a breaking change. My understanding from talking with evan is that anything with non-destructive upgrade migration (i.e. upgrade+downgrade is a noop; downgrade+upgrade does something reasonable) is considered backwards-compatible? I think this change depends on the v1 change (connectors being able to declare their version). Does it make sense to declare this as v2? (and then have the legacy state removal in v3) |
My two cents on this, probably the most common breaking change would be removing a field. Adding a new field is generally backward compatible if it is defined with a default. For example, expanding on this, renaming a field would be breaking change because it involves adding and removing a field at the same time. The idea of moving to v1 is to pave the way for some clean up. Technically, adding the protocol version is a non breaking change because it has been defined with 0.2.0 as a default value. |
cool, will switch this PR to use 1.0.0 then. That means I do need to write a serializer then, right? |
Yes but it should be an easy one, as long as we don't break the current pattern of using JSON messages, there's a helper. You should be able to replicate AirbyteMessageV0Serializer, same for deserialization. |
...tocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work!
@gosusnp can you take another look at this? Feel free to ignore the actual migration code, I just want to make sure I didn't miss any important components. (I'm working on the downgrade migration in a separate branch) my understanding is that this PR depends on #16814 being done, after which I'll need need to switch everything from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I focused mostly on the structure for versioning. This looks good for the most part. There's an annotation missing on the migration and some types to adjust but we'll need the actual v1 in before being able to fix it.
import java.util.function.Function; | ||
import java.util.stream.StreamSupport; | ||
|
||
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of things here:
You need the @Singleton
annotation from import jakarta.inject.Singleton;
for this migration to be picked up by the framework.
There should be an io.airbyte.protocol.models.v1.AirbyteMessage
and the implements should be AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, io.airbyte.protocol.models.v1.AirbyteMessage>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can wait until we get the v1 objects merged in.
…gao/data_type_protocol_migration_upgrade
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, A couple of notes
- missing a lot of
final
, please fix those before merging - the build errors feel unrelated to your changes, since we're merging into another branch, it's probably fine to merge and address those while we pull from master.
* Add Airbyte Protocol V1 support. * Fix VersionedAirbyteStreamFactoryTest * Remove AirbyteMessageMigrationV0 example * Add Protocol Version constants * 🎉Updated normalization to handle new datatypes (#19721) * Updated normalization simple stream processing to handle new datatypes * Updated normalization nested stream processing to handle new datatypes * Updated normalization nested stream processing to handle new datatypes * Updated normalization drop_scd_catalog processing to handle new datatypes * Updated normalization ephemeral test processing to handle new datatypes * fixed more tests for normalization * fixed more tests for normalization * fixed more tests for normalization * fixed more tests for normalization * fixed more issues * fixed more issues (clickhouse) * fixed more issues * fixed more issues * fixed more issues * added binary type processing for some DBs * cleared commented code and moved some hardcodes to processing as macro * fixed codestyle and cleared commented code * minor refactor * minor refactor * minor refactor * fixed bool cast error * fixed dict->str cast error * fixed is_combining_node cast py check * removed commented code * removed commented code * committed autogenerated normalization_test_output files * committed autogenerated normalization_test_output files (new files) * refactored utils.py * Updated utils.py to use Callable functions and get rid of property_type in is_number and is_bool functions * committed autogenerated normalization_test_output files (new files) * fixed typo in TIMESTAMP_WITH_TIMEZONE_TYPE * updated stream_processor to handle string type first as a wider type * fixed arrays normalization by updating is_simple_property method as per new approaches * format Co-authored-by: Edward Gao <[email protected]> * Update airbyte protocol migration (#20745) * Extract MigrationContainer from AirbyteMessageMigrator * Add ConfiguredAirbyteCatalogMigrations * Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations * Enable ConfiguredAirbyteCatalog migration * Fix tests * Remove extra this. * Add missing docs * Typo Co-authored-by: Edward Gao <[email protected]> * Data types update: Implement protocol message migrations (#19240) * Extract MigrationContainer from AirbyteMessageMigrator * Add ConfiguredAirbyteCatalogMigrations * Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations * Enable ConfiguredAirbyteCatalog migration * set up scaffolding * [wip] more scaffolding, basic unit test * minimal green code * [wip] add failing test for other primitive types * correct version number * handle basic primitive type decls * add implicit cases * add recursive schema * formatting * comment * support not * fix indentation * handle all nested schema cases * handle boolean schemas * verify empty schema handling * cleanup * extract map * code organization * extract method * reformat * [wip] more tests, minor fix type array handling * corrected test * cleanup * reformat * switch to v1 * add support for multityped fields * missed test case * nested test class * basic record upgrade * implement record upgrades * slight refactor * comments+clarificationso * extract constants * (partly) correct model classes * add de/ser * formatting * extract constants * fix json reference * update docs * switch to v1 models * fix compile+test * add base64 handling * use vnull * Data types update: Implement protocol message downgrade path (#19909) * rough skeleton for passing catalog into migration * basic test * more scaffolding * basic implementation * add primitives test * add in other tests (nested fields currently failing) * add formats * impleent oneOf handling * formatting * oneOf handling * better tests * comments + organization * progress * basic test case * downgrade objects, ish * basic array implementation * handle numeric failure * test for new type * handle array items * empty schema handling * first pass at oneof handling * add more tests+handling * more tests * comments * add empty oneof test case * format + reorganize * more reorganize * fix name * also downgrade binary data * only import vnull * move migrations into v1 package * extract schema mutation code * comment * extract schema migration to new class * extract record downgrade logic for future use * format * fix build after rebase * rename private method for consistency * also implement configuredcatalog migrations >.> * quick and dirty tests * slight cleanup * fix tests * pmd * pmd test * null check on message objects * maybe fix acceptance tests? * fix name * extract constants * more fixes * tmp * meh * fix cdc acc tests * revert to master source-postgres * remove log messages * revert other misc hacks * integers are valid cursors * remove unrelated change * fix build * fix build more? * [MUST REVERT] use dev normalization * capture kube logs * also here? * no debug logs? * delete dup from merging * add final everywhere * revert test changes Co-authored-by: Jimmy Ma <[email protected]> * On-the-fly migrations of persisted catalogs (#21757) * On the fly catalog migration for normalization activity * On the fly catalog migration for job persistence * On the fly migration for standard sync persistence * On the fly migration for airbyte catalogs * Refactor code to share JsonSchema traversal * Add V0 Data type search function * PMD and Format * Fix getOrInsertActorCatalog and ConfigRepositoryE2E tests * Null-proofing CatalogMigrationV1Helper * More null checks * Fix test * Format * Add data type v1 support to the FE * Changes AC test check to check exited ps (#21672) some docker compose changes no longer show exited processes. this broke out test this change should fix master tested in a runner that failed * Move wellknown types mapping to the utility function * use protocolv1 normalization --------- Co-authored-by: Topher Lubaway <[email protected]> Co-authored-by: Edward Gao <[email protected]> * Update protocol support range (#21996) * bump normalization version to 0.3.0 * Add version check on normalization (#22048) * Add normalization min version check * Add visible for testing --------- Co-authored-by: Edward Gao <[email protected]> Co-authored-by: Eugene <[email protected]> Co-authored-by: Topher Lubaway <[email protected]>
closes #17639 and #18254
In most catalogs, this is a one-to-one swap (i.e. detect an existing schema and replace it with an updated schema). The one exception is in
type: [foo, bar]
schema declarations, which are now expanded tooneOf
declarations.mutateSchemas
is written slightly generically, because I plan to use it in the downgrade path as well.Reading order: