-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bulk-cdk-toolkit-extract-cdc: API changes #52040
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
@@ -113,8 +138,4 @@ class CdcPartitionsCreator<T : Comparable<T>>( | |||
log.info { "Current position '$lowerBound' does not exceed target position '$upperBound'." } | |||
return listOf(partitionReader) | |||
} | |||
|
|||
companion object { | |||
var CDCNeedsRestart: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been replaced by an AtomicBoolean
in the cdc partitions creator factory class.
@@ -298,7 +302,6 @@ class CdcPartitionReader<T : Comparable<T>>( | |||
} | |||
|
|||
enum class CloseReason(val message: String) { | |||
TIMEOUT("timed out"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this wasn't being used
startingOffset = syntheticOffset | ||
startingSchemaHistory = null | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diff here is noisy, as it should be, but the changes really aren't that substantial.
) | ||
|
||
/** Debezium Engine output, other than records of course. */ | ||
data class DebeziumState( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two types just weren't used that much and it seems that getting rid of them does improve things.
|
||
data class AbortDebeziumWarmStartState(val reason: String) : InvalidDebeziumWarmStartState | ||
|
||
data class ResetDebeziumWarmStartState(val reason: String) : InvalidDebeziumWarmStartState |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This abort vs reset failure handling is going to be present in every connector with CDC if it isn't the case already, so it makes sense to recognize it in the CDK. Previously OffsetInvalidNeedsResyncIllegalStateException
was used as a stopgap but using exceptions for control flow can be confusing.
37e4428
to
1565204
Compare
What
This PR makes substantial changes to the API of the extract-cdc toolkit of the Bulk CDK. Thanks @stephane-airbyte for the actionable feedback.
This PR also adds richer data types to better handle invalid debezium states, which can trigger resetting the sync.
How
The
DebeziumInput
andDebeziumState
types are gone,DebeziumOperations
methods are adjusted accordingly and broken down into smaller pieces. Changes then propagate toCdcPartitionsCreator
andCdcPartitionReader
, and then further down to the tests.Review guide
Follow the order outlined in the previous paragraph.
User Impact
None.
Can this PR be safely reverted and rolled back?