Skip to content

Commit

Permalink
bulk-cdk-toolkit-extract-cdc: API changes (#52040)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta authored and stephane-airbyte committed Feb 3, 2025
1 parent 16464ed commit 7b8af6b
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.read.cdc

import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.PartitionReadCheckpoint
import io.airbyte.cdk.read.PartitionReader
Expand All @@ -15,7 +16,7 @@ import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import io.debezium.engine.format.Json
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer
Expand All @@ -32,12 +33,15 @@ open class CdcPartitionReader<T : Comparable<T>>(
val streamRecordConsumers: Map<StreamIdentifier, StreamRecordConsumer>,
val readerOps: CdcPartitionReaderDebeziumOperations<T>,
val upperBound: T,
val input: DebeziumInput,
) : PartitionReader {
val debeziumProperties: Map<String, String>,
val startingOffset: DebeziumOffset,
val startingSchemaHistory: DebeziumSchemaHistory?,
val isInputStateSynthetic: Boolean,
) : UnlimitedTimePartitionReader {
private val log = KotlinLogging.logger {}
private val acquiredThread = AtomicReference<ConcurrencyResource.AcquiredThread>()
private lateinit var stateFilesAccessor: DebeziumStateFilesAccessor
private lateinit var properties: Properties
private lateinit var decoratedProperties: Properties
private lateinit var engine: DebeziumEngine<ChangeEvent<String?, String?>>

internal val closeReasonReference = AtomicReference<CloseReason>()
Expand Down Expand Up @@ -65,19 +69,19 @@ open class CdcPartitionReader<T : Comparable<T>>(
}

override suspend fun run() {
stateFilesAccessor.writeOffset(input.state.offset)
if (input.state.schemaHistory != null) {
stateFilesAccessor.writeSchema(input.state.schemaHistory)
stateFilesAccessor.writeOffset(startingOffset)
if (startingSchemaHistory != null) {
stateFilesAccessor.writeSchema(startingSchemaHistory)
}
properties =
decoratedProperties =
DebeziumPropertiesBuilder()
.with(input.properties)
.with(debeziumProperties)
.withOffsetFile(stateFilesAccessor.offsetFilePath)
.withSchemaHistoryFile(stateFilesAccessor.schemaFilePath)
.build()
engine =
DebeziumEngine.create(Json::class.java)
.using(properties)
.using(decoratedProperties)
.using(ConnectorCallback())
.using(CompletionCallback())
.notifying(EventConsumer(coroutineContext))
Expand Down Expand Up @@ -117,17 +121,15 @@ open class CdcPartitionReader<T : Comparable<T>>(
}

override fun checkpoint(): PartitionReadCheckpoint {
val offset: DebeziumOffset = stateFilesAccessor.readUpdatedOffset(input.state.offset)
val expectsSchemaHistoryFile = DebeziumPropertiesBuilder().with(properties).expectsSchemaHistoryFile
log.info{"SGX checkpoint, offset=$offset, expectsSchemaHistoryFile =$expectsSchemaHistoryFile, stateFilesAccessor=$stateFilesAccessor"}
val offset: DebeziumOffset = stateFilesAccessor.readUpdatedOffset(startingOffset)
val schemaHistory: DebeziumSchemaHistory? =
if (expectsSchemaHistoryFile) {
if (DebeziumPropertiesBuilder().with(decoratedProperties).expectsSchemaHistoryFile) {
stateFilesAccessor.readSchema()
} else {
null
}
val output = DebeziumState(offset, schemaHistory)
return PartitionReadCheckpoint(readerOps.serialize(output), numEmittedRecords.get())
val serializedState: OpaqueStateValue = readerOps.serializeState(offset, schemaHistory)
return PartitionReadCheckpoint(serializedState, numEmittedRecords.get())
}

inner class EventConsumer(
Expand Down Expand Up @@ -177,7 +179,7 @@ open class CdcPartitionReader<T : Comparable<T>>(
// Ignore events which can't be mapped to a stream.
?: return EventType.RECORD_DISCARDED_BY_STREAM_ID
val deserializedRecord: DeserializedRecord =
readerOps.deserialize(event.key, event.value, streamRecordConsumer.stream)
readerOps.deserializeRecord(event.key, event.value, streamRecordConsumer.stream)
// Ignore events which can't be deserialized into records.
?: return EventType.RECORD_DISCARDED_BY_DESERIALIZE
// Emit the record at the end of the happy path.
Expand Down Expand Up @@ -218,7 +220,7 @@ open class CdcPartitionReader<T : Comparable<T>>(
}

private fun findCloseReason(event: DebeziumEvent, eventType: EventType): CloseReason? {
if (input.isSynthetic && eventType != EventType.HEARTBEAT) {
if (isInputStateSynthetic && eventType != EventType.HEARTBEAT) {
// Special case where the engine started with a synthetic offset:
// don't even consider closing the engine unless handling a heartbeat event.
// For some databases, such as Oracle, Debezium actually needs to snapshot the
Expand Down Expand Up @@ -315,7 +317,6 @@ open class CdcPartitionReader<T : Comparable<T>>(
}

enum class CloseReason(val message: String) {
TIMEOUT("timed out"),
HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION(
"heartbeat or tombstone indicates that WAL consumption has reached the target position"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ open class CdcPartitionsCreator<T : Comparable<T>>(
val readerOps: CdcPartitionReaderDebeziumOperations<T>,
val lowerBoundReference: AtomicReference<T>,
val upperBoundReference: AtomicReference<T>,
val resetReason: AtomicReference<String?>,
) : PartitionsCreator {
private val log = KotlinLogging.logger {}
private val acquiredThread = AtomicReference<ConcurrencyResource.AcquiredThread>()

class OffsetInvalidNeedsResyncIllegalStateException() : IllegalStateException()

override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus {
val acquiredThread: ConcurrencyResource.AcquiredThread =
concurrencyResource.tryAcquire()
Expand All @@ -50,48 +49,74 @@ open class CdcPartitionsCreator<T : Comparable<T>>(
)

override suspend fun run(): List<PartitionReader> {
if (CDCNeedsRestart) {
resetReason.get()?.let { reason: String ->
throw TransientErrorException(
"Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch."
"Triggering reset. Incumbent CDC state is invalid, reason: ${reason}."
)
}
val activeStreams: List<Stream> by lazy {
feedBootstrap.feed.streams.filter { feedBootstrap.stateQuerier.current(it) != null }
}
val syntheticInput: DebeziumInput by lazy { creatorOps.synthesize() }
val syntheticOffset: DebeziumOffset by lazy { creatorOps.generateColdStartOffset() }
// Ensure that the WAL position upper bound has been computed for this sync.
val upperBound: T =
upperBoundReference.updateAndGet {
it ?: creatorOps.position(syntheticInput.state.offset)
}
upperBoundReference.updateAndGet { it ?: creatorOps.position(syntheticOffset) }
// Deserialize the incumbent state value, if it exists.
val input: DebeziumInput =
when (val incumbentOpaqueStateValue = feedBootstrap.currentState) {
null -> syntheticInput
else -> {
// validate if existing state is still valid on DB.
try {
creatorOps.deserialize(incumbentOpaqueStateValue, activeStreams)
} catch (ex: ConfigErrorException) {
log.error(ex) { "Existing state is invalid." }
throw ex
} catch (_: OffsetInvalidNeedsResyncIllegalStateException) {
// If deserialization concludes we need a re-sync we rollback stream states
// and put the creator in a Need Restart mode.
// The next round will throw a transient error to kickoff the resync
feedBootstrap.stateQuerier.resetFeedStates()
CDCNeedsRestart = true
syntheticInput
}
val warmStartState: DebeziumWarmStartState? =
feedBootstrap.currentState?.let {
try {
creatorOps.deserializeState(it)
} catch (e: Exception) {
// This catch should be redundant for well-behaved implementations
// but is included anyway for safety.
AbortDebeziumWarmStartState(e.toString())
}
}

val debeziumProperties: Map<String, String>
val startingOffset: DebeziumOffset
val startingSchemaHistory: DebeziumSchemaHistory?
when (warmStartState) {
null -> {
debeziumProperties = creatorOps.generateColdStartProperties()
startingOffset = syntheticOffset
startingSchemaHistory = null
}
is ValidDebeziumWarmStartState -> {
debeziumProperties = creatorOps.generateWarmStartProperties(activeStreams)
startingOffset = warmStartState.offset
startingSchemaHistory = warmStartState.schemaHistory
}
is AbortDebeziumWarmStartState -> {
val e =
ConfigErrorException(
"Incumbent CDC state is invalid, reason: ${warmStartState.reason}"
)
log.error(e) { "Aborting. ${e.message}." }
throw e
}
is ResetDebeziumWarmStartState -> {
// The incumbent CDC state value is invalid and the sync needs to be reset.
// Doing so is not as straightforward as throwing a TransientErrorException because
// a STATE message with a post-reset state needs to be emitted first.
// This new state is obtained by zeroing all corresponding feeds in the StateManager
// and returning a CdcPartitionReader for a cold start with a synthetic offset.
// This CdcPartitionReader will run, after which the desired STATE message will be
// emitted, and finally the next CdcPartitionsCreator will throw a
// TransientErrorException. The next sync will then snapshot the tables.
resetReason.set(warmStartState.reason)
log.info { "Resetting invalid incumbent CDC state with synthetic state." }
feedBootstrap.stateQuerier.resetFeedStates()
debeziumProperties = creatorOps.generateColdStartProperties()
startingOffset = syntheticOffset
startingSchemaHistory = null
}
}
// Build and return PartitionReader instance, if applicable.
val partitionReader = createCdcPartitionReader(upperBound, input)

val lowerBound: T = creatorOps.position(input.state.offset)
val lowerBoundInPreviousRound: T? = lowerBoundReference.getAndSet(lowerBound)
if (input.isSynthetic) {
if (partitionReader.isInputStateSynthetic) {
// Handle synthetic offset edge-case, which always needs to run.
// Debezium needs to run to generate the full state, which might include schema history.
log.info { "Current offset is synthetic." }
Expand All @@ -116,8 +141,4 @@ open class CdcPartitionsCreator<T : Comparable<T>>(
log.info { "Current position '$lowerBound' does not exceed target position '$upperBound'." }
return listOf(partitionReader)
}

companion object {
var CDCNeedsRestart: Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class CdcPartitionsCreatorFactory<T : Comparable<T>>(
*/
protected val upperBoundReference = AtomicReference<T>()

/** [AtomicReference] used to trigger resetting a sync when not null. */
private val resetReason = AtomicReference<String?>(null)

override fun make(feedBootstrap: FeedBootstrap<*>): PartitionsCreator? {
if (feedBootstrap !is GlobalFeedBootstrap) {
// Fall through on non-Global streams.
Expand All @@ -47,6 +50,7 @@ class CdcPartitionsCreatorFactory<T : Comparable<T>>(
debeziumOps,
lowerBoundReference,
upperBoundReference,
resetReason,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,19 @@ value class DebeziumRecordValue(val wrapped: JsonNode) {
}
}

/** Debezium Engine input. */
data class DebeziumInput(
val properties: Map<String, String>,
val state: DebeziumState,
val isSynthetic: Boolean,
)

/** Debezium Engine output, other than records of course. */
data class DebeziumState(
/** Return type for [CdcPartitionsCreatorDebeziumOperations.deserializeState]. */
sealed interface DebeziumWarmStartState

data class ValidDebeziumWarmStartState(
val offset: DebeziumOffset,
val schemaHistory: DebeziumSchemaHistory?,
)
val schemaHistory: DebeziumSchemaHistory?
) : DebeziumWarmStartState

/** A [DebeziumWarmStartState] describing an incumbent state which cannot be used to warm-start. */
sealed interface InvalidDebeziumWarmStartState : DebeziumWarmStartState

/**
 * The incumbent state is invalid and the sync should be aborted.
 *
 * [CdcPartitionsCreator] reacts to this by throwing a config error whose message includes [reason].
 */
data class AbortDebeziumWarmStartState(val reason: String) : InvalidDebeziumWarmStartState

/**
 * The incumbent state is invalid and the sync should be reset.
 *
 * [CdcPartitionsCreator] reacts to this by recording [reason], resetting the feed states and
 * cold-starting with a synthetic offset; a later round then throws a transient error citing [reason].
 */
data class ResetDebeziumWarmStartState(val reason: String) : InvalidDebeziumWarmStartState

/** [DebeziumOffset] wraps the contents of a Debezium offset file. */
@JvmInline value class DebeziumOffset(val wrapped: Map<JsonNode, JsonNode>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ interface CdcPartitionsCreatorDebeziumOperations<T : Comparable<T>> {
/** Extracts the WAL position from a [DebeziumOffset]. */
fun position(offset: DebeziumOffset): T

/** Synthesizes a [DebeziumInput] when no incumbent [OpaqueStateValue] is available. */
fun synthesize(): DebeziumInput
/**
* Synthesizes a cold-start [DebeziumOffset] when no incumbent [OpaqueStateValue] is available.
*/
fun generateColdStartOffset(): DebeziumOffset

/** Generates Debezium properties for use with a cold-start [DebeziumOffset]. */
fun generateColdStartProperties(): Map<String, String>

/** Maps an incumbent [OpaqueStateValue] into a [DebeziumWarmStartState]. */
fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState

/** Builds a [DebeziumInput] using an incumbent [OpaqueStateValue]. */
fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List<Stream>): DebeziumInput
/** Generates Debezium properties for use with a [ValidDebeziumWarmStartState]. */
fun generateWarmStartProperties(streams: List<Stream>): Map<String, String>
}

interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
Expand All @@ -35,7 +43,7 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
*
* Returning null means that the event should be treated like a heartbeat.
*/
fun deserialize(
fun deserializeRecord(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
Expand All @@ -53,8 +61,11 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
value: DebeziumRecordValue,
): String?

/** Maps a [DebeziumState] to an [OpaqueStateValue]. */
fun serialize(debeziumState: DebeziumState): OpaqueStateValue
/** Maps a Debezium state to an [OpaqueStateValue]. */
fun serializeState(
offset: DebeziumOffset,
schemaHistory: DebeziumSchemaHistory?
): OpaqueStateValue

/** Tries to extract the WAL position from a [DebeziumRecordValue]. */
fun position(recordValue: DebeziumRecordValue): T?
Expand Down
Loading

0 comments on commit 7b8af6b

Please sign in to comment.