diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt index cbf0199082dff..874f484bb2367 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -16,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.object_storage.LoadablePart import io.airbyte.cdk.load.write.FileBatchAccumulator +import io.github.oshai.kotlinlogging.KotlinLogging import java.io.File import java.nio.file.Path @@ -28,6 +29,8 @@ class FilePartAccumulator( private val stream: DestinationStream, private val outputQueue: MultiProducerChannel<BatchEnvelope<*>>, ) : FileBatchAccumulator { + val log = KotlinLogging.logger {} + override suspend fun processFilePart(file: DestinationFile, index: Long) { val key = Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") @@ -44,8 +47,9 @@ class FilePartAccumulator( while (true) { val bytePart = - ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + ByteArray(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt()) val read = fileInputStream.read(bytePart) + log.info { "Read $read bytes from file" } if (read == -1) { val filePart: ByteArray? = null @@ -62,6 +66,7 @@ class FilePartAccumulator( handleFilePart(batch, stream.descriptor, index) } } + fileInputStream.close() localFile.delete() } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt index 1dba37ab3ee46..defb28688c527 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt @@ -48,7 +48,7 @@ class PartToObjectAccumulator<T : RemoteObject<*>>( val streamingUpload = upload.streamingUpload.await() log.info { - "Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (empty=${batch.part.isEmpty}; final=${batch.part.isFinal})" + "Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (size=${batch.part.bytes?.size}; final=${batch.part.isFinal})" } // Upload provided bytes and update indexes.