-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Load CDK: CheckpointManager support for Index-Based Checkpointing #53663
Load CDK: CheckpointManager support for Index-Based Checkpointing #53663
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
7b4f628
to
6069fa9
Compare
8754a59
to
ded3918
Compare
6069fa9
to
5714081
Compare
4c5afe6
to
ebe0036
Compare
5714081
to
c5d56a4
Compare
ebe0036
to
89ffc87
Compare
c5d56a4
to
c34e0df
Compare
89ffc87
to
6c8c579
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either I'm misunderstanding, or there's a bug? (which I think is being masked b/c the unit test only has 1 record per stream)
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt
Outdated
Show resolved
Hide resolved
3570a29
to
d20d91d
Compare
6c8c579
to
a8575e3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
two nits, otherwise lgtm
@@ -155,7 +164,13 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream | |||
val head = globalCheckpoints.peek() | |||
val allStreamsPersisted = | |||
head.streamIndexes.all { (stream, index) -> | |||
syncManager.getStreamManager(stream).areRecordsPersistedUntil(index) | |||
if (!checkpointById) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: invert this if statement (i.e. if (checkpointById) { ... CheckpointId()... } else {...}
)
======= | ||
* True if all records in the stream have been marked as completed AND the stream has been marked as | ||
* complete. | ||
>>>>>>> 3570a293f9b (addressed comments) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like a borked git merge
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah fixed it locally
a8575e3
to
4261c93
Compare
4261c93
to
70a91da
Compare
70a91da
to
2bf1b3e
Compare
What
Follow-up to #53646. This adds the corresponding support in the checkpoint manager for index-based accounting.