From a466eb0511620554ec29307cc91212d48e8e28cd Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Mon, 20 Jan 2025 16:27:03 -0500 Subject: [PATCH] source-mysql: adopt bulk-cdk-toolkit-extract-cdc API changes --- .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- .../mysql/MySqlSourceDebeziumOperations.kt | 189 +++++++++--------- .../source/mysql/MySqlSourceOperations.kt | 9 +- .../MySqlSourceJdbcPartitionFactoryTest.kt | 14 +- docs/integrations/sources/mysql.md | 7 +- 6 files changed, 103 insertions(+), 120 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 921be61051e5f..c0ca3326509f4 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -9,7 +9,7 @@ application { airbyteBulkConnector { core = 'extract' toolkits = ['extract-jdbc', 'extract-cdc'] - cdk = '0.277' + cdk = '0.300' } dependencies { diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 47d3f35f5e563..e32d319a50844 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.11.0 + dockerImageTag: 3.11.1 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt index 3256101a0a247..2a6df6e6a6b98 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt @@ -20,16 +20,18 @@ import io.airbyte.cdk.jdbc.JdbcConnectionFactory import io.airbyte.cdk.jdbc.LongFieldType import io.airbyte.cdk.jdbc.StringFieldType import io.airbyte.cdk.read.Stream -import io.airbyte.cdk.read.cdc.CdcPartitionsCreator.OffsetInvalidNeedsResyncIllegalStateException -import io.airbyte.cdk.read.cdc.DebeziumInput +import io.airbyte.cdk.read.cdc.AbortDebeziumWarmStartState import io.airbyte.cdk.read.cdc.DebeziumOffset import io.airbyte.cdk.read.cdc.DebeziumOperations import io.airbyte.cdk.read.cdc.DebeziumPropertiesBuilder import io.airbyte.cdk.read.cdc.DebeziumRecordKey import io.airbyte.cdk.read.cdc.DebeziumRecordValue import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory -import io.airbyte.cdk.read.cdc.DebeziumState +import io.airbyte.cdk.read.cdc.DebeziumWarmStartState import io.airbyte.cdk.read.cdc.DeserializedRecord +import io.airbyte.cdk.read.cdc.InvalidDebeziumWarmStartState +import io.airbyte.cdk.read.cdc.ResetDebeziumWarmStartState +import io.airbyte.cdk.read.cdc.ValidDebeziumWarmStartState import io.airbyte.cdk.ssh.TunnelSession import io.airbyte.cdk.util.Jsons import io.debezium.connector.mysql.MySqlConnector @@ -63,8 +65,11 @@ class MySqlSourceDebeziumOperations( random: Random = Random.Default, ) : DebeziumOperations { private val log = KotlinLogging.logger {} + private val cdcIncrementalConfiguration: CdcIncrementalConfiguration by lazy { + configuration.incrementalConfiguration as CdcIncrementalConfiguration + } - override fun deserialize( + override fun deserializeRecord( key: DebeziumRecordKey, value: DebeziumRecordValue, stream: Stream, @@ -130,29 +135,42 @@ class MySqlSourceDebeziumOperations( override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? = value.source["table"]?.asText() + override fun deserializeState( + opaqueStateValue: OpaqueStateValue, + ): DebeziumWarmStartState { + val debeziumState: UnvalidatedDeserializedState = + try { + deserializeStateUnvalidated(opaqueStateValue) + } catch (e: Exception) { + log.error(e) { "Error deserializing incumbent state value." } + return AbortDebeziumWarmStartState( + "Error deserializing incumbent state value: ${e.message}" + ) + } + return validate(debeziumState) + } + /** * Checks if GTIDs from previously saved state (debeziumInput) are still valid on DB. And also * check if binlog exists or not. * * Validate is not supposed to perform on synthetic state. */ - private fun validate(debeziumState: DebeziumState): CdcStateValidateResult { + private fun validate(debeziumState: UnvalidatedDeserializedState): DebeziumWarmStartState { val savedStateOffset: SavedOffset = parseSavedOffset(debeziumState) val (_: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids() if (gtidSet.isNullOrEmpty() && !savedStateOffset.gtidSet.isNullOrEmpty()) { - log.info { + return abortCdcSync( "Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled" - } - return abortCdcSync() + ) } val savedGtidSet = MySqlGtidSet(savedStateOffset.gtidSet) val availableGtidSet = MySqlGtidSet(gtidSet) if (!savedGtidSet.isContainedWithin(availableGtidSet)) { - log.info { + return abortCdcSync( "Connector last known GTIDs are $savedGtidSet, but MySQL server only has $availableGtidSet" - } - return abortCdcSync() + ) } // newGtidSet is gtids from server that hasn't been seen by this connector yet. If the set @@ -161,49 +179,42 @@ class MySqlSourceDebeziumOperations( if (!newGtidSet.isEmpty) { val purgedGtidSet = queryPurgedIds() if (!purgedGtidSet.isEmpty && !newGtidSet.subtract(purgedGtidSet).equals(newGtidSet)) { - log.info { + return abortCdcSync( "Connector has not seen GTIDs $newGtidSet, but MySQL server has purged $purgedGtidSet" - } - return abortCdcSync() + ) } } - if (!savedGtidSet.isEmpty) { - // If the connector has saved GTID set, we will use that to validate and skip - // binlog validation. GTID and binlog works in an independent way to ensure data - // integrity where GTID is for storing transactions and binlog is for storing changes - // in DB. - return CdcStateValidateResult.VALID - } - val existingLogFiles: List = getBinaryLogFileNames() - val found = existingLogFiles.contains(savedStateOffset.position.fileName) - if (!found) { - log.info { - "Connector last known binlog file ${savedStateOffset.position.fileName} is " + - "not found in the server. Server has $existingLogFiles" + // If the connector has saved GTID set, we will use that to validate and skip + // binlog validation. GTID and binlog works in an independent way to ensure data + // integrity where GTID is for storing transactions and binlog is for storing changes + // in DB. + if (savedGtidSet.isEmpty) { + val existingLogFiles: List = getBinaryLogFileNames() + val found = existingLogFiles.contains(savedStateOffset.position.fileName) + if (!found) { + return abortCdcSync( + "Connector last known binlog file ${savedStateOffset.position.fileName} is not found in the server. Server has $existingLogFiles" + ) } - return abortCdcSync() } - return CdcStateValidateResult.VALID + return ValidDebeziumWarmStartState(debeziumState.offset, debeziumState.schemaHistory) } - private fun abortCdcSync(): CdcStateValidateResult { - val cdcIncrementalConfiguration: CdcIncrementalConfiguration = - configuration.incrementalConfiguration as CdcIncrementalConfiguration - return when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) { - InvalidCdcCursorPositionBehavior.FAIL_SYNC -> { - log.warn { "Saved offset no longer present on the server. aborting sync." } - CdcStateValidateResult.INVALID_ABORT - } - InvalidCdcCursorPositionBehavior.RESET_SYNC -> { - log.warn { - "Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch." - } - CdcStateValidateResult.INVALID_RESET - } + private fun abortCdcSync(reason: String): InvalidDebeziumWarmStartState = + when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) { + InvalidCdcCursorPositionBehavior.FAIL_SYNC -> + AbortDebeziumWarmStartState( + "Saved offset no longer present on the server, please reset the connection, " + + "and then increase binlog retention and/or increase sync frequency. " + + "$reason." + ) + InvalidCdcCursorPositionBehavior.RESET_SYNC -> + ResetDebeziumWarmStartState( + "Saved offset no longer present on the server. $reason." + ) } - } - private fun parseSavedOffset(debeziumState: DebeziumState): SavedOffset { + private fun parseSavedOffset(debeziumState: UnvalidatedDeserializedState): SavedOffset { val position: MySqlSourceCdcPosition = position(debeziumState.offset) val gtidSet: String? = debeziumState.offset.wrapped.values.first()["gtids"]?.asText() return SavedOffset(position, gtidSet) @@ -233,7 +244,7 @@ class MySqlSourceDebeziumOperations( return MySqlSourceCdcPosition(file.toString(), pos) } - override fun synthesize(): DebeziumInput { + override fun generateColdStartOffset(): DebeziumOffset { val (mySqlSourceCdcPosition: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids() val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName) @@ -254,8 +265,7 @@ class MySqlSourceDebeziumOperations( } val offset = DebeziumOffset(mapOf(key to value)) log.info { "Constructed synthetic $offset." } - val state = DebeziumState(offset, schemaHistory = null) - return DebeziumInput(syntheticProperties, state, isSynthetic = true) + return offset } private fun queryPositionAndGtids(): Pair { @@ -319,49 +329,37 @@ class MySqlSourceDebeziumOperations( } } - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - val debeziumState: DebeziumState = - try { - deserializeDebeziumState(opaqueStateValue) - } catch (e: Exception) { - throw ConfigErrorException("Error deserializing $opaqueStateValue", e) - } - val cdcValidationResult = validate(debeziumState) - if (cdcValidationResult != CdcStateValidateResult.VALID) { - if (cdcValidationResult == CdcStateValidateResult.INVALID_ABORT) { - throw ConfigErrorException( - "Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency." - ) - } - if (cdcValidationResult == CdcStateValidateResult.INVALID_RESET) { - throw OffsetInvalidNeedsResyncIllegalStateException() - } - return synthesize() - } + override fun generateColdStartProperties(): Map = + DebeziumPropertiesBuilder() + .with(commonProperties) + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode + // We use the recovery property cause using this mode will instruct Debezium to + // construct the db schema history. Note that we used to use schema_only_recovery mode + // instead, but this mode has been deprecated. + .with("snapshot.mode", "recovery") + .withStreams(listOf()) + .buildMap() - val properties: Map = - DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap() - return DebeziumInput(properties, debeziumState, isSynthetic = false) - } + override fun generateWarmStartProperties(streams: List): Map = + DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap() - override fun serialize(debeziumState: DebeziumState): OpaqueStateValue { + override fun serializeState( + offset: DebeziumOffset, + schemaHistory: DebeziumSchemaHistory? + ): OpaqueStateValue { val stateNode: ObjectNode = Jsons.objectNode() // Serialize offset. val offsetNode: JsonNode = Jsons.objectNode().apply { - for ((k, v) in debeziumState.offset.wrapped) { + for ((k, v) in offset.wrapped) { put(Jsons.writeValueAsString(k), Jsons.writeValueAsString(v)) } } stateNode.set(MYSQL_CDC_OFFSET, offsetNode) // Serialize schema history. - val schemaHistory: List? = debeziumState.schemaHistory?.wrapped if (schemaHistory != null) { val uncompressedString: String = - schemaHistory.joinToString(separator = "\n") { + schemaHistory.wrapped.joinToString(separator = "\n") { DocumentWriter.defaultWriter().write(it.document()) } if (uncompressedString.length <= MAX_UNCOMPRESSED_LENGTH) { @@ -427,24 +425,11 @@ class MySqlSourceDebeziumOperations( MySqlSourceCdcTemporalConverter::class ) - val serverTimezone: String? = - (configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone - if (!serverTimezone.isNullOrBlank()) { - dbzPropertiesBuilder.with("database.connectionTimezone", serverTimezone) - } - dbzPropertiesBuilder.buildMap() - } + cdcIncrementalConfiguration.serverTimezone + ?.takeUnless { it.isBlank() } + ?.let { dbzPropertiesBuilder.withDatabase("connectionTimezone", it) } - val syntheticProperties: Map by lazy { - DebeziumPropertiesBuilder() - .with(commonProperties) - // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode - // We use the recovery property cause using this mode will instruct Debezium to - // construct the db schema history. Note that we used to use schema_only_recovery mode - // instead, but this mode has been deprecated. - .with("snapshot.mode", "recovery") - .withStreams(listOf()) - .buildMap() + dbzPropertiesBuilder.buildMap() } companion object { @@ -470,7 +455,9 @@ class MySqlSourceDebeziumOperations( val INTERNAL_CONVERTER_CONFIG: Map = java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString()) - internal fun deserializeDebeziumState(opaqueStateValue: OpaqueStateValue): DebeziumState { + internal fun deserializeStateUnvalidated( + opaqueStateValue: OpaqueStateValue + ): UnvalidatedDeserializedState { val stateNode: ObjectNode = opaqueStateValue[STATE] as ObjectNode // Deserialize offset. val offsetNode: ObjectNode = stateNode[MYSQL_CDC_OFFSET] as ObjectNode @@ -486,7 +473,7 @@ class MySqlSourceDebeziumOperations( val offset = DebeziumOffset(offsetMap) // Deserialize schema history. val schemaNode: JsonNode = - stateNode[MYSQL_DB_HISTORY] ?: return DebeziumState(offset, schemaHistory = null) + stateNode[MYSQL_DB_HISTORY] ?: return UnvalidatedDeserializedState(offset) val isCompressed: Boolean = stateNode[IS_COMPRESSED]?.asBoolean() ?: false val uncompressedString: String = if (isCompressed) { @@ -494,7 +481,6 @@ class MySqlSourceDebeziumOperations( val compressedBytes: ByteArray = textValue.substring(1, textValue.length - 1).toByteArray(Charsets.UTF_8) val decoded = Base64.decodeBase64(compressedBytes) - GZIPInputStream(ByteArrayInputStream(decoded)).reader(Charsets.UTF_8).readText() } else { schemaNode.textValue() @@ -504,9 +490,14 @@ class MySqlSourceDebeziumOperations( .lines() .filter { it.isNotBlank() } .map { HistoryRecord(DocumentReader.defaultReader().read(it)) } - return DebeziumState(offset, DebeziumSchemaHistory(schemaHistoryList)) + return UnvalidatedDeserializedState(offset, DebeziumSchemaHistory(schemaHistoryList)) } + data class UnvalidatedDeserializedState( + val offset: DebeziumOffset, + val schemaHistory: DebeziumSchemaHistory? = null, + ) + internal fun position(offset: DebeziumOffset): MySqlSourceCdcPosition { if (offset.wrapped.size != 1) { throw ConfigErrorException("Expected exactly 1 key in $offset") diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt index dc7930ea91bc7..275d9c42aba1b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt @@ -63,7 +63,7 @@ import io.airbyte.cdk.read.Where import io.airbyte.cdk.read.WhereClauseLeafNode import io.airbyte.cdk.read.WhereClauseNode import io.airbyte.cdk.read.WhereNode -import io.airbyte.cdk.read.cdc.DebeziumState +import io.airbyte.cdk.read.cdc.DebeziumOffset import io.airbyte.cdk.util.Jsons import io.micronaut.context.annotation.Primary import jakarta.inject.Singleton @@ -102,10 +102,9 @@ class MySqlSourceOperations : if (globalStateValue == null) { return } - val debeziumState: DebeziumState = - MySqlSourceDebeziumOperations.deserializeDebeziumState(globalStateValue) - val position: MySqlSourceCdcPosition = - MySqlSourceDebeziumOperations.position(debeziumState.offset) + val offset: DebeziumOffset = + MySqlSourceDebeziumOperations.deserializeStateUnvalidated(globalStateValue).offset + val position: MySqlSourceCdcPosition = MySqlSourceDebeziumOperations.position(offset) recordData.set( MySqlSourceCdcMetaFields.CDC_LOG_FILE.id, CdcStringMetaFieldType.jsonEncoder.encode(position.fileName), diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt index 2075d3f445783..7014e52abf6f6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt @@ -21,9 +21,8 @@ import io.airbyte.cdk.output.BufferingOutputConsumer import io.airbyte.cdk.read.ConcurrencyResource import io.airbyte.cdk.read.ConfiguredSyncMode import io.airbyte.cdk.read.DefaultJdbcSharedState -import io.airbyte.cdk.read.Feed import io.airbyte.cdk.read.SelectQuerier -import io.airbyte.cdk.read.StateQuerier +import io.airbyte.cdk.read.StateManager import io.airbyte.cdk.read.Stream import io.airbyte.cdk.read.StreamFeedBootstrap import io.airbyte.cdk.util.Jsons @@ -152,15 +151,8 @@ class MySqlSourceJdbcPartitionFactoryTest { recordData: ObjectNode ) {} }, - stateQuerier = - object : StateQuerier { - override val feeds: List = listOf(stream) - override fun current(feed: Feed): OpaqueStateValue? = - if (feed == stream) incumbentStateValue else null - override fun resetFeedStates() { - /* no-op */ - } - }, + stateManager = + StateManager(initialStreamStates = mapOf(stream to incumbentStateValue)), stream, ) } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 05292f8555562..d0c671c52a4c3 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,10 +226,11 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.11.0 | 2025-01-14 | [51545](https://github.com/airbytehq/airbyte/pull/51545) | Promoting release candidate 3.11.0-rc.1 to a main version. | +| 3.11.1 | 2025-01-30 | [52039](https://github.com/airbytehq/airbyte/pull/52039) | Adopt latest API changes from Bulk CDK. | +| 3.11.0 | 2025-01-14 | [51545](https://github.com/airbytehq/airbyte/pull/51545) | Promoting release candidate 3.11.0-rc.1 to a main version. | | 3.11.0-rc.1 | 2025-01-09 | [51029](https://github.com/airbytehq/airbyte/pull/51029) | Fix unnecessary schema change when upgrading from legacy mysql source. | -| 3.10.1 | 2025-01-10 | [51510](https://github.com/airbytehq/airbyte/pull/51510) | Use a non root base image | -| 3.10.0 | 2025-01-09 | [51008](https://github.com/airbytehq/airbyte/pull/51008) | Promoting release candidate 3.10.0-rc.9 to a main version. | +| 3.10.1 | 2025-01-10 | [51510](https://github.com/airbytehq/airbyte/pull/51510) | Use a non root base image | +| 3.10.0 | 2025-01-09 | [51008](https://github.com/airbytehq/airbyte/pull/51008) | Promoting release candidate 3.10.0-rc.9 to a main version. | | 3.10.0-rc.9 | 2025-01-08 | [50987](https://github.com/airbytehq/airbyte/pull/50987) | Increase Debezium shutdown timeout. | | 3.10.0-rc.8 | 2025-01-07 | [50965](https://github.com/airbytehq/airbyte/pull/50965) | Fix bug introduced in 3.10.0-rc.3. | | 3.10.0-rc.7 | 2024-12-27 | [50437](https://github.com/airbytehq/airbyte/pull/50437) | Compatibility with MySQL Views. |