-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bulk Load CDK: Unwrap multipart streaming upload #48810
Bulk Load CDK: Unwrap multipart streaming upload #48810
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
private val streamProcessor: StreamProcessor<T>, | ||
private val wrappingBuffer: T | ||
) : ObjectStorageFormattingWriter { | ||
override val numCapturedChanges: AtomicLong = writer.numCapturedChanges |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for? I only see it used for logging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's some of what I added for testing, but if feels like useful information.
|
||
fun isDataSufficient(): Boolean { | ||
return buffer.size() >= partSize | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this seems orthogonal to the buffered writer. Since it's called strictly externally, we can easily pull this out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it should just be a size check.
metadata: Map<String, String> | ||
): StreamingUpload<S3Object> { | ||
// TODO: Remove permit handling once we control concurrency with # of accumulators | ||
if (uploadPermits != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we go ahead and move the semaphore upstream to get it out of the client code at least?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved, pending nit change not withstanding
c44bb77
to
c0dcc58
Compare
I also ripped out the logging. |
c0dcc58
to
5000a21
Compare
a8f42b9
to
6b486f6
Compare
6b486f6
to
7f0a7ae
Compare
What
This is a first step toward separating processing from uploading
BufferingFormattedWriter
that wraps the Avro/Json/Csv/Parquet writers, accumulating the records and yielding complete parts