diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/command/SourceConfiguration.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/command/SourceConfiguration.kt index b44451ac18497..10990305792c6 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/command/SourceConfiguration.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/command/SourceConfiguration.kt @@ -20,6 +20,11 @@ interface SourceConfiguration : Configuration, SshTunnelConfiguration { val maxConcurrency: Int val resourceAcquisitionHeartbeat: Duration + /** Whether it's a CDC configuration. Default to global state */ + fun isCdc(): Boolean { + return global + } + /** * Micronaut factory which glues [ConfigurationSpecificationSupplier] and * [SourceConfigurationFactory] together to produce a [SourceConfiguration] singleton. diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt index b238524f4e96a..709cd35361ba1 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt @@ -1,17 +1,15 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.protocol.models.Field as AirbyteField import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.CatalogHelpers /** Stateless object for building an [AirbyteStream] during DISCOVER. */ interface AirbyteStreamFactory { - /** Connector-specific [AirbyteStream] creation logic for GLOBAL-state streams. */ - fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream - - /** Connector-specific [AirbyteStream] creation logic for STREAM-state streams. */ - fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream + /** Connector-specific [AirbyteStream] creation logic. */ + fun create(config: SourceConfiguration, discoveredStream: DiscoveredStream): AirbyteStream companion object { diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt index a0b34e5e3999d..267b849c15610 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt @@ -38,11 +38,7 @@ class DiscoverOperation( val primaryKey: List> = metadataQuerier.primaryKey(streamID) val discoveredStream = DiscoveredStream(streamID, fields, primaryKey) val airbyteStream: AirbyteStream = - if (config.global) { - airbyteStreamFactory.createGlobal(discoveredStream) - } else { - airbyteStreamFactory.createNonGlobal(discoveredStream) - } + airbyteStreamFactory.create(config, discoveredStream) airbyteStreams.add(airbyteStream) } } diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetaFieldDecorator.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetaFieldDecorator.kt index def480d35fb50..8b48162eadabd 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetaFieldDecorator.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetaFieldDecorator.kt @@ -34,7 +34,6 @@ interface MetaFieldDecorator { } val globalCursorIdentifier: String = globalCursor?.id ?: return airbyteStream.defaultCursorField = listOf(globalCursorIdentifier) - airbyteStream.sourceDefinedCursor = true } /** diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestAirbyteStreamFactory.kt b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestAirbyteStreamFactory.kt index 271a4585c4e23..5df35bd5cd89b 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestAirbyteStreamFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestAirbyteStreamFactory.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.discover +import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.SyncMode import io.micronaut.context.annotation.Requires @@ -17,27 +18,19 @@ class TestAirbyteStreamFactory( val metaFieldDecorator: TestMetaFieldDecorator, ) : AirbyteStreamFactory { - override fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream = + override fun create( + config: SourceConfiguration, + discoveredStream: DiscoveredStream + ): AirbyteStream = AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply { + val hasPK = discoveredStream.primaryKeyColumnIDs.isNotEmpty() supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) - metaFieldDecorator.decorateAirbyteStream(this) - if (discoveredStream.primaryKeyColumnIDs.isNotEmpty()) { - sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs - isResumable = true - } else { - isResumable = false - } - } - - override fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream = - AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply { - supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) - sourceDefinedCursor = false - if (discoveredStream.primaryKeyColumnIDs.isNotEmpty()) { - sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs - isResumable = true - } else { - isResumable = false + if (config.isCdc()) { + metaFieldDecorator.decorateAirbyteStream(this) } + sourceDefinedPrimaryKey = + if (hasPK) discoveredStream.primaryKeyColumnIDs else emptyList() + sourceDefinedCursor = config.isCdc() && hasPK + isResumable = hasPK } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactory.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactory.kt index f702d3b506d91..3932e96d916ba 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactory.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactory.kt @@ -1,47 +1,52 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.jdbc.BooleanFieldType import io.airbyte.cdk.jdbc.CharacterStreamFieldType import io.airbyte.cdk.jdbc.ClobFieldType import io.airbyte.cdk.jdbc.JsonStringFieldType import io.airbyte.cdk.jdbc.NCharacterStreamFieldType import io.airbyte.cdk.jdbc.NClobFieldType +import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.SyncMode -/** [JdbcAirbyteStreamFactory] implements [createGlobal] and [createNonGlobal] for JDBC sourcesx. */ +/** [JdbcAirbyteStreamFactory] implements [create] for JDBC sources. */ interface JdbcAirbyteStreamFactory : AirbyteStreamFactory, MetaFieldDecorator { - override fun createGlobal(discoveredStream: DiscoveredStream) = - AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply { - if (hasValidPrimaryKey(discoveredStream)) { - decorateAirbyteStream(this) - supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) - sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs - isResumable = true - } else { - supportedSyncModes = listOf(SyncMode.FULL_REFRESH) - sourceDefinedCursor = false - isResumable = false - } - } + override fun create( + config: SourceConfiguration, + discoveredStream: DiscoveredStream + ): AirbyteStream { + val isCdc = config.isCdc() + val hasPK = hasValidPrimaryKey(discoveredStream) + val hasPotentialCursorField = hasPotentialCursorFields(discoveredStream) - override fun createNonGlobal(discoveredStream: DiscoveredStream) = - AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply { - if (hasCursorFields(discoveredStream)) { - supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) - } else { - supportedSyncModes = listOf(SyncMode.FULL_REFRESH) + val syncModes = + when { + // Incremental sync is only provided as a sync option if the stream has a potential + // cursor field or is configured as CDC with a valid primary key. + !isCdc && hasPotentialCursorField || isCdc && hasPK -> + listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) + else -> listOf(SyncMode.FULL_REFRESH) } - sourceDefinedCursor = false - if (hasValidPrimaryKey(discoveredStream)) { - sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs - isResumable = true + val primaryKey: List> = + if (isCdc || hasPK) discoveredStream.primaryKeyColumnIDs else emptyList() + val stream = + AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply { + if (isCdc && hasPK) { + decorateAirbyteStream(this) + } + supportedSyncModes = syncModes + sourceDefinedPrimaryKey = primaryKey + sourceDefinedCursor = isCdc && hasPK + isResumable = hasPK } - } + return stream + } /** Does the [discoveredStream] have a field that could serve as a cursor? */ - fun hasCursorFields(discoveredStream: DiscoveredStream): Boolean = + fun hasPotentialCursorFields(discoveredStream: DiscoveredStream): Boolean = !discoveredStream.columns.none(::isPossibleCursor) /** Does the [discoveredStream] have a valid primary key declared? */ diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactoryTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactoryTest.kt new file mode 100644 index 0000000000000..47fe0b1d483f5 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamFactoryTest.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.discover + +import io.airbyte.cdk.StreamIdentifier +import io.airbyte.cdk.command.SourceConfiguration +import io.airbyte.cdk.h2source.H2SourceOperations +import io.airbyte.cdk.jdbc.BigIntegerFieldType +import io.airbyte.cdk.jdbc.BooleanFieldType +import io.airbyte.cdk.jdbc.StringFieldType +import io.airbyte.protocol.models.v0.SyncMode +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.Mockito.mock +import org.mockito.Mockito.`when` + +class JdbcAirbyteStreamFactoryTest { + + private lateinit var config: SourceConfiguration + private lateinit var streamId: StreamIdentifier + private lateinit var discoveredStream: DiscoveredStream + + @BeforeEach + fun setUp() { + config = mock(SourceConfiguration::class.java) + streamId = mock(StreamIdentifier::class.java) + discoveredStream = mock(DiscoveredStream::class.java) + `when`(streamId.name).thenReturn("test_stream") + `when`(streamId.namespace).thenReturn("test_namespace") + `when`(discoveredStream.id).thenReturn(streamId) + `when`(discoveredStream.columns) + .thenReturn(listOf(Field("id", BigIntegerFieldType), Field("name", StringFieldType))) + } + + @Test + fun testCreate_withCdcAndWithPK() { + `when`(config.isCdc()).thenReturn(true) + `when`(discoveredStream.primaryKeyColumnIDs).thenReturn(listOf(listOf("id"))) + + val factory = H2SourceOperations() + val stream = factory.create(config, discoveredStream) + + Assertions.assertEquals( + listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), + stream.supportedSyncModes + ) + Assertions.assertTrue(stream.sourceDefinedCursor) + Assertions.assertTrue(stream.isResumable) + } + + @Test + fun testCreate_withCdcAndWithoutPK() { + `when`(config.isCdc()).thenReturn(true) + `when`(discoveredStream.primaryKeyColumnIDs).thenReturn(emptyList()) + + val factory = H2SourceOperations() + val stream = factory.create(config, discoveredStream) + + Assertions.assertEquals(listOf(SyncMode.FULL_REFRESH), stream.supportedSyncModes) + Assertions.assertFalse(stream.sourceDefinedCursor) + Assertions.assertFalse(stream.isResumable) + } + + @Test + fun testCreate_withNonCdcAndWithPK() { + `when`(config.isCdc()).thenReturn(false) + `when`(discoveredStream.primaryKeyColumnIDs).thenReturn(listOf(listOf("id"))) + + val factory = H2SourceOperations() + val stream = factory.create(config, discoveredStream) + + Assertions.assertEquals( + listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), + stream.supportedSyncModes + ) + Assertions.assertFalse(stream.sourceDefinedCursor) + Assertions.assertTrue(stream.isResumable) + } + + @Test + fun testCreate_withNonCdcAndWithoutPK() { + `when`(config.isCdc()).thenReturn(false) + `when`(discoveredStream.primaryKeyColumnIDs).thenReturn(emptyList()) + `when`(discoveredStream.columns) + .thenReturn(listOf(Field("non_cursor_col", BooleanFieldType))) + + val factory = H2SourceOperations() + val stream = factory.create(config, discoveredStream) + + Assertions.assertEquals(listOf(SyncMode.FULL_REFRESH), stream.supportedSyncModes) + Assertions.assertFalse(stream.sourceDefinedCursor) + Assertions.assertFalse(stream.isResumable) + } +} diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index c0ca3326509f4..2436ed871d2b3 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -9,7 +9,7 @@ application { airbyteBulkConnector { core = 'extract' toolkits = ['extract-jdbc', 'extract-cdc'] - cdk = '0.300' + cdk = 'local' } dependencies { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt index e2407e724cc31..d146d26912a8d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt @@ -123,7 +123,12 @@ class MySqlSourceCdcIntegrationTest { columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)), primaryKeyColumnIDs = listOf(listOf("k")), ) - val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream) + val stream: AirbyteStream = + MySqlSourceOperations() + .create( + MySqlSourceConfigurationFactory().make(config()), + discoveredStream, + ) val configuredStream: ConfiguredAirbyteStream = CatalogHelpers.toDefaultConfiguredStream(stream) .withSyncMode(SyncMode.INCREMENTAL) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt index 5e2f9267fc0a9..acde416ab78bf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt @@ -196,7 +196,12 @@ class MySqlSourceCursorBasedIntegrationTest { columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)), primaryKeyColumnIDs = listOf(listOf("k")), ) - val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream) + val stream: AirbyteStream = + MySqlSourceOperations() + .create( + MySqlSourceConfigurationFactory().make(config), + discoveredStream, + ) val configuredStream: ConfiguredAirbyteStream = CatalogHelpers.toDefaultConfiguredStream(stream) .withSyncMode(SyncMode.INCREMENTAL)