Skip to content

Commit

Permalink
[source volcano] Decorate stream with cursor field for trigger based …
Browse files Browse the repository at this point in the history
…CDC stream (#53152)
  • Loading branch information
burakku authored Feb 12, 2025
1 parent ddd9e0d commit f0a96b3
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ interface SourceConfiguration : Configuration, SshTunnelConfiguration {
val maxConcurrency: Int
val resourceAcquisitionHeartbeat: Duration

/** Tells whether this is a CDC configuration; the default implementation returns [global]. */
fun isCdc(): Boolean = global

/**
* Micronaut factory which glues [ConfigurationSpecificationSupplier] and
* [SourceConfigurationFactory] together to produce a [SourceConfiguration] singleton.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.discover

import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.protocol.models.Field as AirbyteField
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers

/** Stateless object for building an [AirbyteStream] during DISCOVER. */
interface AirbyteStreamFactory {
/** Connector-specific [AirbyteStream] creation logic for GLOBAL-state streams. */
fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream

/** Connector-specific [AirbyteStream] creation logic for STREAM-state streams. */
fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream
/** Connector-specific [AirbyteStream] creation logic. */
fun create(config: SourceConfiguration, discoveredStream: DiscoveredStream): AirbyteStream

companion object {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ class DiscoverOperation(
val primaryKey: List<List<String>> = metadataQuerier.primaryKey(streamID)
val discoveredStream = DiscoveredStream(streamID, fields, primaryKey)
val airbyteStream: AirbyteStream =
if (config.global) {
airbyteStreamFactory.createGlobal(discoveredStream)
} else {
airbyteStreamFactory.createNonGlobal(discoveredStream)
}
airbyteStreamFactory.create(config, discoveredStream)
airbyteStreams.add(airbyteStream)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ interface MetaFieldDecorator {
}
val globalCursorIdentifier: String = globalCursor?.id ?: return
airbyteStream.defaultCursorField = listOf(globalCursorIdentifier)
airbyteStream.sourceDefinedCursor = true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.discover

import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import io.micronaut.context.annotation.Requires
Expand All @@ -17,27 +18,19 @@ class TestAirbyteStreamFactory(
val metaFieldDecorator: TestMetaFieldDecorator,
) : AirbyteStreamFactory {

override fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream =
override fun create(
config: SourceConfiguration,
discoveredStream: DiscoveredStream
): AirbyteStream =
AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply {
val hasPK = discoveredStream.primaryKeyColumnIDs.isNotEmpty()
supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
metaFieldDecorator.decorateAirbyteStream(this)
if (discoveredStream.primaryKeyColumnIDs.isNotEmpty()) {
sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs
isResumable = true
} else {
isResumable = false
}
}

override fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream =
AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply {
supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
sourceDefinedCursor = false
if (discoveredStream.primaryKeyColumnIDs.isNotEmpty()) {
sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs
isResumable = true
} else {
isResumable = false
if (config.isCdc()) {
metaFieldDecorator.decorateAirbyteStream(this)
}
sourceDefinedPrimaryKey =
if (hasPK) discoveredStream.primaryKeyColumnIDs else emptyList()
sourceDefinedCursor = config.isCdc() && hasPK
isResumable = hasPK
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,52 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.discover

import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.jdbc.BooleanFieldType
import io.airbyte.cdk.jdbc.CharacterStreamFieldType
import io.airbyte.cdk.jdbc.ClobFieldType
import io.airbyte.cdk.jdbc.JsonStringFieldType
import io.airbyte.cdk.jdbc.NCharacterStreamFieldType
import io.airbyte.cdk.jdbc.NClobFieldType
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.SyncMode

/** [JdbcAirbyteStreamFactory] implements [createGlobal] and [createNonGlobal] for JDBC sourcesx. */
/** [JdbcAirbyteStreamFactory] implements [create] for JDBC sources. */
interface JdbcAirbyteStreamFactory : AirbyteStreamFactory, MetaFieldDecorator {

override fun createGlobal(discoveredStream: DiscoveredStream) =
AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply {
if (hasValidPrimaryKey(discoveredStream)) {
decorateAirbyteStream(this)
supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs
isResumable = true
} else {
supportedSyncModes = listOf(SyncMode.FULL_REFRESH)
sourceDefinedCursor = false
isResumable = false
}
}
override fun create(
config: SourceConfiguration,
discoveredStream: DiscoveredStream
): AirbyteStream {
val isCdc = config.isCdc()
val hasPK = hasValidPrimaryKey(discoveredStream)
val hasPotentialCursorField = hasPotentialCursorFields(discoveredStream)

override fun createNonGlobal(discoveredStream: DiscoveredStream) =
AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply {
if (hasCursorFields(discoveredStream)) {
supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
} else {
supportedSyncModes = listOf(SyncMode.FULL_REFRESH)
val syncModes =
when {
// Incremental sync is only provided as a sync option if the stream has a potential
// cursor field or is configured as CDC with a valid primary key.
!isCdc && hasPotentialCursorField || isCdc && hasPK ->
listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
else -> listOf(SyncMode.FULL_REFRESH)
}
sourceDefinedCursor = false
if (hasValidPrimaryKey(discoveredStream)) {
sourceDefinedPrimaryKey = discoveredStream.primaryKeyColumnIDs
isResumable = true
val primaryKey: List<List<String>> =
if (isCdc || hasPK) discoveredStream.primaryKeyColumnIDs else emptyList()
val stream =
AirbyteStreamFactory.createAirbyteStream(discoveredStream).apply {
if (isCdc && hasPK) {
decorateAirbyteStream(this)
}
supportedSyncModes = syncModes
sourceDefinedPrimaryKey = primaryKey
sourceDefinedCursor = isCdc && hasPK
isResumable = hasPK
}
}
return stream
}

/** Does the [discoveredStream] have a field that could serve as a cursor? */
fun hasCursorFields(discoveredStream: DiscoveredStream): Boolean =
fun hasPotentialCursorFields(discoveredStream: DiscoveredStream): Boolean =
!discoveredStream.columns.none(::isPossibleCursor)

/** Does the [discoveredStream] have a valid primary key declared? */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.discover

import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.h2source.H2SourceOperations
import io.airbyte.cdk.jdbc.BigIntegerFieldType
import io.airbyte.cdk.jdbc.BooleanFieldType
import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.protocol.models.v0.SyncMode
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.mockito.Mockito.`when`

/**
 * Exercises [JdbcAirbyteStreamFactory.create] through [H2SourceOperations] for each combination of
 * CDC / non-CDC configuration and presence / absence of a primary key on the discovered stream.
 */
class JdbcAirbyteStreamFactoryTest {

    private lateinit var config: SourceConfiguration
    private lateinit var streamId: StreamIdentifier
    private lateinit var discoveredStream: DiscoveredStream

    /** Expected sync modes when only full refresh is offered. */
    private val fullRefreshOnly = listOf(SyncMode.FULL_REFRESH)

    /** Expected sync modes when incremental sync is offered as well. */
    private val fullRefreshAndIncremental = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)

    /** Creates fresh mocks and stubs the stream identity and a default two-column layout. */
    @BeforeEach
    fun setUp() {
        config = mock(SourceConfiguration::class.java)
        streamId = mock(StreamIdentifier::class.java)
        discoveredStream = mock(DiscoveredStream::class.java)

        `when`(streamId.name).thenReturn("test_stream")
        `when`(streamId.namespace).thenReturn("test_namespace")

        val defaultColumns =
            listOf(Field("id", BigIntegerFieldType), Field("name", StringFieldType))
        `when`(discoveredStream.id).thenReturn(streamId)
        `when`(discoveredStream.columns).thenReturn(defaultColumns)
    }

    /** Stubs the CDC flag on [config] and the primary key reported by [discoveredStream]. */
    private fun stubScenario(cdc: Boolean, primaryKey: List<List<String>>) {
        `when`(config.isCdc()).thenReturn(cdc)
        `when`(discoveredStream.primaryKeyColumnIDs).thenReturn(primaryKey)
    }

    /** Builds the stream under test with a new [H2SourceOperations] instance. */
    private fun buildStream() = H2SourceOperations().create(config, discoveredStream)

    @Test
    fun testCreate_withCdcAndWithPK() {
        stubScenario(cdc = true, primaryKey = listOf(listOf("id")))

        val result = buildStream()

        Assertions.assertEquals(fullRefreshAndIncremental, result.supportedSyncModes)
        Assertions.assertTrue(result.sourceDefinedCursor)
        Assertions.assertTrue(result.isResumable)
    }

    @Test
    fun testCreate_withCdcAndWithoutPK() {
        stubScenario(cdc = true, primaryKey = emptyList())

        val result = buildStream()

        Assertions.assertEquals(fullRefreshOnly, result.supportedSyncModes)
        Assertions.assertFalse(result.sourceDefinedCursor)
        Assertions.assertFalse(result.isResumable)
    }

    @Test
    fun testCreate_withNonCdcAndWithPK() {
        stubScenario(cdc = false, primaryKey = listOf(listOf("id")))

        val result = buildStream()

        Assertions.assertEquals(fullRefreshAndIncremental, result.supportedSyncModes)
        Assertions.assertFalse(result.sourceDefinedCursor)
        Assertions.assertTrue(result.isResumable)
    }

    @Test
    fun testCreate_withNonCdcAndWithoutPK() {
        stubScenario(cdc = false, primaryKey = emptyList())
        // Replace the default columns with a single boolean column for this scenario.
        val booleanOnlyColumns = listOf(Field("non_cursor_col", BooleanFieldType))
        `when`(discoveredStream.columns).thenReturn(booleanOnlyColumns)

        val result = buildStream()

        Assertions.assertEquals(fullRefreshOnly, result.supportedSyncModes)
        Assertions.assertFalse(result.sourceDefinedCursor)
        Assertions.assertFalse(result.isResumable)
    }
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.300'
cdk = 'local'
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ class MySqlSourceCdcIntegrationTest {
columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)),
primaryKeyColumnIDs = listOf(listOf("k")),
)
val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream)
val stream: AirbyteStream =
MySqlSourceOperations()
.create(
MySqlSourceConfigurationFactory().make(config()),
discoveredStream,
)
val configuredStream: ConfiguredAirbyteStream =
CatalogHelpers.toDefaultConfiguredStream(stream)
.withSyncMode(SyncMode.INCREMENTAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ class MySqlSourceCursorBasedIntegrationTest {
columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)),
primaryKeyColumnIDs = listOf(listOf("k")),
)
val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream)
val stream: AirbyteStream =
MySqlSourceOperations()
.create(
MySqlSourceConfigurationFactory().make(config),
discoveredStream,
)
val configuredStream: ConfiguredAirbyteStream =
CatalogHelpers.toDefaultConfiguredStream(stream)
.withSyncMode(SyncMode.INCREMENTAL)
Expand Down

0 comments on commit f0a96b3

Please sign in to comment.