Skip to content

Commit

Permalink
Destination S3-V2: Bug Fix: File xfer uses part size for part, not fi…
Browse files Browse the repository at this point in the history
…le size
  • Loading branch information
johnny-schmidt committed Jan 30, 2025
1 parent e2edfd7 commit b37d539
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.object_storage.LoadablePart
import io.airbyte.cdk.load.write.FileBatchAccumulator
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.File
import java.nio.file.Path

Expand All @@ -28,6 +29,8 @@ class FilePartAccumulator(
private val stream: DestinationStream,
private val outputQueue: MultiProducerChannel<BatchEnvelope<*>>,
) : FileBatchAccumulator {
val log = KotlinLogging.logger {}

override suspend fun processFilePart(file: DestinationFile, index: Long) {
val key =
Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}")
Expand All @@ -44,8 +47,9 @@ class FilePartAccumulator(

while (true) {
val bytePart =
ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt())
ByteArray(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt())
val read = fileInputStream.read(bytePart)
log.info { "Read $read bytes from file" }

if (read == -1) {
val filePart: ByteArray? = null
Expand All @@ -62,6 +66,7 @@ class FilePartAccumulator(
handleFilePart(batch, stream.descriptor, index)
}
}
fileInputStream.close()
localFile.delete()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PartToObjectAccumulator<T : RemoteObject<*>>(
val streamingUpload = upload.streamingUpload.await()

log.info {
"Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (empty=${batch.part.isEmpty}; final=${batch.part.isFinal})"
"Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (size=${batch.part.bytes?.size}; final=${batch.part.isFinal})"
}

// Upload provided bytes and update indexes.
Expand Down

0 comments on commit b37d539

Please sign in to comment.