Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-mysql: adopt bulk-cdk-toolkit-extract-cdc API changes #52039

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.277'
cdk = '0.300'
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.11.0
dockerImageTag: 3.11.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.LongFieldType
import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.cdc.CdcPartitionsCreator.OffsetInvalidNeedsResyncIllegalStateException
import io.airbyte.cdk.read.cdc.DebeziumInput
import io.airbyte.cdk.read.cdc.AbortDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.read.cdc.DebeziumOperations
import io.airbyte.cdk.read.cdc.DebeziumPropertiesBuilder
import io.airbyte.cdk.read.cdc.DebeziumRecordKey
import io.airbyte.cdk.read.cdc.DebeziumRecordValue
import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.read.cdc.DebeziumWarmStartState
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is a "warm" start state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a starting state from which Debezium can resume quickly, as opposed to a cold start state where it needs to rebuild the database's schema history

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this particular change is in #52040

import io.airbyte.cdk.read.cdc.DeserializedRecord
import io.airbyte.cdk.read.cdc.InvalidDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.ResetDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.ValidDebeziumWarmStartState
import io.airbyte.cdk.ssh.TunnelSession
import io.airbyte.cdk.util.Jsons
import io.debezium.connector.mysql.MySqlConnector
Expand Down Expand Up @@ -63,8 +65,11 @@ class MySqlSourceDebeziumOperations(
random: Random = Random.Default,
) : DebeziumOperations<MySqlSourceCdcPosition> {
private val log = KotlinLogging.logger {}
private val cdcIncrementalConfiguration: CdcIncrementalConfiguration by lazy {
configuration.incrementalConfiguration as CdcIncrementalConfiguration
}

override fun deserialize(
override fun deserializeRecord(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
Expand Down Expand Up @@ -130,29 +135,42 @@ class MySqlSourceDebeziumOperations(
override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? =
value.source["table"]?.asText()

override fun deserializeState(
opaqueStateValue: OpaqueStateValue,
): DebeziumWarmStartState {
val debeziumState: UnvalidatedDeserializedState =
try {
deserializeStateUnvalidated(opaqueStateValue)
} catch (e: Exception) {
log.error(e) { "Error deserializing incumbent state value." }
return AbortDebeziumWarmStartState(
"Error deserializing incumbent state value: ${e.message}"
)
}
return validate(debeziumState)
}

/**
* Checks if GTIDs from previously saved state (debeziumInput) are still valid on DB. And also
* check if binlog exists or not.
*
* Validate is not supposed to perform on synthetic state.
*/
private fun validate(debeziumState: DebeziumState): CdcStateValidateResult {
private fun validate(debeziumState: UnvalidatedDeserializedState): DebeziumWarmStartState {
val savedStateOffset: SavedOffset = parseSavedOffset(debeziumState)
val (_: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids()
if (gtidSet.isNullOrEmpty() && !savedStateOffset.gtidSet.isNullOrEmpty()) {
log.info {
return abortCdcSync(
"Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled"
}
return abortCdcSync()
)
}

val savedGtidSet = MySqlGtidSet(savedStateOffset.gtidSet)
val availableGtidSet = MySqlGtidSet(gtidSet)
if (!savedGtidSet.isContainedWithin(availableGtidSet)) {
log.info {
return abortCdcSync(
"Connector last known GTIDs are $savedGtidSet, but MySQL server only has $availableGtidSet"
}
return abortCdcSync()
)
}

// newGtidSet is gtids from server that hasn't been seen by this connector yet. If the set
Expand All @@ -161,49 +179,42 @@ class MySqlSourceDebeziumOperations(
if (!newGtidSet.isEmpty) {
val purgedGtidSet = queryPurgedIds()
if (!purgedGtidSet.isEmpty && !newGtidSet.subtract(purgedGtidSet).equals(newGtidSet)) {
log.info {
return abortCdcSync(
"Connector has not seen GTIDs $newGtidSet, but MySQL server has purged $purgedGtidSet"
}
return abortCdcSync()
)
}
}
if (!savedGtidSet.isEmpty) {
// If the connector has saved GTID set, we will use that to validate and skip
// binlog validation. GTID and binlog works in an independent way to ensure data
// integrity where GTID is for storing transactions and binlog is for storing changes
// in DB.
return CdcStateValidateResult.VALID
}
val existingLogFiles: List<String> = getBinaryLogFileNames()
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
if (!found) {
log.info {
"Connector last known binlog file ${savedStateOffset.position.fileName} is " +
"not found in the server. Server has $existingLogFiles"
// If the connector has saved GTID set, we will use that to validate and skip
// binlog validation. GTID and binlog works in an independent way to ensure data
// integrity where GTID is for storing transactions and binlog is for storing changes
// in DB.
if (savedGtidSet.isEmpty) {
val existingLogFiles: List<String> = getBinaryLogFileNames()
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
if (!found) {
return abortCdcSync(
"Connector last known binlog file ${savedStateOffset.position.fileName} is not found in the server. Server has $existingLogFiles"
)
}
return abortCdcSync()
}
return CdcStateValidateResult.VALID
return ValidDebeziumWarmStartState(debeziumState.offset, debeziumState.schemaHistory)
}

private fun abortCdcSync(): CdcStateValidateResult {
val cdcIncrementalConfiguration: CdcIncrementalConfiguration =
configuration.incrementalConfiguration as CdcIncrementalConfiguration
return when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) {
InvalidCdcCursorPositionBehavior.FAIL_SYNC -> {
log.warn { "Saved offset no longer present on the server. aborting sync." }
CdcStateValidateResult.INVALID_ABORT
}
InvalidCdcCursorPositionBehavior.RESET_SYNC -> {
log.warn {
"Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch."
}
CdcStateValidateResult.INVALID_RESET
}
private fun abortCdcSync(reason: String): InvalidDebeziumWarmStartState =
when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) {
InvalidCdcCursorPositionBehavior.FAIL_SYNC ->
AbortDebeziumWarmStartState(
"Saved offset no longer present on the server, please reset the connection, " +
"and then increase binlog retention and/or increase sync frequency. " +
"$reason."
)
InvalidCdcCursorPositionBehavior.RESET_SYNC ->
ResetDebeziumWarmStartState(
"Saved offset no longer present on the server. $reason."
)
}
}

private fun parseSavedOffset(debeziumState: DebeziumState): SavedOffset {
private fun parseSavedOffset(debeziumState: UnvalidatedDeserializedState): SavedOffset {
val position: MySqlSourceCdcPosition = position(debeziumState.offset)
val gtidSet: String? = debeziumState.offset.wrapped.values.first()["gtids"]?.asText()
return SavedOffset(position, gtidSet)
Expand Down Expand Up @@ -233,7 +244,7 @@ class MySqlSourceDebeziumOperations(
return MySqlSourceCdcPosition(file.toString(), pos)
}

override fun synthesize(): DebeziumInput {
override fun generateColdStartOffset(): DebeziumOffset {
val (mySqlSourceCdcPosition: MySqlSourceCdcPosition, gtidSet: String?) =
queryPositionAndGtids()
val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName)
Expand All @@ -254,8 +265,7 @@ class MySqlSourceDebeziumOperations(
}
val offset = DebeziumOffset(mapOf(key to value))
log.info { "Constructed synthetic $offset." }
val state = DebeziumState(offset, schemaHistory = null)
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
return offset
}

private fun queryPositionAndGtids(): Pair<MySqlSourceCdcPosition, String?> {
Expand Down Expand Up @@ -319,49 +329,37 @@ class MySqlSourceDebeziumOperations(
}
}

override fun deserialize(
opaqueStateValue: OpaqueStateValue,
streams: List<Stream>
): DebeziumInput {
val debeziumState: DebeziumState =
try {
deserializeDebeziumState(opaqueStateValue)
} catch (e: Exception) {
throw ConfigErrorException("Error deserializing $opaqueStateValue", e)
}
val cdcValidationResult = validate(debeziumState)
if (cdcValidationResult != CdcStateValidateResult.VALID) {
if (cdcValidationResult == CdcStateValidateResult.INVALID_ABORT) {
throw ConfigErrorException(
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency."
)
}
if (cdcValidationResult == CdcStateValidateResult.INVALID_RESET) {
throw OffsetInvalidNeedsResyncIllegalStateException()
}
return synthesize()
}
override fun generateColdStartProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.with(commonProperties)
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
// We use the recovery property cause using this mode will instruct Debezium to
// construct the db schema history. Note that we used to use schema_only_recovery mode
// instead, but this mode has been deprecated.
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()

val properties: Map<String, String> =
DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap()
return DebeziumInput(properties, debeziumState, isSynthetic = false)
}
override fun generateWarmStartProperties(streams: List<Stream>): Map<String, String> =
DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap()

override fun serialize(debeziumState: DebeziumState): OpaqueStateValue {
override fun serializeState(
offset: DebeziumOffset,
schemaHistory: DebeziumSchemaHistory?
): OpaqueStateValue {
val stateNode: ObjectNode = Jsons.objectNode()
// Serialize offset.
val offsetNode: JsonNode =
Jsons.objectNode().apply {
for ((k, v) in debeziumState.offset.wrapped) {
for ((k, v) in offset.wrapped) {
put(Jsons.writeValueAsString(k), Jsons.writeValueAsString(v))
}
}
stateNode.set<JsonNode>(MYSQL_CDC_OFFSET, offsetNode)
// Serialize schema history.
val schemaHistory: List<HistoryRecord>? = debeziumState.schemaHistory?.wrapped
if (schemaHistory != null) {
val uncompressedString: String =
schemaHistory.joinToString(separator = "\n") {
schemaHistory.wrapped.joinToString(separator = "\n") {
DocumentWriter.defaultWriter().write(it.document())
}
if (uncompressedString.length <= MAX_UNCOMPRESSED_LENGTH) {
Expand Down Expand Up @@ -427,24 +425,11 @@ class MySqlSourceDebeziumOperations(
MySqlSourceCdcTemporalConverter::class
)

val serverTimezone: String? =
(configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone
if (!serverTimezone.isNullOrBlank()) {
dbzPropertiesBuilder.with("database.connectionTimezone", serverTimezone)
}
dbzPropertiesBuilder.buildMap()
}
cdcIncrementalConfiguration.serverTimezone
?.takeUnless { it.isBlank() }
?.let { dbzPropertiesBuilder.withDatabase("connectionTimezone", it) }

val syntheticProperties: Map<String, String> by lazy {
DebeziumPropertiesBuilder()
.with(commonProperties)
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
// We use the recovery property cause using this mode will instruct Debezium to
// construct the db schema history. Note that we used to use schema_only_recovery mode
// instead, but this mode has been deprecated.
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()
dbzPropertiesBuilder.buildMap()
}

companion object {
Expand All @@ -470,7 +455,9 @@ class MySqlSourceDebeziumOperations(
val INTERNAL_CONVERTER_CONFIG: Map<String, String> =
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString())

internal fun deserializeDebeziumState(opaqueStateValue: OpaqueStateValue): DebeziumState {
internal fun deserializeStateUnvalidated(
opaqueStateValue: OpaqueStateValue
): UnvalidatedDeserializedState {
val stateNode: ObjectNode = opaqueStateValue[STATE] as ObjectNode
// Deserialize offset.
val offsetNode: ObjectNode = stateNode[MYSQL_CDC_OFFSET] as ObjectNode
Expand All @@ -486,15 +473,14 @@ class MySqlSourceDebeziumOperations(
val offset = DebeziumOffset(offsetMap)
// Deserialize schema history.
val schemaNode: JsonNode =
stateNode[MYSQL_DB_HISTORY] ?: return DebeziumState(offset, schemaHistory = null)
stateNode[MYSQL_DB_HISTORY] ?: return UnvalidatedDeserializedState(offset)
val isCompressed: Boolean = stateNode[IS_COMPRESSED]?.asBoolean() ?: false
val uncompressedString: String =
if (isCompressed) {
val textValue: String = schemaNode.textValue()
val compressedBytes: ByteArray =
textValue.substring(1, textValue.length - 1).toByteArray(Charsets.UTF_8)
val decoded = Base64.decodeBase64(compressedBytes)

GZIPInputStream(ByteArrayInputStream(decoded)).reader(Charsets.UTF_8).readText()
} else {
schemaNode.textValue()
Expand All @@ -504,9 +490,14 @@ class MySqlSourceDebeziumOperations(
.lines()
.filter { it.isNotBlank() }
.map { HistoryRecord(DocumentReader.defaultReader().read(it)) }
return DebeziumState(offset, DebeziumSchemaHistory(schemaHistoryList))
return UnvalidatedDeserializedState(offset, DebeziumSchemaHistory(schemaHistoryList))
}

data class UnvalidatedDeserializedState(
val offset: DebeziumOffset,
val schemaHistory: DebeziumSchemaHistory? = null,
)

internal fun position(offset: DebeziumOffset): MySqlSourceCdcPosition {
if (offset.wrapped.size != 1) {
throw ConfigErrorException("Expected exactly 1 key in $offset")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import io.airbyte.cdk.read.Where
import io.airbyte.cdk.read.WhereClauseLeafNode
import io.airbyte.cdk.read.WhereClauseNode
import io.airbyte.cdk.read.WhereNode
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
Expand Down Expand Up @@ -102,10 +102,9 @@ class MySqlSourceOperations :
if (globalStateValue == null) {
return
}
val debeziumState: DebeziumState =
MySqlSourceDebeziumOperations.deserializeDebeziumState(globalStateValue)
val position: MySqlSourceCdcPosition =
MySqlSourceDebeziumOperations.position(debeziumState.offset)
val offset: DebeziumOffset =
MySqlSourceDebeziumOperations.deserializeStateUnvalidated(globalStateValue).offset
val position: MySqlSourceCdcPosition = MySqlSourceDebeziumOperations.position(offset)
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
CdcStringMetaFieldType.jsonEncoder.encode(position.fileName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.ConfiguredSyncMode
import io.airbyte.cdk.read.DefaultJdbcSharedState
import io.airbyte.cdk.read.Feed
import io.airbyte.cdk.read.SelectQuerier
import io.airbyte.cdk.read.StateQuerier
import io.airbyte.cdk.read.StateManager
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.StreamFeedBootstrap
import io.airbyte.cdk.util.Jsons
Expand Down Expand Up @@ -152,15 +151,8 @@ class MySqlSourceJdbcPartitionFactoryTest {
recordData: ObjectNode
) {}
},
stateQuerier =
object : StateQuerier {
override val feeds: List<Feed> = listOf(stream)
override fun current(feed: Feed): OpaqueStateValue? =
if (feed == stream) incumbentStateValue else null
override fun resetFeedStates() {
/* no-op */
}
},
stateManager =
StateManager(initialStreamStates = mapOf(stream to incumbentStateValue)),
stream,
)
}
Expand Down
Loading
Loading