diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt index 2f84cdf19c86c..d70583b125046 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt @@ -35,6 +35,11 @@ data class DestinationCatalog(val streams: List = emptyList() ?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name") } + fun getStream(descriptor: DestinationStream.Descriptor): DestinationStream { + return byDescriptor[descriptor] + ?: throw IllegalArgumentException("Stream not found: $descriptor") + } + fun asProtocolObject(): ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() }) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt index 7ced4f3d7493b..07b2d90bfd912 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt @@ -9,10 +9,17 @@ import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.ChannelMessageQueue +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.BatchUpdate import io.airbyte.cdk.load.state.ReservationManager +import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.task.implementor.FileAggregateMessage import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage +import io.airbyte.cdk.load.write.LoadStrategy import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Value @@ -94,5 +101,37 @@ class SyncBeanFactory { @Singleton @Named("openStreamQueue") - class OpenStreamQueue : ChannelMessageQueue() + class OpenStreamQueue : ChannelMessageQueue(Channel(Channel.UNLIMITED)) + + /** + * If the client uses a new-style LoadStrategy, then we need to checkpoint by checkpoint id + * instead of record index. + */ + @Singleton + @Named("checkpointById") + fun isCheckpointById(loadStrategy: LoadStrategy? = null): Boolean = loadStrategy != null + + /** + * A single record queue for the whole sync, containing all streams, optionally partitioned by a + * configurable number of partitions. Number of partitions is controlled by the specified + * LoadStrategy, if any. + */ + @Singleton + @Named("recordQueue") + fun recordQueue( + loadStrategy: LoadStrategy? = null, + ): PartitionedQueue>> { + return PartitionedQueue( + Array(loadStrategy?.inputPartitions ?: 1) { + ChannelMessageQueue(Channel(Channel.UNLIMITED)) + } + ) + } + + /** A queue for updating batch states, which is not partitioned. 
*/ + @Singleton + @Named("batchStateUpdateQueue") + fun batchStateUpdateQueue(): ChannelMessageQueue { + return ChannelMessageQueue(Channel(100)) + } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/Batch.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/Batch.kt index 0c6fe3e172878..72f10bdc76b04 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/Batch.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/Batch.kt @@ -57,15 +57,17 @@ interface Batch { PROCESSED, STAGED, PERSISTED, - COMPLETE + COMPLETE; + + fun isPersisted(): Boolean = + when (this) { + PERSISTED, + COMPLETE -> true + else -> false + } } - fun isPersisted(): Boolean = - when (state) { - State.PERSISTED, - State.COMPLETE -> true - else -> false - } + fun isPersisted(): Boolean = state.isPersisted() val state: State diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt index a02df2b5a437f..9cb52fd53593f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt @@ -139,7 +139,8 @@ data class DestinationRecord( Meta( message.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) } ?: emptyList() - ) + ), + serialized.length.toLong() ) } } @@ -159,6 +160,7 @@ data class DestinationRecordAirbyteValue( val data: AirbyteValue, val emittedAtMs: Long, val meta: Meta?, + val serializedSizeBytes: Long = 0L ) data class DestinationFile( diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt index 7164e89826889..abd2e10a3d2c5 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt @@ -11,6 +11,7 @@ import io.airbyte.cdk.load.state.Reserved import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.channels.Channel interface Sized { val sizeBytes: Long @@ -52,7 +53,8 @@ data class StreamFlushEvent( override val sizeBytes: Long = 0L } -class DestinationStreamEventQueue : ChannelMessageQueue>() +class DestinationStreamEventQueue : + ChannelMessageQueue>(Channel(Channel.UNLIMITED)) /** * A supplier of message queues to which ([ReservationManager.reserve]'d) @ [DestinationStreamEvent] @@ -97,4 +99,5 @@ data class GlobalCheckpointWrapped( */ @Singleton @Secondary -class CheckpointMessageQueue : ChannelMessageQueue>() +class CheckpointMessageQueue : + ChannelMessageQueue>(Channel(Channel.UNLIMITED)) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt index d62f4db311a67..6478a2094b613 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.receiveAsFlow 
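For orientation, here is a minimal sketch, not part of the diff, of how these new factory beans behave once a connector supplies a LoadStrategy; the MyLoadStrategy name and the partition count of 2 are illustrative assumptions.

import io.airbyte.cdk.load.write.LoadStrategy
import jakarta.inject.Singleton

// Hypothetical strategy bean; in practice this would be a DirectLoaderFactory (defined later
// in this diff), which extends LoadStrategy.
@Singleton
class MyLoadStrategy : LoadStrategy {
    override val inputPartitions: Int = 2
}

// Given such a bean:
//  - isCheckpointById(loadStrategy) evaluates to true, so checkpointing switches from record
//    index to CheckpointId.
//  - recordQueue(loadStrategy) is a PartitionedQueue with two partitions, each backed by an
//    unlimited Channel.
//  - batchStateUpdateQueue() remains a single unpartitioned ChannelMessageQueue of capacity 100.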
interface QueueReader { - suspend fun consume(): Flow + fun consume(): Flow suspend fun poll(): T? } @@ -23,12 +23,11 @@ interface QueueWriter : CloseableCoroutine { interface MessageQueue : QueueReader, QueueWriter -abstract class ChannelMessageQueue : MessageQueue { - open val channel = Channel(Channel.UNLIMITED) +open class ChannelMessageQueue(val channel: Channel) : MessageQueue { private val isClosed = AtomicBoolean(false) override suspend fun publish(message: T) = channel.send(message) - override suspend fun consume(): Flow = channel.receiveAsFlow() + override fun consume(): Flow = channel.receiveAsFlow() override suspend fun poll(): T? = channel.tryReceive().getOrNull() override suspend fun close() { if (isClosed.setOnce()) { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt index c369e8b47b8c3..8641407af121b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt @@ -14,9 +14,9 @@ import kotlinx.coroutines.channels.Channel */ class MultiProducerChannel( producerCount: Long, - override val channel: Channel, + channel: Channel, private val name: String, -) : ChannelMessageQueue() { +) : ChannelMessageQueue(channel = channel) { private val log = KotlinLogging.logger {} private val initializedProducerCount = producerCount private val producerCount = AtomicLong(producerCount) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt new file mode 100644 index 0000000000000..1b49db9eb8a7a --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.message + +import io.airbyte.cdk.load.util.CloseableCoroutine +import kotlinx.coroutines.flow.Flow + +class PartitionedQueue(private val queues: Array>) : CloseableCoroutine { + val partitions = queues.size + + fun consume(partition: Int): Flow { + if (partition < 0 || partition >= queues.size) { + throw IllegalArgumentException("Invalid partition: $partition") + } + return queues[partition].consume() + } + + suspend fun publish(value: T, partition: Int) { + if (partition < 0 || partition >= queues.size) { + throw IllegalArgumentException("Invalid partition: $partition") + } + queues[partition].publish(value) + } + + suspend fun broadcast(value: T) = queues.forEach { it.publish(value) } + + override suspend fun close() { + queues.forEach { it.close() } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt new file mode 100644 index 0000000000000..9152fa9a94b1a --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.message + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.state.CheckpointId + +/** Used internally by the CDK to pass messages between steps in the loader pipeline. 
*/ +sealed interface PipelineEvent + +class PipelineMessage( + val checkpointCounts: Map, + val key: K, + val value: T +) : PipelineEvent + +/** + * We send the end message on the stream and not the key, because there's no way to partition an + * empty message. + */ +class PipelineEndOfStream(val stream: DestinationStream.Descriptor) : + PipelineEvent diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/WithBatchState.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/WithBatchState.kt new file mode 100644 index 0000000000000..1464cb8aaf970 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/WithBatchState.kt @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.message + +/** + * Used internally by the CDK to implement Loaders. It is added to outputs of + * [io.airbyte.cdk.load.pipeline.BatchAccumulator] that can ack or complete record batches. This is + * done *when stitching the dev interface to the pipeline*, so the dev does not have to think about + * internal state. + */ +interface WithBatchState { + val state: Batch.State +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/WithStream.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/WithStream.kt new file mode 100644 index 0000000000000..409bdf8e534aa --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/WithStream.kt @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.message + +import io.airbyte.cdk.load.command.DestinationStream + +/** + * Used internally by the CDK to keep track of streams while still allowing for partitioning on key. + */ +interface WithStream { + val stream: DestinationStream.Descriptor +} + +/** The default key: partitioned by stream. */ +data class StreamKey(override val stream: DestinationStream.Descriptor) : WithStream diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt new file mode 100644 index 0000000000000..fe2f03dcc6f71 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.message.WithStream + +/** + * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs + * should never need to implement this interface. + */ +interface BatchAccumulator { + fun start(key: K, part: Int): S + fun accept(record: T, state: S): Pair + fun finish(state: S): U +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchStateUpdate.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchStateUpdate.kt new file mode 100644 index 0000000000000..79c62336a4732 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchStateUpdate.kt @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
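The generic parameters on BatchAccumulator are not legible in the hunk above; reading them back from the call sites later in this diff, the shape appears to be roughly BatchAccumulator<S, K : WithStream, T, U>. Below is a toy sketch of the start/accept/finish lifecycle under that assumption; the names and the flush threshold are illustrative, and per the KDoc connector devs would not normally implement this interface themselves.

import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.pipeline.BatchAccumulator

// Toy accumulator: buffer lines per key and emit a joined blob as the batch output, either when
// accept() decides the batch is big enough or when the pipeline forces finish().
class ConcatAccumulator<K : WithStream> :
    BatchAccumulator<MutableList<String>, K, String, String> {

    override fun start(key: K, part: Int): MutableList<String> = mutableListOf()

    override fun accept(
        record: String,
        state: MutableList<String>
    ): Pair<MutableList<String>, String?> {
        state.add(record)
        // A non-null second element tells the pipeline step that the batch is complete.
        return if (state.size >= 100) state to finish(state) else state to null
    }

    override fun finish(state: MutableList<String>): String = state.joinToString("\n")
}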
+ */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.state.CheckpointId + +/** Used internally by the CDK to track record ranges to ack. */ +sealed interface BatchUpdate { + val stream: DestinationStream.Descriptor +} + +data class BatchStateUpdate( + override val stream: DestinationStream.Descriptor, + val checkpointCounts: Map, + val state: Batch.State, +) : BatchUpdate + +data class BatchEndOfStream( + override val stream: DestinationStream.Descriptor, +) : BatchUpdate diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipeline.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipeline.kt new file mode 100644 index 0000000000000..8cf5f814c21ad --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipeline.kt @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.write.DirectLoaderFactory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +/** + * Used internally by the CDK to implement the DirectLoader. + * + * Creates a single pipeline step reading from a (possibly partitioned) record stream. Batch updates + * are written to the batchStateUpdateQueue whenever the loader returns + */ +@Singleton +@Requires(bean = DirectLoaderFactory::class) +class DirectLoadPipeline(val pipelineStep: DirectLoadPipelineStep<*>) : + LoadPipeline(listOf(pipelineStep)) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt new file mode 100644 index 0000000000000..c611e103458ce --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.state.Reserved +import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask +import io.airbyte.cdk.load.write.DirectLoader +import io.airbyte.cdk.load.write.DirectLoaderFactory +import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import jakarta.inject.Named +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = DirectLoaderFactory::class) +class DirectLoadPipelineStep( + val accumulator: DirectLoadRecordAccumulator, + @Named("recordQueue") + val inputQueue: + PartitionedQueue>>, + @Named("batchStateUpdateQueue") val batchQueue: QueueWriter, + @Value("\${airbyte.destination.core.record-batch-size-override:null}") + val batchSizeOverride: Long? 
= null, + val directLoaderFactory: DirectLoaderFactory, +) : LoadPipelineStep { + private val log = KotlinLogging.logger {} + + override val numWorkers: Int = directLoaderFactory.inputPartitions + + override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> { + log.info { "Creating DirectLoad pipeline step task for partition $partition" } + return LoadPipelineStepTask( + accumulator, + inputQueue.consume(partition), + batchUpdateQueue = batchQueue, + outputPartitioner = null, + outputQueue = null as PartitionedQueue>?, + batchSizeOverride?.let { RecordCountFlushStrategy(it) }, + partition + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt new file mode 100644 index 0000000000000..e1f964919da8c --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.WithBatchState +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.write.DirectLoader +import io.airbyte.cdk.load.write.DirectLoader.* +import io.airbyte.cdk.load.write.DirectLoaderFactory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState + +/** + * Used internally by the CDK to wrap the client-provided DirectLoader in a generic + * BatchAccumulator, so that it can be used as a pipeline step. At this stage, the loader's public + * interface is mapped to the internal interface, hiding internal mechanics. + */ +@Singleton +@Requires(bean = DirectLoaderFactory::class) +class DirectLoadRecordAccumulator( + val directLoaderFactory: DirectLoaderFactory +) : BatchAccumulator { + override fun start(key: K, part: Int): S { + return directLoaderFactory.create(key.stream, part) + } + + override fun accept( + record: DestinationRecordAirbyteValue, + state: S + ): Pair { + state.accept(record).let { + return when (it) { + is Incomplete -> Pair(state, null) + is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE)) + } + } + } + + override fun finish(state: S): DirectLoadAccResult { + state.finish() + return DirectLoadAccResult(Batch.State.COMPLETE) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt new file mode 100644 index 0000000000000..19189e7d33a44 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.micronaut.context.annotation.Secondary +import jakarta.inject.Singleton +import kotlin.math.abs + +/** + * A dev interface for expressing how incoming data is partitioned. By default, data will be + * partitioned by a hash of the stream name and namespace. 
+ */ +interface InputPartitioner { + fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int +} + +@Singleton +@Secondary +class ByStreamInputPartitioner : InputPartitioner { + override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int { + return abs(record.stream.hashCode()) % numParts + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt new file mode 100644 index 0000000000000..44aad5ad96945 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.task.Task +import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask + +interface LoadPipelineStep { + val numWorkers: Int + fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> +} + +/** + * Used internally by the pipeline to assemble a launcher for any loader's pipeline. CDK devs can + * use this to implement new flavors of interface. Connector devs should generally avoid using this. + */ +abstract class LoadPipeline( + private val steps: List, +) { + suspend fun start(launcher: suspend (Task) -> Unit) { + steps.forEach { step -> repeat(step.numWorkers) { launcher(step.taskForPartition(it)) } } + } + + /** For closing intermediate queues or other resources. */ + open suspend fun stop() {} +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/OutputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/OutputPartitioner.kt new file mode 100644 index 0000000000000..af9f049a160e5 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/OutputPartitioner.kt @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.message.WithStream + +/** + * Used internally by the CDK to determine how to partition data passed between steps. The dev + * should not implement this directly, but via specialized child classes provided for each loader + * type. + */ +interface OutputPartitioner { + fun getOutputKey(inputKey: K1, output: U): K2 + fun getPart(outputKey: K2, numParts: Int): Int +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/PipelineFlushStrategy.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/PipelineFlushStrategy.kt new file mode 100644 index 0000000000000..f69a1f9a6096f --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/PipelineFlushStrategy.kt @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +interface PipelineFlushStrategy { + fun shouldFlush( + inputCount: Long, + ): Boolean +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RecordCountFlushStrategy.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RecordCountFlushStrategy.kt new file mode 100644 index 0000000000000..6284fb32adaf3 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RecordCountFlushStrategy.kt @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
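Since ByStreamInputPartitioner pins every record of a stream to a single partition, a connector that wants intra-stream parallelism can supply its own partitioner. A sketch under that assumption follows; RoundRobinInputPartitioner is a made-up name, it trades away the per-stream ordering the default preserves, and because the default is @Secondary a connector-supplied @Singleton should take precedence.

import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.pipeline.InputPartitioner
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicLong

// Spreads records evenly across all partitions, regardless of stream.
@Singleton
class RoundRobinInputPartitioner : InputPartitioner {
    private val counter = AtomicLong(0)

    override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int =
        (counter.getAndIncrement() % numParts).toInt()
}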
+ */ + +package io.airbyte.cdk.load.pipeline + +class RecordCountFlushStrategy(private val recordCount: Long) : PipelineFlushStrategy { + override fun shouldFlush(inputCount: Long): Boolean { + return inputCount >= recordCount + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt index 4b119edeb1aa0..cbe51c3d6e77b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt @@ -13,7 +13,7 @@ import io.airbyte.cdk.load.util.use import io.airbyte.protocol.models.v0.AirbyteMessage import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary -import io.micronaut.context.annotation.Value +import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue @@ -290,11 +290,13 @@ class DefaultCheckpointManager( override val syncManager: SyncManager, override val outputConsumer: suspend (Reserved) -> Unit, override val timeProvider: TimeProvider, - @Value("\${airbyte.destination.core.checkpoint-by-id:false}") - override val checkpointById: Boolean = false + @Named("checkpointById") override val checkpointById: Boolean = false, ) : StreamsCheckpointManager>() { + private val log = KotlinLogging.logger {} + init { lastFlushTimeMs.set(timeProvider.currentTimeMillis()) + log.info { "Checkpoint manager initialized with checkpointById: $checkpointById" } } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt index dfe0abb408260..fb12001bfc85c 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt @@ -14,7 +14,6 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong import kotlinx.coroutines.CompletableDeferred @@ -100,7 +99,7 @@ interface StreamManager { * * This will be incremented each time `markCheckpoint` is called. */ - fun getNextCheckpointId(): CheckpointId + fun getCurrentCheckpointId(): CheckpointId /** Update the counts of persisted for a given checkpoint. 
*/ fun incrementPersistedCount(checkpointId: CheckpointId, count: Long) @@ -144,8 +143,10 @@ class DefaultStreamManager( val recordsPersisted: AtomicLong = AtomicLong(0L), val recordsCompleted: AtomicLong = AtomicLong(0L), ) + private val nextCheckpointId = AtomicLong(0L) private val lastCheckpointRecordIndex = AtomicLong(0L) - private val checkpointCounts: ConcurrentLinkedQueue = ConcurrentLinkedQueue() + private val checkpointCounts: ConcurrentHashMap = + ConcurrentHashMap() init { Batch.State.entries.forEach { rangesState[it] = TreeRangeSet.create() } @@ -183,7 +184,15 @@ class DefaultStreamManager( override fun markCheckpoint(): Pair { val recordIndex = recordCount.get() val count = recordIndex - lastCheckpointRecordIndex.getAndSet(recordIndex) - checkpointCounts.add(CheckpointCounts(count)) + + val checkpointId = CheckpointId(nextCheckpointId.getAndIncrement().toInt()) + checkpointCounts.merge(checkpointId, CheckpointCounts(recordsRead = count)) { old, _ -> + if (old.recordsRead > 0) { + throw IllegalStateException("Checkpoint $old already exists") + } + old.copy(recordsRead = count) + } + return Pair(recordIndex, count) } @@ -332,39 +341,32 @@ class DefaultStreamManager( return streamResult.isActive } - override fun getNextCheckpointId(): CheckpointId { - return CheckpointId(checkpointCounts.size) + override fun getCurrentCheckpointId(): CheckpointId { + return CheckpointId(nextCheckpointId.get().toInt()) } override fun incrementPersistedCount(checkpointId: CheckpointId, count: Long) { - checkpointCounts.elementAtOrNull(checkpointId.id)?.let { - val result = it.recordsPersisted.addAndGet(count) - if (result > it.recordsRead) { - throw IllegalStateException( - "Persisted count $result for $checkpointId exceeds read count ${it.recordsRead}" - ) - } - } - ?: throw IllegalStateException("No checkpoint found for $checkpointId") + checkpointCounts + .getOrPut(checkpointId) { CheckpointCounts() } + .recordsPersisted + .addAndGet(count) } override fun incrementCompletedCount(checkpointId: CheckpointId, count: Long) { - checkpointCounts.elementAtOrNull(checkpointId.id)?.let { - val result = it.recordsCompleted.addAndGet(count) - if (result > it.recordsRead) { - throw IllegalStateException( - "Completed count $result for $checkpointId exceeds read count ${it.recordsRead}" - ) - } - } - ?: throw IllegalStateException("No checkpoint found for $checkpointId") + checkpointCounts + .getOrPut(checkpointId) { CheckpointCounts() } + .recordsCompleted + .addAndGet(count) } override fun areRecordsPersistedUntilCheckpoint(checkpointId: CheckpointId): Boolean { + val counts = checkpointCounts.filter { it.key.id <= checkpointId.id } + if (counts.size < checkpointId.id + 1) { + return false + } + val (readCount, persistedCount, completedCount) = - checkpointCounts.take(checkpointId.id + 1).fold(Triple(0L, 0L, 0L)) { - acc, - checkpointCount -> + counts.toList().fold(Triple(0L, 0L, 0L)) { acc, (_, checkpointCount) -> Triple( acc.first + checkpointCount.recordsRead, acc.second + checkpointCount.recordsPersisted.get(), @@ -388,7 +390,7 @@ class DefaultStreamManager( return true } - val completedCount = checkpointCounts.sumOf { it.recordsCompleted.get() } + val completedCount = checkpointCounts.values.sumOf { it.recordsCompleted.get() } return completedCount == readCount } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 3afcdcaf16c08..9d67c39e4ebcb 
100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -9,11 +9,20 @@ import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.ChannelMessageQueue import io.airbyte.cdk.load.message.CheckpointMessageWrapped +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.pipeline.InputPartitioner +import io.airbyte.cdk.load.pipeline.LoadPipeline import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory @@ -31,6 +40,7 @@ import io.airbyte.cdk.load.task.internal.FlushTickTask import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory +import io.airbyte.cdk.load.task.internal.UpdateBatchStateTaskFactory import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask import io.airbyte.cdk.load.util.setOnce import io.github.oshai.kotlinlogging.KotlinLogging @@ -48,6 +58,7 @@ import kotlinx.coroutines.sync.withLock interface DestinationTaskLauncher : TaskLauncher { suspend fun handleSetupComplete() suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>) + suspend fun handleStreamComplete(stream: DestinationStream.Descriptor) suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) suspend fun handleTeardownComplete(success: Boolean = true) suspend fun handleException(e: Exception) @@ -90,7 +101,7 @@ interface DestinationTaskLauncher : TaskLauncher { "NP_NONNULL_PARAM_VIOLATION", justification = "arguments are guaranteed to be non-null by Kotlin's type system" ) -class DefaultDestinationTaskLauncher( +class DefaultDestinationTaskLauncher( private val taskScopeProvider: TaskScopeProvider, private val catalog: DestinationCatalog, private val config: DestinationConfiguration, @@ -129,7 +140,16 @@ class DefaultDestinationTaskLauncher( private val checkpointQueue: QueueWriter>, @Named("fileMessageQueue") private val fileTransferQueue: MessageQueue, - @Named("openStreamQueue") private val openStreamQueue: MessageQueue + @Named("openStreamQueue") private val openStreamQueue: MessageQueue, + + // New interface shim + @Named("recordQueue") + private val recordQueueForPipeline: + PartitionedQueue>>, + @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue, + private val loadPipeline: LoadPipeline?, + private val partitioner: InputPartitioner, + private val updateBatchTaskFactory: UpdateBatchStateTaskFactory, ) : DestinationTaskLauncher { private val log = KotlinLogging.logger {} @@ -187,6 +207,10 @@ class DefaultDestinationTaskLauncher( checkpointQueue = checkpointQueue, 
fileTransferQueue = fileTransferQueue, destinationTaskLauncher = this, + recordQueueForPipeline = recordQueueForPipeline, + loadPipeline = loadPipeline, + partitioner = partitioner, + openStreamQueue = openStreamQueue, ) launch(inputConsumerTask) @@ -200,48 +224,59 @@ class DefaultDestinationTaskLauncher( launch(openStreamTaskFactory.make()) } - // TODO: pluggable file transfer - if (!fileTransferEnabled) { - // Start a spill-to-disk task for each record stream - catalog.streams.forEach { stream -> - log.info { "Starting spill-to-disk task for $stream" } - val spillTask = spillToDiskTaskFactory.make(this, stream.descriptor) - launch(spillTask) - } + if (loadPipeline != null) { + log.info { "Setting up load pipeline" } + loadPipeline.start { launch(it) } + log.info { "Launching update batch task" } + val updateBatchTask = updateBatchTaskFactory.make(this) + launch(updateBatchTask) + } else { + // TODO: pluggable file transfer + if (!fileTransferEnabled) { + // Start a spill-to-disk task for each record stream + catalog.streams.forEach { stream -> + log.info { "Starting spill-to-disk task for $stream" } + val spillTask = spillToDiskTaskFactory.make(this, stream.descriptor) + launch(spillTask) + } - repeat(config.numProcessRecordsWorkers) { - log.info { "Launching process records task $it" } - val task = processRecordsTaskFactory.make(this) - launch(task) - } + repeat(config.numProcessRecordsWorkers) { + log.info { "Launching process records task $it" } + val task = processRecordsTaskFactory.make(this) + launch(task) + } - repeat(config.numProcessBatchWorkers) { - log.info { "Launching process batch task $it" } - val task = processBatchTaskFactory.make(this) - launch(task) - } - } else { - repeat(config.numProcessRecordsWorkers) { - log.info { "Launching process file task $it" } - launch(processFileTaskFactory.make(this)) - } + repeat(config.numProcessBatchWorkers) { + log.info { "Launching process batch task $it" } + val task = processBatchTaskFactory.make(this) + launch(task) + } + } else { + repeat(config.numProcessRecordsWorkers) { + log.info { "Launching process file task $it" } + launch(processFileTaskFactory.make(this)) + } - repeat(config.numProcessBatchWorkersForFileTransfer) { - log.info { "Launching process batch task $it" } - val task = processBatchTaskFactory.make(this) - launch(task) + repeat(config.numProcessBatchWorkersForFileTransfer) { + log.info { "Launching process batch task $it" } + val task = processBatchTaskFactory.make(this) + launch(task) + } } + // Start flush task + log.info { "Starting timed file aggregate flush task " } + launch(flushTickTask) } - // Start flush task - log.info { "Starting timed file aggregate flush task " } - launch(flushTickTask) - log.info { "Starting checkpoint update task" } launch(updateCheckpointsTask) // Await completion - if (succeeded.receive()) { + val result = succeeded.receive() + openStreamQueue.close() + recordQueueForPipeline.close() + batchUpdateQueue.close() + if (result) { taskScopeProvider.close() } else { taskScopeProvider.kill() @@ -249,9 +284,12 @@ class DefaultDestinationTaskLauncher( } override suspend fun handleSetupComplete() { - log.info { "Setup task complete, opening streams" } - catalog.streams.forEach { openStreamQueue.publish(it) } - openStreamQueue.close() + if (loadPipeline == null) { + log.info { "Setup task complete, opening streams" } + catalog.streams.forEach { openStreamQueue.publish(it) } + log.info { "Closing open stream queue" } + openStreamQueue.close() + } } /** @@ -274,19 +312,24 @@ class 
DefaultDestinationTaskLauncher( } if (streamManager.isBatchProcessingComplete()) { - if (closeStreamHasRun.getOrPut(stream) { AtomicBoolean(false) }.setOnce()) { - log.info { "Batch processing complete: Starting close stream task for $stream" } - val task = closeStreamTaskFactory.make(this, stream) - launch(task) - } else { - log.info { "Close stream task has already run, skipping." } - } + handleStreamComplete(stream) } else { log.info { "Batch processing not complete: nothing else to do." } } } } + override suspend fun handleStreamComplete(stream: DestinationStream.Descriptor) { + log.info { "Processing complete for $stream" } + if (closeStreamHasRun.getOrPut(stream) { AtomicBoolean(false) }.setOnce()) { + log.info { "Batch processing complete: Starting close stream task for $stream" } + val task = closeStreamTaskFactory.make(this, stream) + launch(task) + } else { + log.info { "Close stream task has already run, skipping." } + } + } + /** Called when a stream is closed. */ override suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) { if (teardownIsEnqueued.setOnce()) { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index 098aeaba55fd5..0f149c96e890b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -15,6 +15,7 @@ import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationFileStreamComplete import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationRecordStreamComplete import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage @@ -23,13 +24,20 @@ import io.airbyte.cdk.load.message.GlobalCheckpoint import io.airbyte.cdk.load.message.GlobalCheckpointWrapped import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEndOfStream +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.PipelineMessage import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.message.StreamCheckpoint import io.airbyte.cdk.load.message.StreamCheckpointWrapped import io.airbyte.cdk.load.message.StreamEndEvent +import io.airbyte.cdk.load.message.StreamKey import io.airbyte.cdk.load.message.StreamRecordEvent import io.airbyte.cdk.load.message.Undefined +import io.airbyte.cdk.load.pipeline.InputPartitioner +import io.airbyte.cdk.load.pipeline.LoadPipeline import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher @@ -42,6 +50,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton +import java.util.concurrent.ConcurrentHashMap interface InputConsumerTask : Task @@ -62,16 +71,26 @@ class DefaultInputConsumerTask( private val inputFlow: ReservingDeserializingInputFlow, 
private val recordQueueSupplier: MessageQueueSupplier>, - private val checkpointQueue: QueueWriter>, + val checkpointQueue: QueueWriter>, private val syncManager: SyncManager, private val destinationTaskLauncher: DestinationTaskLauncher, @Named("fileMessageQueue") private val fileTransferQueue: MessageQueue, + + // Required by new interface + @Named("recordQueue") + private val recordQueueForPipeline: + PartitionedQueue>>, + private val loadPipeline: LoadPipeline? = null, + private val partitioner: InputPartitioner, + private val openStreamQueue: QueueWriter ) : InputConsumerTask { private val log = KotlinLogging.logger {} override val terminalCondition: TerminalCondition = OnSyncFailureOnly + private val unopenedStreams = ConcurrentHashMap(catalog.streams.associateBy { it.descriptor }) + private suspend fun handleRecord( reserved: Reserved, sizeBytes: Long @@ -123,6 +142,65 @@ class DefaultInputConsumerTask( } } + private suspend fun handleRecordForPipeline( + reserved: Reserved, + ) { + val stream = reserved.value.stream + unopenedStreams.remove(stream)?.let { + log.info { "Saw first record for stream $stream; initializing" } + // Note, since we're not spilling to disk, there is nothing to do with + // any records before initialization is complete, so we'll wait here + // for it to finish. + openStreamQueue.publish(it) + syncManager.getOrAwaitStreamLoader(stream) + log.info { "Initialization for stream $stream complete" } + } + val manager = syncManager.getStreamManager(stream) + when (val message = reserved.value) { + is DestinationRecord -> { + val record = message.asRecordMarshaledToAirbyteValue() + manager.incrementReadCount() + val pipelineMessage = + PipelineMessage( + mapOf(manager.getCurrentCheckpointId() to 1), + StreamKey(stream), + record + ) + val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions) + recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition) + } + is DestinationRecordStreamComplete -> { + manager.markEndOfStream(true) + log.info { "Read COMPLETE for stream $stream" } + recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream))) + reserved.release() + } + is DestinationRecordStreamIncomplete -> { + manager.markEndOfStream(false) + log.info { "Read INCOMPLETE for stream $stream" } + recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream))) + reserved.release() + } + is DestinationFile -> { + val index = manager.incrementReadCount() + // destinationTaskLauncher.handleFile(stream, message, index) + fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index)) + } + is DestinationFileStreamComplete -> { + reserved.release() // safe because multiple calls conflate + manager.markEndOfStream(true) + val envelope = + BatchEnvelope( + SimpleBatch(Batch.State.COMPLETE), + streamDescriptor = message.stream, + ) + destinationTaskLauncher.handleNewBatch(stream, envelope) + } + is DestinationFileStreamIncomplete -> + throw IllegalStateException("File stream $stream failed upstream, cannot continue.") + } + } + private suspend fun handleCheckpoint( reservation: Reserved, sizeBytes: Long @@ -135,12 +213,19 @@ class DefaultInputConsumerTask( is StreamCheckpoint -> { val stream = checkpoint.checkpoint.stream val manager = syncManager.getStreamManager(stream) + val checkpointId = manager.getCurrentCheckpointId() val (currentIndex, countSinceLast) = manager.markCheckpoint() + val indexOrId = + if (loadPipeline == null) { + currentIndex + } else { + checkpointId.id.toLong() + } 
val messageWithCount = checkpoint.withDestinationStats(CheckpointMessage.Stats(countSinceLast)) checkpointQueue.publish( reservation.replace( - StreamCheckpointWrapped(sizeBytes, stream, currentIndex, messageWithCount) + StreamCheckpointWrapped(sizeBytes, stream, indexOrId, messageWithCount) ) ) } @@ -153,8 +238,15 @@ class DefaultInputConsumerTask( val streamWithIndexAndCount = catalog.streams.map { stream -> val manager = syncManager.getStreamManager(stream.descriptor) + val checkpointId = manager.getCurrentCheckpointId() val (currentIndex, countSinceLast) = manager.markCheckpoint() - Triple(stream, currentIndex, countSinceLast) + val indexOrId = + if (loadPipeline == null) { + currentIndex + } else { + checkpointId.id.toLong() + } + Triple(stream, indexOrId, countSinceLast) } val totalCount = streamWithIndexAndCount.sumOf { it.third } val messageWithCount = @@ -181,8 +273,13 @@ class DefaultInputConsumerTask( inputFlow.collect { (sizeBytes, reserved) -> when (val message = reserved.value) { /* If the input message represents a record. */ - is DestinationStreamAffinedMessage -> - handleRecord(reserved.replace(message), sizeBytes) + is DestinationStreamAffinedMessage -> { + if (loadPipeline != null) { + handleRecordForPipeline(reserved.replace(message)) + } else { + handleRecord(reserved.replace(message), sizeBytes) + } + } is CheckpointMessage -> handleCheckpoint(reserved.replace(message), sizeBytes) is Undefined -> { @@ -196,6 +293,7 @@ class DefaultInputConsumerTask( log.info { "Closing record queues" } catalog.streams.forEach { recordQueueSupplier.get(it.descriptor).close() } fileTransferQueue.close() + recordQueueForPipeline.close() } } } @@ -208,14 +306,22 @@ interface InputConsumerTaskFactory { MessageQueueSupplier>, checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, - fileTransferQueue: MessageQueue + fileTransferQueue: MessageQueue, + + // Required by new interface + recordQueueForPipeline: + PartitionedQueue>>, + loadPipeline: LoadPipeline?, + partitioner: InputPartitioner, + openStreamQueue: QueueWriter, ): InputConsumerTask } @Singleton @Secondary -class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : - InputConsumerTaskFactory { +class DefaultInputConsumerTaskFactory( + private val syncManager: SyncManager, +) : InputConsumerTaskFactory { override fun make( catalog: DestinationCatalog, inputFlow: ReservingDeserializingInputFlow, @@ -224,6 +330,13 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, fileTransferQueue: MessageQueue, + + // Required by new interface + recordQueueForPipeline: + PartitionedQueue>>, + loadPipeline: LoadPipeline?, + partitioner: InputPartitioner, + openStreamQueue: QueueWriter, ): InputConsumerTask { return DefaultInputConsumerTask( catalog, @@ -233,6 +346,12 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : syncManager, destinationTaskLauncher, fileTransferQueue, + + // Required by new interface + recordQueueForPipeline, + loadPipeline, + partitioner, + openStreamQueue, ) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt new file mode 100644 index 0000000000000..3139a44f4f89c --- /dev/null +++ 
b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.task.internal + +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEndOfStream +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.PipelineMessage +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.WithBatchState +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.pipeline.BatchAccumulator +import io.airbyte.cdk.load.pipeline.BatchEndOfStream +import io.airbyte.cdk.load.pipeline.BatchStateUpdate +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.pipeline.OutputPartitioner +import io.airbyte.cdk.load.pipeline.PipelineFlushStrategy +import io.airbyte.cdk.load.state.CheckpointId +import io.airbyte.cdk.load.state.Reserved +import io.airbyte.cdk.load.task.OnEndOfSync +import io.airbyte.cdk.load.task.Task +import io.airbyte.cdk.load.task.TerminalCondition +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.fold + +data class RangeState( + val state: S, + val checkpointCounts: MutableMap = mutableMapOf(), + val inputCount: Long = 0, +) + +/** A long-running task that actually implements a load pipeline step. */ +class LoadPipelineStepTask( + private val batchAccumulator: BatchAccumulator, + private val inputFlow: Flow>>, + private val batchUpdateQueue: QueueWriter, + private val outputPartitioner: OutputPartitioner?, + private val outputQueue: PartitionedQueue>?, + private val flushStrategy: PipelineFlushStrategy?, + private val part: Int, +) : Task { + override val terminalCondition: TerminalCondition = OnEndOfSync + + override suspend fun execute() { + inputFlow.fold(mutableMapOf>()) { stateStore, reservation -> + try { + when (val input = reservation.value) { + is PipelineMessage -> { + // Fetch and update the local state associated with the current batch. + val state = + stateStore + .getOrPut(input.key) { + RangeState( + batchAccumulator.start(input.key, part), + ) + } + .let { it.copy(inputCount = it.inputCount + 1) } + val (newState, output) = + batchAccumulator.accept( + input.value, + state.state, + ) + reservation.release() // TODO: Accumulate and release when persisted + input.checkpointCounts.forEach { + state.checkpointCounts.merge(it.key, it.value) { old, new -> old + new } + } + + // If the accumulator did not produce a result, check if we should flush. + // If so, use the result of a finish call as the output. + val finalOutput = + output + ?: if (flushStrategy?.shouldFlush(state.inputCount) == true) { + batchAccumulator.finish(newState) + } else { + null + } + + if (finalOutput != null) { + // Publish the emitted output and evict the state. + handleOutput(input.key, state.checkpointCounts, finalOutput) + stateStore.remove(input.key) + } else { + // If there's no output yet, just update the local state. 
+ stateStore[input.key] = RangeState(newState, state.checkpointCounts) + } + stateStore + } + is PipelineEndOfStream -> { + // Give any key associated with the stream a chance to finish + val keysToRemove = stateStore.keys.filter { it.stream == input.stream } + keysToRemove.forEach { key -> + stateStore.remove(key)?.let { stored -> + val output = batchAccumulator.finish(stored.state) + handleOutput(key, stored.checkpointCounts, output) + } + } + + outputQueue?.broadcast(PipelineEndOfStream(input.stream)) + batchUpdateQueue.publish(BatchEndOfStream(input.stream)) + + stateStore + } + } + } catch (t: Throwable) { + // Close the local state associated with the current batch. + stateStore.values + .map { runCatching { it.state.close() } } + .forEach { it.getOrThrow() } + throw t + } + } + } + + private suspend fun handleOutput( + inputKey: K1, + checkpointCounts: Map, + output: U + ) { + + // Only publish the output if there's a next step. + outputQueue?.let { + val outputKey = outputPartitioner!!.getOutputKey(inputKey, output) + val message = PipelineMessage(checkpointCounts, outputKey, output) + val outputPart = outputPartitioner.getPart(outputKey, it.partitions) + it.publish(message, outputPart) + } + + // If the output contained a global batch state, publish an update. + if (output is WithBatchState && output.state.isPersisted()) { + val update = + BatchStateUpdate( + stream = inputKey.stream, + checkpointCounts = checkpointCounts, + state = output.state + ) + batchUpdateQueue.publish(update) + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateBatchStateTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateBatchStateTask.kt new file mode 100644 index 0000000000000..32adf4c15c7bd --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateBatchStateTask.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.task.internal + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.QueueReader +import io.airbyte.cdk.load.pipeline.BatchStateUpdate +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.state.CheckpointManager +import io.airbyte.cdk.load.state.SyncManager +import io.airbyte.cdk.load.task.DestinationTaskLauncher +import io.airbyte.cdk.load.task.OnEndOfSync +import io.airbyte.cdk.load.task.Task +import io.github.oshai.kotlinlogging.KotlinLogging +import jakarta.inject.Named +import jakarta.inject.Singleton + +/** A long-running task that updates the state of record batches after they are processed. 
*/ +class UpdateBatchStateTask( + private val inputQueue: QueueReader, + private val syncManager: SyncManager, + private val checkpointManager: CheckpointManager, + private val launcher: DestinationTaskLauncher +) : Task { + private val log = KotlinLogging.logger {} + + override val terminalCondition = OnEndOfSync + + override suspend fun execute() { + inputQueue.consume().collect { message -> + val manager = syncManager.getStreamManager(message.stream) + if (message is BatchStateUpdate) { + when (message.state) { + Batch.State.COMPLETE -> { + message.checkpointCounts.forEach { + manager.incrementCompletedCount( + it.key, + it.value, + ) + } + } + Batch.State.PERSISTED -> { + message.checkpointCounts.forEach { + manager.incrementPersistedCount( + it.key, + it.value, + ) + } + } + else -> return@collect + } + } + checkpointManager.flushReadyCheckpointMessages() + if (manager.isBatchProcessingCompleteForCheckpoints()) { + log.info { "Batch processing complete for ${message.stream}" } + launcher.handleStreamComplete(message.stream) + } else { + log.info { "Batch processing still incomplete for ${message.stream}" } + } + } + } +} + +@Singleton +class UpdateBatchStateTaskFactory( + @Named("batchStateUpdateQueue") val inputQueue: QueueReader, + private val syncManager: SyncManager, + private val checkpointManager: CheckpointManager +) { + fun make(launcher: DestinationTaskLauncher): Task { + return UpdateBatchStateTask(inputQueue, syncManager, checkpointManager, launcher) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt new file mode 100644 index 0000000000000..3b759b7e21f80 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue + +/** + * [DirectLoader] is for the use case where records are loaded directly into the destination or + * staged in chunks with a 3rd party library (eg, Iceberg) + * + * One direct loader will be created per batch of records per stream (optionally: and per part). It + * will be discarded at the end of the batch (defined below). + * + * A batch is a series of records that are loaded together, processed in the order they were + * received. Its end is user-defined, or forced at the end of the stream (or if the CDK determines + * the work needs to be flushed due to resource constraints, etc). From the implementor's POV, the + * end is when any staged work is forwarded, after which it is safe for the CDK to acknowledge to + * the Airbyte platform that the records have been handled. (Specifically, even after a sync failure + * these records might not be replayed, though some may be.) If records are being pushed + * incrementally, then there is no need to signal the end of a batch. + * + * To enable, declare a `@Singleton` inheriting from [DirectLoaderFactory] in your connector and set + * the value `airbyte.destination.load-strategy` to `direct` in your + * `src/main/resources/application-destination.yaml` + * + * [accept] will be called once per record until it returns [Complete]. If end-of-stream is reached + * before [accept] returns [Complete], [finish] will be called. 
[finish] might also be called at + * other times by the CDK to force work to be flushed and start a new batch. Implementors should + * always forward whatever work is in progress in [finish], as [accept] will not be called again for + * the same batch. + * + * [close] will be called once at the end of the batch, after the last call to [accept] or [finish], + * or if the sync fails. Afterward the loader will be discarded and a new one will be created for + * the next batch if more data arrives. (Note: close should only be used to do cleanup that must + * happen even if the sync fails; it should not be used to forward work.) + * + * By default, there is one part per stream, but this can be changed by setting + * [DirectLoaderFactory.inputPartitions] to a number greater than 1. Specifically, up to + * `numWorkers` DirectLoaders will be created per stream, and each will handle a specific subset of + * records concurrently, and each subset will be processed in order of receipt. + * + * By default, the work is partitioned by stream (ie, even with 2 parts, only one batch for Stream A + * will ever be in progress at a time, so increased concurrency will only help if streams are + * interleaved). To distribute the work differently, implement + * [io.airbyte.cdk.load.pipeline.InputPartitioner]. + */ +interface DirectLoader : AutoCloseable { + sealed interface DirectLoadResult + data object Incomplete : DirectLoadResult + data object Complete : DirectLoadResult + + /** + * Called once per record until it returns [Complete], after which [close] is called, the loader + * is discarded, and the records are considered processed by the platform. + */ + fun accept(record: DestinationRecordAirbyteValue): DirectLoadResult + + /** + * Called by the CDK to force work to finish. It will only be called if the last call to + * [accept] did not return [Complete]. After which [close] is called, the loader is discarded, + * and the records are considered processed by the platform. + */ + fun finish() +} + +interface DirectLoaderFactory : LoadStrategy { + fun create(streamDescriptor: DestinationStream.Descriptor, part: Int): T +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/LoadStrategy.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/LoadStrategy.kt new file mode 100644 index 0000000000000..21e522960873c --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/LoadStrategy.kt @@ -0,0 +1,10 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
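As a companion to the KDoc above, a minimal sketch of a connector-side implementation. BufferingDirectLoader and its factory are hypothetical names, and the generic bound on DirectLoaderFactory is inferred from its create return type; per the KDoc, the connector would also set airbyte.destination.load-strategy to direct in application-destination.yaml.

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.cdk.load.write.DirectLoaderFactory
import jakarta.inject.Singleton

// Hypothetical loader: buffers records and flushes every maxRecords, returning Complete
// to end the batch (after which the CDK may acknowledge the covered checkpoints).
class BufferingDirectLoader(private val maxRecords: Int) : DirectLoader {
    private val buffer = mutableListOf<DestinationRecordAirbyteValue>()

    override fun accept(record: DestinationRecordAirbyteValue): DirectLoader.DirectLoadResult {
        buffer.add(record)
        if (buffer.size < maxRecords) {
            return DirectLoader.Incomplete
        }
        finish()
        return DirectLoader.Complete
    }

    override fun finish() {
        // Forward everything buffered to the destination here; once this returns, the
        // CDK is free to treat these records as handled.
        buffer.clear()
    }

    override fun close() {
        // Cleanup only (connections, temp files); never forward work from close().
    }
}

@Singleton
class BufferingDirectLoaderFactory : DirectLoaderFactory<BufferingDirectLoader> {
    // Optional: defaults to 1 via LoadStrategy; values above 1 assume the chosen
    // InputPartitioner preserves whatever ordering the destination needs.
    override val inputPartitions: Int = 2

    override fun create(
        streamDescriptor: DestinationStream.Descriptor,
        part: Int
    ): BufferingDirectLoader = BufferingDirectLoader(maxRecords = 10_000)
}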
+ */ + +package io.airbyte.cdk.load.write + +interface LoadStrategy { + val inputPartitions: Int + get() = 1 +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt index f45d3b67c81dc..fb721b68d1d91 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt @@ -49,6 +49,7 @@ interface StreamLoader : BatchAccumulator, FileBatchAccumulator { val stream: DestinationStream suspend fun start() {} + suspend fun createBatchAccumulator(): BatchAccumulator = this suspend fun createFileBatchAccumulator( outputQueue: MultiProducerChannel>, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt new file mode 100644 index 0000000000000..4292a39c2e238 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write + +import io.airbyte.cdk.load.command.DestinationStream +import jakarta.inject.Singleton +import java.util.concurrent.ConcurrentHashMap + +/** + * Can be used by the dev connector to pass state between different parts of the connector. To use + * it is sufficient to inject a StreamStateStore of any type into any component. The expected use is + * for making state generated during initialization generally available to the Loaders. + */ +@Singleton +class StreamStateStore { + private val store = ConcurrentHashMap() + + fun put(stream: DestinationStream.Descriptor, state: S) { + store[stream] = state + } + + fun get(stream: DestinationStream.Descriptor): S? 
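A hedged sketch of the intended usage (hypothetical names; assumes the store is generic in the state type, as its put/get signatures suggest). The S3 data lake changes below follow the same pattern: the stream loader puts its table/schema state and the direct-loader factory reads it back.

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.write.StreamStateStore
import jakarta.inject.Singleton

// Hypothetical per-stream state produced while opening the stream.
data class SketchStreamState(val tableName: String)

// The component that initializes the stream stores the state...
@Singleton
class SketchStreamOpener(private val stateStore: StreamStateStore<SketchStreamState>) {
    fun onStreamStarted(stream: DestinationStream) {
        stateStore.put(stream.descriptor, SketchStreamState(tableName = "my_table"))
    }
}

// ...and the loader-side factory reads it back when creating loaders for that stream.
@Singleton
class SketchLoaderFactory(private val stateStore: StreamStateStore<SketchStreamState>) {
    fun tableFor(descriptor: DestinationStream.Descriptor): String =
        stateStore.get(descriptor)?.tableName
            ?: error("Stream was not initialized: $descriptor")
}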
{ + return store[stream] + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/CheckpointManagerUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/CheckpointManagerUTest.kt index 7a2a3c1844673..5390c69741738 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/CheckpointManagerUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/CheckpointManagerUTest.kt @@ -61,7 +61,7 @@ class CheckpointManagerUTest { syncManager, outputConsumer, timeProvider, - checkpointById = checkpointById + checkpointById ) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt index c01bad7dc3e39..99681ec65c369 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt @@ -18,7 +18,7 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments @@ -31,8 +31,8 @@ class StreamManagerTest { val manager1 = DefaultStreamManager(stream1) val manager2 = DefaultStreamManager(stream2) - Assertions.assertEquals(0, manager1.getNextCheckpointId().id) - Assertions.assertEquals(0, manager2.getNextCheckpointId().id) + Assertions.assertEquals(0, manager1.getCurrentCheckpointId().id) + Assertions.assertEquals(0, manager2.getCurrentCheckpointId().id) // Incrementing once yields (n, n) repeat(10) { manager1.incrementReadCount() } @@ -41,8 +41,8 @@ class StreamManagerTest { Assertions.assertEquals(10, index) Assertions.assertEquals(10, count) - Assertions.assertEquals(1, manager1.getNextCheckpointId().id) - Assertions.assertEquals(0, manager2.getNextCheckpointId().id) + Assertions.assertEquals(1, manager1.getCurrentCheckpointId().id) + Assertions.assertEquals(0, manager2.getCurrentCheckpointId().id) // Incrementing a second time yields (n + m, m) repeat(5) { manager1.incrementReadCount() } @@ -51,14 +51,14 @@ class StreamManagerTest { Assertions.assertEquals(15, index2) Assertions.assertEquals(5, count2) - Assertions.assertEquals(2, manager1.getNextCheckpointId().id) - Assertions.assertEquals(0, manager2.getNextCheckpointId().id) + Assertions.assertEquals(2, manager1.getCurrentCheckpointId().id) + Assertions.assertEquals(0, manager2.getCurrentCheckpointId().id) // Never incrementing yields (0, 0) val (index3, count3) = manager2.markCheckpoint() - Assertions.assertEquals(2, manager1.getNextCheckpointId().id) - Assertions.assertEquals(1, manager2.getNextCheckpointId().id) + Assertions.assertEquals(2, manager1.getCurrentCheckpointId().id) + Assertions.assertEquals(1, manager2.getCurrentCheckpointId().id) Assertions.assertEquals(0, index3) Assertions.assertEquals(0, count3) @@ -66,8 +66,8 @@ class StreamManagerTest { // Incrementing twice in a row yields (n + m + 0, 0) val (index4, count4) = manager1.markCheckpoint() - Assertions.assertEquals(3, manager1.getNextCheckpointId().id) - Assertions.assertEquals(1, manager2.getNextCheckpointId().id) + Assertions.assertEquals(3, manager1.getCurrentCheckpointId().id) + 
Assertions.assertEquals(1, manager2.getCurrentCheckpointId().id) Assertions.assertEquals(15, index4) Assertions.assertEquals(0, count4) @@ -478,7 +478,7 @@ class StreamManagerTest { @Test fun `test persisted counts`() { val manager = DefaultStreamManager(stream1) - val checkpointId = manager.getNextCheckpointId() + val checkpointId = manager.getCurrentCheckpointId() repeat(10) { manager.incrementReadCount() } manager.markCheckpoint() @@ -488,28 +488,17 @@ class StreamManagerTest { Assertions.assertFalse(manager.areRecordsPersistedUntilCheckpoint(checkpointId)) manager.incrementPersistedCount(checkpointId, 5) Assertions.assertTrue(manager.areRecordsPersistedUntilCheckpoint(checkpointId)) - - // Should throw if we try to persist more than the total count - assertThrows { manager.incrementPersistedCount(checkpointId, 1) } - } - - @Test - fun `test persisting un unmarked checkpoint throws`() { - val manager = DefaultStreamManager(stream1) - val checkpointId = manager.getNextCheckpointId() - - assertThrows { manager.incrementPersistedCount(checkpointId, 1) } } @Test fun `test persisted count for multiple checkpoints`() { val manager = DefaultStreamManager(stream1) - val checkpointId1 = manager.getNextCheckpointId() + val checkpointId1 = manager.getCurrentCheckpointId() repeat(10) { manager.incrementReadCount() } manager.markCheckpoint() - val checkpointId2 = manager.getNextCheckpointId() + val checkpointId2 = manager.getCurrentCheckpointId() repeat(15) { manager.incrementReadCount() } manager.markCheckpoint() @@ -529,13 +518,14 @@ class StreamManagerTest { fun `test persisted count for multiple checkpoints out of order`() { val manager = DefaultStreamManager(stream1) - val checkpointId1 = manager.getNextCheckpointId() + val checkpointId1 = manager.getCurrentCheckpointId() repeat(10) { manager.incrementReadCount() } manager.markCheckpoint() - val checkpointId2 = manager.getNextCheckpointId() + val checkpointId2 = manager.getCurrentCheckpointId() repeat(15) { manager.incrementReadCount() } + manager.markCheckpoint() Assertions.assertFalse(manager.areRecordsPersistedUntilCheckpoint(checkpointId1)) @@ -555,7 +545,7 @@ class StreamManagerTest { fun `test completion implies persistence`() { val manager = DefaultStreamManager(stream1) - val checkpointId1 = manager.getNextCheckpointId() + val checkpointId1 = manager.getCurrentCheckpointId() repeat(10) { manager.incrementReadCount() } manager.markCheckpoint() @@ -569,9 +559,6 @@ class StreamManagerTest { // Can still count persisted (but without effect) manager.incrementPersistedCount(checkpointId1, 10) Assertions.assertTrue(manager.areRecordsPersistedUntilCheckpoint(checkpointId1)) - - // Completed should also throw if we try to complete more than the total count - assertThrows { manager.incrementCompletedCount(checkpointId1, 1) } } @Test @@ -589,12 +576,12 @@ class StreamManagerTest { cases.forEach { steps -> val manager = DefaultStreamManager(stream1) - val checkpointId1 = manager.getNextCheckpointId() + val checkpointId1 = manager.getCurrentCheckpointId() repeat(10) { manager.incrementReadCount() } manager.markCheckpoint() - val checkpointId2 = manager.getNextCheckpointId() + val checkpointId2 = manager.getCurrentCheckpointId() repeat(20) { manager.incrementReadCount() } manager.markCheckpoint() @@ -618,4 +605,15 @@ class StreamManagerTest { } } } + + @Test + fun `do not throw when counting before marking`() { + val manager1 = DefaultStreamManager(stream1) + + Assertions.assertEquals(0, manager1.getCurrentCheckpointId().id) + + repeat(10) { 
manager1.incrementReadCount() } + manager1.incrementPersistedCount(manager1.getCurrentCheckpointId(), 10) + assertDoesNotThrow { manager1.markCheckpoint() } + } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 66b4b9460c543..ce7e08a847dcc 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -13,10 +13,16 @@ import io.airbyte.cdk.load.command.MockDestinationConfiguration import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.CheckpointMessageWrapped +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.InputPartitioner +import io.airbyte.cdk.load.pipeline.LoadPipeline import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.implementor.CloseStreamTask @@ -152,6 +158,11 @@ class DestinationTaskLauncherTest { checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, fileTransferQueue: MessageQueue, + recordQueueForPipeline: + PartitionedQueue>>, + loadPipeline: LoadPipeline?, + partitioner: InputPartitioner, + openStreamQueue: QueueWriter, ): InputConsumerTask { return object : InputConsumerTask { override val terminalCondition: TerminalCondition = SelfTerminating diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index 1f865ba76c01d..02a85819668fd 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -9,12 +9,20 @@ import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.ChannelMessageQueue import io.airbyte.cdk.load.message.CheckpointMessageWrapped +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.message.SimpleBatch +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.pipeline.InputPartitioner +import io.airbyte.cdk.load.pipeline.LoadPipeline import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.StreamManager import io.airbyte.cdk.load.state.SyncManager @@ -29,12 
+37,13 @@ import io.airbyte.cdk.load.task.implementor.ProcessFileTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessRecordsTaskFactory import io.airbyte.cdk.load.task.implementor.SetupTaskFactory import io.airbyte.cdk.load.task.implementor.TeardownTaskFactory +import io.airbyte.cdk.load.task.internal.DefaultInputConsumerTaskFactory import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory import io.airbyte.cdk.load.task.internal.FlushTickTask -import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow import io.airbyte.cdk.load.task.internal.SpillToDiskTask import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory +import io.airbyte.cdk.load.task.internal.UpdateBatchStateTaskFactory import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask import io.mockk.Called import io.mockk.coEvery @@ -45,6 +54,7 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow class DestinationTaskLauncherUTest { private val taskScopeProvider: TaskScopeProvider = mockk(relaxed = true) @@ -52,7 +62,7 @@ class DestinationTaskLauncherUTest { private val syncManager: SyncManager = mockk(relaxed = true) // Internal Tasks - private val inputConsumerTaskFactory: InputConsumerTaskFactory = mockk(relaxed = true) + private val inputConsumerTaskFactory: DefaultInputConsumerTaskFactory = mockk(relaxed = true) private val spillToDiskTaskFactory: SpillToDiskTaskFactory = mockk(relaxed = true) private val flushTickTask: FlushTickTask = mockk(relaxed = true) @@ -84,9 +94,17 @@ class DestinationTaskLauncherUTest { private val fileTransferQueue: MessageQueue = mockk(relaxed = true) private val openStreamQueue: MessageQueue = mockk(relaxed = true) + private val recordQueueForPipeline: + PartitionedQueue>> = + mockk(relaxed = true) + private val batchUpdateQueue: ChannelMessageQueue = mockk(relaxed = true) + private val partitioner: InputPartitioner = mockk(relaxed = true) + private val updateBatchTaskFactory: UpdateBatchStateTaskFactory = mockk(relaxed = true) + private fun getDefaultDestinationTaskLauncher( - useFileTranfer: Boolean - ): DefaultDestinationTaskLauncher { + useFileTranfer: Boolean, + loadPipeline: LoadPipeline? 
= null + ): DefaultDestinationTaskLauncher { return DefaultDestinationTaskLauncher( taskScopeProvider, catalog, @@ -112,6 +130,13 @@ class DestinationTaskLauncherUTest { checkpointQueue, fileTransferQueue, openStreamQueue, + + // New interface shim + recordQueueForPipeline, + batchUpdateQueue, + loadPipeline, + partitioner, + updateBatchTaskFactory ) } @@ -126,18 +151,17 @@ class DestinationTaskLauncherUTest { } @Test - fun `test that we don't start the spill-to-disk task when file transfer is enabled`() = - runTest { - val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) - // This is needed to let the run method to complete - destinationTaskLauncher.handleTeardownComplete() - destinationTaskLauncher.run() - - coVerify { spillToDiskTaskFactory wasNot Called } - } + fun `don't start the spill-to-disk task when file transfer is enabled`() = runTest { + val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) + // This is needed to let the run method to complete + destinationTaskLauncher.handleTeardownComplete() + destinationTaskLauncher.run() + + coVerify { spillToDiskTaskFactory wasNot Called } + } @Test - fun `test that we start the spill-to-disk task when file transfer is disabled`() = runTest { + fun `start the spill-to-disk task when file transfer is disabled`() = runTest { val spillToDiskTask = mockk(relaxed = true) coEvery { spillToDiskTaskFactory.make(any(), any()) } returns spillToDiskTask @@ -150,7 +174,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `test handle exception`() = runTest { + fun `handle exception`() = runTest { val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) launch { destinationTaskLauncher.run() } val e = Exception("e") @@ -162,7 +186,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `test run close stream no more than once per stream`() = runTest { + fun `run close stream no more than once per stream`() = runTest { val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) val streamManager = mockk(relaxed = true) coEvery { syncManager.getStreamManager(any()) } returns streamManager @@ -180,7 +204,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `task successful completion triggers scope close`() = runTest { + fun `successful completion triggers scope close`() = runTest { // This should close the scope provider. 
val taskLauncher = getDefaultDestinationTaskLauncher(false) launch { @@ -191,7 +215,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `test completion with failure triggers scope kill`() = runTest { + fun `completion with failure triggers scope kill`() = runTest { val taskLauncher = getDefaultDestinationTaskLauncher(false) launch { taskLauncher.run() @@ -201,7 +225,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `test exceptions in tasks throw`() = runTest { + fun `exceptions in tasks throw`() = runTest { coEvery { spillToDiskTaskFactory.make(any(), any()) } answers { val task = mockk(relaxed = true) @@ -228,7 +252,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `test numOpenStreamWorkers open stream tasks are launched`() = runTest { + fun `numOpenStreamWorkers open stream tasks are launched`() = runTest { val numOpenStreamWorkers = 3 val destinationTaskLauncher = getDefaultDestinationTaskLauncher(false) @@ -244,7 +268,7 @@ class DestinationTaskLauncherUTest { } @Test - fun `test streams opened when setup completes`() = runTest { + fun `streams are opened when setup completes`() = runTest { val launcher = getDefaultDestinationTaskLauncher(false) coEvery { openStreamQueue.publish(any()) } returns Unit @@ -253,4 +277,28 @@ class DestinationTaskLauncherUTest { coVerify(exactly = catalog.streams.size) { openStreamQueue.publish(any()) } } + + @Test + fun `don't start the load pipeline if not provided, do start old tasks`() = runTest { + val launcher = getDefaultDestinationTaskLauncher(false, null as LoadPipeline?) + coEvery { config.numProcessRecordsWorkers } returns 1 + coEvery { config.numProcessBatchWorkers } returns 1 + val job = assertDoesNotThrow { launch { launcher.run() } } + launcher.handleTeardownComplete(true) + job.join() + coVerify { processRecordsTaskFactory.make(any()) } + coVerify { processBatchTaskFactory.make(any()) } + job.cancel() + } + + @Test + fun `start the load pipeline if provided`() = runTest { + val pipeline = mockk(relaxed = true) + val launcher = getDefaultDestinationTaskLauncher(false, pipeline) + val job = launch { launcher.run() } + launcher.handleTeardownComplete(true) + job.join() + coVerify { pipeline.start(any()) } + job.cancel() + } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt index c95fd6f0fb2a2..ea29cbedbaf8e 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt @@ -27,6 +27,10 @@ class MockTaskLauncher : DestinationTaskLauncher { batchEnvelopes.add(wrapped) } + override suspend fun handleStreamComplete(stream: DestinationStream.Descriptor) { + TODO("Not yet implemented") + } + override suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) { throw NotImplementedError() } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt index 52146fc1472b7..3444bd0732638 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt @@ -45,7 +45,7 @@ class ProcessRecordsTaskTest { private lateinit var 
batchAccumulator: BatchAccumulator private lateinit var inputQueue: MessageQueue private lateinit var processRecordsTaskFactory: DefaultProcessRecordsTaskFactory - private lateinit var launcher: DefaultDestinationTaskLauncher + private lateinit var launcher: DefaultDestinationTaskLauncher<*> private lateinit var outputQueue: MultiProducerChannel> private lateinit var syncManager: SyncManager diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt index 2c30993b44847..cfd5d0fe1a511 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt @@ -100,6 +100,10 @@ class InputConsumerTaskTest { checkpointQueue = checkpointQueue, destinationTaskLauncher = mockk(), fileTransferQueue = mockk(relaxed = true), + recordQueueForPipeline = mockk(relaxed = true), + loadPipeline = null, + partitioner = mockk(relaxed = true), + openStreamQueue = mockk(relaxed = true), ) task.execute() @@ -157,6 +161,10 @@ class InputConsumerTaskTest { checkpointQueue = checkpointQueue, destinationTaskLauncher = mockk(), fileTransferQueue = mockk(relaxed = true), + recordQueueForPipeline = mockk(relaxed = true), + loadPipeline = null, + partitioner = mockk(relaxed = true), + openStreamQueue = mockk(relaxed = true), ) task.execute() coVerifySequence { @@ -195,6 +203,10 @@ class InputConsumerTaskTest { checkpointQueue = checkpointQueue, destinationTaskLauncher = mockk(), fileTransferQueue = mockk(relaxed = true), + recordQueueForPipeline = mockk(relaxed = true), + loadPipeline = null, + partitioner = mockk(relaxed = true), + openStreamQueue = mockk(relaxed = true), ) coEvery { inputFlow.collect(any()) } coAnswers { @@ -251,6 +263,10 @@ class InputConsumerTaskTest { checkpointQueue = checkpointQueue, destinationTaskLauncher = mockk(), fileTransferQueue = mockk(relaxed = true), + recordQueueForPipeline = mockk(relaxed = true), + loadPipeline = null, + partitioner = mockk(relaxed = true), + openStreamQueue = mockk(relaxed = true), ) coEvery { inputFlow.collect(any()) } coAnswers @@ -324,6 +340,10 @@ class InputConsumerTaskTest { checkpointQueue = checkpointQueue, destinationTaskLauncher = mockk(relaxed = true), fileTransferQueue = mockk(relaxed = true), + recordQueueForPipeline = mockk(relaxed = true), + loadPipeline = null, + partitioner = mockk(relaxed = true), + openStreamQueue = mockk(relaxed = true), ) assertThrows(IllegalStateException::class) { task.execute() } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt new file mode 100644 index 0000000000000..d9fe8a7e3947d --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.task.internal + +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.message.CheckpointMessageWrapped +import io.airbyte.cdk.load.message.DestinationMessage +import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.DestinationStreamEvent +import io.airbyte.cdk.load.message.MessageQueue +import io.airbyte.cdk.load.message.MessageQueueSupplier +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.InputPartitioner +import io.airbyte.cdk.load.pipeline.LoadPipeline +import io.airbyte.cdk.load.state.Reserved +import io.airbyte.cdk.load.state.SyncManager +import io.airbyte.cdk.load.task.DestinationTaskLauncher +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.every +import io.mockk.impl.annotations.MockK +import io.mockk.mockk +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class InputConsumerTaskUTest { + @MockK lateinit var catalog: DestinationCatalog + @MockK lateinit var inputFlow: ReservingDeserializingInputFlow + @MockK + lateinit var recordQueueSupplier: + MessageQueueSupplier> + @MockK lateinit var checkpointQueue: QueueWriter> + @MockK lateinit var syncManager: SyncManager + @MockK lateinit var destinationTaskLauncher: DestinationTaskLauncher + @MockK lateinit var fileTransferQueue: MessageQueue + @MockK + lateinit var recordQueueForPipeline: + PartitionedQueue>> + @MockK lateinit var partitioner: InputPartitioner + @MockK lateinit var openStreamQueue: QueueWriter + + private val stream = DestinationStream.Descriptor("namespace", "name") + + private fun createTask(loadPipeline: LoadPipeline?) 
= + DefaultInputConsumerTask( + catalog, + inputFlow, + recordQueueSupplier, + checkpointQueue, + syncManager, + destinationTaskLauncher, + fileTransferQueue, + recordQueueForPipeline, + loadPipeline, + partitioner, + openStreamQueue + ) + + @BeforeEach + fun setup() { + val dstream = mockk(relaxed = true) + every { dstream.descriptor } returns stream + coEvery { catalog.streams } returns listOf(dstream) + coEvery { recordQueueSupplier.get(stream) } returns mockk(relaxed = true) + coEvery { fileTransferQueue.close() } returns Unit + coEvery { recordQueueForPipeline.close() } returns Unit + coEvery { openStreamQueue.close() } returns Unit + coEvery { checkpointQueue.close() } returns Unit + } + + @Test + fun `input consumer does not use the new path when there is no load pipeline`() = runTest { + val inputConsumerTask = createTask(null) + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector: FlowCollector>> = firstArg() + collector.emit( + Pair( + 0L, + Reserved( + null, + 0, + DestinationRecord( + stream = stream, + message = mockk(relaxed = true), + serialized = "", + schema = ObjectTypeWithoutSchema + ) + ) + ) + ) + val job = launch { inputConsumerTask.execute() } + job.join() + coVerify { recordQueueSupplier.get(stream) } + coVerify(exactly = 0) { recordQueueForPipeline.publish(any(), any()) } + } + } + + @Test + fun `input consumer uses the new path when there is a load pipeline`(): Unit = runTest { + val inputConsumerTask = createTask(mockk(relaxed = true)) + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector: FlowCollector>> = firstArg() + collector.emit( + Pair( + 0L, + Reserved( + null, + 0, + DestinationRecord( + stream = stream, + message = mockk(relaxed = true), + serialized = "", + schema = ObjectTypeWithoutSchema + ) + ) + ) + ) + val job = launch { inputConsumerTask.execute() } + job.join() + coVerify(exactly = 0) { recordQueueSupplier.get(stream) } + coVerify { recordQueueForPipeline.publish(any(), any()) } + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt new file mode 100644 index 0000000000000..99a806360279f --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.task.internal + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.PipelineEndOfStream +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.PipelineMessage +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.message.WithBatchState +import io.airbyte.cdk.load.pipeline.BatchAccumulator +import io.airbyte.cdk.load.pipeline.BatchStateUpdate +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.state.CheckpointId +import io.airbyte.cdk.load.state.Reserved +import io.airbyte.cdk.load.util.setOnce +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.every +import io.mockk.impl.annotations.MockK +import io.mockk.verify +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class LoadPipelineStepTaskUTest { + @MockK + lateinit var batchAccumulatorNoUpdate: + BatchAccumulator + @MockK + lateinit var batchAccumulatorWithUpdate: + BatchAccumulator + @MockK lateinit var inputFlow: Flow>> + @MockK lateinit var batchUpdateQueue: QueueWriter + + data class Closeable(val id: Int = 0) : AutoCloseable { + override fun close() {} + } + + data class MyBatch(override val state: Batch.State) : WithBatchState + + @BeforeEach + fun setup() { + coEvery { batchAccumulatorNoUpdate.finish(any()) } returns true + } + + private fun createTask( + part: Int, + batchAccumulator: BatchAccumulator + ): LoadPipelineStepTask = + LoadPipelineStepTask( + batchAccumulator, + inputFlow, + batchUpdateQueue, + // TODO: test output partitioner, queue, and flush strategy when actually used + null, + null, + null, + part + ) + + private fun reserved(value: T): Reserved = Reserved(null, 0L, value) + private fun messageEvent( + key: StreamKey, + value: String, + counts: Map = emptyMap() + ): Reserved> = + reserved(PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value)) + private fun endOfStreamEvent(key: StreamKey): Reserved> = + reserved(PipelineEndOfStream(key.stream)) + + @Test + fun `start and accept called on first no-output message, accept only on second`() = runTest { + val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) + val part = 7 + val task = createTask(part, batchAccumulatorNoUpdate) + + // No call to accept will finish the batch, but state will be threaded through. 
+ val state1 = Closeable(1) + val state2 = Closeable(2) + every { batchAccumulatorNoUpdate.start(any(), any()) } returns state1 + every { batchAccumulatorNoUpdate.accept("value_0", state1) } returns Pair(state2, null) + every { batchAccumulatorNoUpdate.accept("value_1", state2) } returns Pair(Closeable(), null) + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + repeat(2) { collector.emit(messageEvent(key, "value_$it")) } + } + + task.execute() + + verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } + repeat(2) { verify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } } + verify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) } + } + + @Test + fun `start called again after batch completes (no update)`() = runTest { + val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) + val part = 6 + val task = createTask(part, batchAccumulatorNoUpdate) + val stateA1 = Closeable(1) + val stateA2 = Closeable(2) + val stateA3 = Closeable(3) + val stateB1 = Closeable(4) + val stateB2 = Closeable(5) + val startHasBeenCalled = AtomicBoolean(false) + + every { batchAccumulatorNoUpdate.start(any(), any()) } answers + { + if (startHasBeenCalled.setOnce()) stateA1 else stateB1 + } + every { batchAccumulatorNoUpdate.accept("value_0", stateA1) } returns Pair(stateA2, null) + every { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(stateA3, true) + every { batchAccumulatorNoUpdate.accept("value_2", stateB1) } returns Pair(stateB2, null) + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + repeat(3) { collector.emit(messageEvent(key, "value_$it")) } + } + + task.execute() + verify(exactly = 2) { batchAccumulatorNoUpdate.start(key, part) } + repeat(3) { verify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } } + } + + @Test + fun `finish and update called on end-of-stream when last accept did not yield output`() = + runTest { + val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) + val part = 5 + val task = createTask(part, batchAccumulatorNoUpdate) + + every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable() + every { batchAccumulatorNoUpdate.accept(any(), any()) } returns Pair(Closeable(), null) + every { batchAccumulatorNoUpdate.finish(any()) } returns true + coEvery { batchUpdateQueue.publish(any()) } returns Unit + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + repeat(10) { // arbitrary number of messages + collector.emit(messageEvent(key, "value")) + } + collector.emit(endOfStreamEvent(key)) + } + + task.execute() + + verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } + verify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) } + verify(exactly = 1) { batchAccumulatorNoUpdate.finish(any()) } + coVerify(exactly = 1) { batchUpdateQueue.publish(any()) } + } + + @Test + fun `update but not finish called on end-of-stream when last accept yielded output`() = + runTest { + val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) + val part = 4 + val task = createTask(part, batchAccumulatorNoUpdate) + + var acceptCalls = 0 + every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable() + every { batchAccumulatorNoUpdate.accept(any(), any()) } answers + { + if (++acceptCalls == 10) Pair(Closeable(), true) else Pair(Closeable(), null) + } + every { batchAccumulatorNoUpdate.finish(any()) } returns true + coEvery { 
batchUpdateQueue.publish(any()) } returns Unit + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + repeat(10) { // arbitrary number of messages + collector.emit(messageEvent(key, "value")) + } + collector.emit(endOfStreamEvent(key)) + } + + task.execute() + + verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } + verify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) } + verify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) } + coVerify(exactly = 1) { batchUpdateQueue.publish(any()) } + } + + @Test + fun `update at end-of-batch when output provides persisted batch state`() = runTest { + val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) + val part = 99 + val task = createTask(part, batchAccumulator = batchAccumulatorWithUpdate) + var acceptCalls = 0 + + every { batchAccumulatorWithUpdate.start(any(), any()) } returns Closeable() + every { batchAccumulatorWithUpdate.accept(any(), any()) } answers + { + val output = + when (acceptCalls++ % 4) { + 0 -> null + 1 -> MyBatch(Batch.State.PROCESSED) + 2 -> MyBatch(Batch.State.PERSISTED) + 3 -> MyBatch(Batch.State.COMPLETE) + else -> error("unreachable") + } + Pair(Closeable(), output) + } + coEvery { batchUpdateQueue.publish(any()) } returns Unit + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + repeat(12) { // arbitrary number of messages + collector.emit(messageEvent(key, "value")) + } + } + + task.execute() + + verify(exactly = 9) { + batchAccumulatorWithUpdate.start(key, part) + } // only 1/4 are no output + verify(exactly = 12) { batchAccumulatorWithUpdate.accept(any(), any()) } // all have data + verify(exactly = 0) { batchAccumulatorWithUpdate.finish(any()) } // never end-of-stream + coVerify(exactly = 6) { batchUpdateQueue.publish(any()) } // half are PERSISTED/COMPLETE + } + + @Test + fun `manage state separately by stream`() = runTest { + val key1 = StreamKey(DestinationStream.Descriptor("namespace", "stream1")) + val key2 = StreamKey(DestinationStream.Descriptor("namespace", "stream2")) + val part = 89 + + val task = createTask(part, batchAccumulatorWithUpdate) + + // Make stream1 finish with a persisted output every 3 calls (otherwise null) + // Make stream2 finish with a persisted output every 2 calls (otherwise null) + val stream1States = (0 until 11).map { Closeable(it) } + val stream2States = (0 until 11).map { Closeable(it + 100) } + var stream1StartCalls = 0 + var stream2StartCalls = 0 + every { batchAccumulatorWithUpdate.start(key1, any()) } answers + { + // Stream 1 will finish on 0, 3, 6, 9 + // (so the last finish is right before end-of-stream, leaving no input to finish) + when (stream1StartCalls++) { + 0 -> stream1States.first() + 1 -> stream1States[1] + 2 -> stream1States[4] + 3 -> stream1States[7] + else -> error("unreachable stream1 start call") + } + } + every { batchAccumulatorWithUpdate.start(key2, any()) } answers + { + // Stream 2 will finish on 0, 2, 4, 6, 8 + // (so the last finish is one record before end-of-stream, leaving input to finish) + when (stream2StartCalls++) { + 0 -> stream2States.first() + 1 -> stream2States[1] + 2 -> stream2States[3] + 3 -> stream2States[5] + 4 -> stream2States[7] + 5 -> stream2States[9] + else -> error("unreachable stream2 start call") + } + } + repeat(10) { + every { batchAccumulatorWithUpdate.accept(any(), stream1States[it]) } returns + Pair( + stream1States[it + 1], + if (it % 3 == 0) MyBatch(Batch.State.PERSISTED) else null + ) + every { 
batchAccumulatorWithUpdate.accept(any(), stream2States[it]) } returns + Pair( + stream2States[it + 1], + if (it % 2 == 0) MyBatch(Batch.State.COMPLETE) else null + ) + } + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + repeat(10) { // arbitrary number of messages + collector.emit(messageEvent(key1, "stream1_value")) + collector.emit(messageEvent(key2, "stream2_value")) + // Include an end-of-stream for each stream, as only the even-numbered + // emitter (stream2) will have just finished a batch. + } + collector.emit(endOfStreamEvent(key1)) + collector.emit(endOfStreamEvent(key2)) + } + + every { batchAccumulatorWithUpdate.finish(any()) } returns MyBatch(Batch.State.COMPLETE) + coEvery { batchUpdateQueue.publish(any()) } returns Unit + + task.execute() + + verify(exactly = 4) { batchAccumulatorWithUpdate.start(key1, part) } + verify(exactly = 6) { batchAccumulatorWithUpdate.start(key2, part) } + verify(exactly = 10) { + batchAccumulatorWithUpdate.accept("stream1_value", match { it in stream1States }) + } + verify(exactly = 10) { + batchAccumulatorWithUpdate.accept("stream2_value", match { it in stream2States }) + } + verify(exactly = 1) { batchAccumulatorWithUpdate.finish(stream2States[10]) } + } + + @Test + fun `checkpoint counts are merged`() = runTest { + val key1 = StreamKey(DestinationStream.Descriptor("namespace", "stream1")) + val key2 = StreamKey(DestinationStream.Descriptor("namespace", "stream2")) + val part = 66666 + + val task = createTask(part, batchAccumulatorWithUpdate) + + every { batchAccumulatorWithUpdate.start(key1, part) } returns Closeable(1) + every { batchAccumulatorWithUpdate.start(key2, part) } returns Closeable(2) + every { batchAccumulatorWithUpdate.accept("stream1_value", any()) } returns + Pair(Closeable(1), null) + every { batchAccumulatorWithUpdate.accept("stream2_value", any()) } returns + Pair(Closeable(2), null) + every { batchAccumulatorWithUpdate.finish(Closeable(1)) } returns + MyBatch(Batch.State.COMPLETE) + every { batchAccumulatorWithUpdate.finish(Closeable(2)) } returns + MyBatch(Batch.State.PERSISTED) + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = + firstArg>>>() + + // Emit 10 messages for stream1, 10 messages for stream2 + repeat(12) { + collector.emit( + messageEvent(key1, "stream1_value", mapOf(it / 6 to it.toLong())) + ) // 0 -> 15, 1 -> 51 + collector.emit( + messageEvent(key2, "stream2_value", mapOf((it / 4) + 1 to it.toLong())) + ) // 1 -> 6, 2 -> 22, 3 -> 38 + } + + // Emit end-of-stream for stream1, end-of-stream for stream2 + collector.emit(endOfStreamEvent(key1)) + collector.emit(endOfStreamEvent(key2)) + } + + coEvery { batchUpdateQueue.publish(any()) } returns Unit + + task.execute() + + val expectedBatchUpdateStream1 = + BatchStateUpdate( + key1.stream, + mapOf(CheckpointId(0) to 15L, CheckpointId(1) to 51L), + Batch.State.COMPLETE + ) + val expectedBatchUpdateStream2 = + BatchStateUpdate( + key2.stream, + mapOf(CheckpointId(1) to 6L, CheckpointId(2) to 22L, CheckpointId(3) to 38L), + Batch.State.PERSISTED + ) + coVerify(exactly = 1) { batchUpdateQueue.publish(expectedBatchUpdateStream1) } + coVerify(exactly = 1) { batchUpdateQueue.publish(expectedBatchUpdateStream2) } + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt index 
286e302fcbcca..c8092a024db44 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt @@ -17,8 +17,7 @@ import io.airbyte.cdk.load.data.avro.toAvroRecord import io.airbyte.cdk.load.data.avro.toAvroSchema import io.airbyte.cdk.load.data.csv.toCsvRecord import io.airbyte.cdk.load.data.dataWithAirbyteMeta -import io.airbyte.cdk.load.data.json.toJson -import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory +import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineTest import io.airbyte.cdk.load.data.withAirbyteMeta import io.airbyte.cdk.load.file.StreamProcessor import io.airbyte.cdk.load.file.avro.toAvroWriter @@ -163,7 +162,7 @@ class ParquetFormattingWriter( ) : ObjectStorageFormattingWriter { private val log = KotlinLogging.logger {} - private val pipeline = ParquetMapperPipelineFactory().create(stream) + private val pipeline = ParquetMapperPipelineTest().create(stream) private val mappedSchema: ObjectType = pipeline.finalSchema.withAirbyteMeta(rootLevelFlattening) private val avroSchema: Schema = mappedSchema.toAvroSchema(stream.descriptor) private val writer: ParquetWriter = diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineTest.kt similarity index 97% rename from airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt rename to airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineTest.kt index 15f7023c71638..359a5d45bcc47 100644 --- a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineTest.kt @@ -18,7 +18,7 @@ import io.airbyte.cdk.load.data.TimeStringToInteger import io.airbyte.cdk.load.data.UnionTypeToDisjointRecord import io.airbyte.cdk.load.data.UnionValueToDisjointRecord -class ParquetMapperPipelineFactory : MapperPipelineFactory { +class ParquetMapperPipelineTest : MapperPipelineFactory { override fun create(stream: DestinationStream): MapperPipeline = MapperPipeline( stream.schema, diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperBatchPipelineTest.kt similarity index 96% rename from airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt rename to airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperBatchPipelineTest.kt index a390507429df8..518d44a517793 100644 --- a/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperBatchPipelineTest.kt @@ -4,12 +4,12 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.* -import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory +import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineTest import io.mockk.every import io.mockk.mockk import org.junit.jupiter.api.Test -class ParquetMapperPipelineTest 
{ +class ParquetMapperBatchPipelineTest { @Test fun `test conversions nested in unions`() { val stream = mockk() @@ -81,7 +81,7 @@ class ParquetMapperPipelineTest { ), ) ) - val pipeline = ParquetMapperPipelineFactory().create(stream) + val pipeline = ParquetMapperPipelineTest().create(stream) val schemaMapped = pipeline.finalSchema as ObjectType val (recordMapped, _) = pipeline.map(record) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml index eb081ce9e0ca5..aecc4714e2420 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml @@ -26,7 +26,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 716ca874-520b-4902-9f80-9fad66754b89 - dockerImageTag: 0.3.13 + dockerImageTag: 0.3.14 dockerRepository: airbyte/destination-s3-data-lake documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake githubIssueLabel: destination-s3-data-lake diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt index 6f9249466c080..248d2ce27960c 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt @@ -24,16 +24,16 @@ data class S3DataLakeConfiguration( override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration, override val s3BucketConfiguration: S3BucketConfiguration, override val icebergCatalogConfiguration: IcebergCatalogConfiguration, - override val numProcessRecordsWorkers: Int, - override val numProcessBatchWorkers: Int, + // Now that partitioning is enabled, we can run more than one worker. + // This will likely not show performance improvements in the cloud without additional + // resources. In the future, if enterprise or oss users need more flexibility, we can + // expose this in their configurations. + override val numProcessRecordsWorkers: Int = 2 ) : DestinationConfiguration(), AWSAccessKeyConfigurationProvider, IcebergCatalogConfigurationProvider, - S3BucketConfigurationProvider { - override val recordBatchSizeBytes: Long - get() = 1500 * 1024 * 1024 -} + S3BucketConfigurationProvider {} @Singleton class S3DataLakeConfigurationFactory : @@ -46,12 +46,6 @@ class S3DataLakeConfigurationFactory : awsAccessKeyConfiguration = pojo.toAWSAccessKeyConfiguration(), s3BucketConfiguration = pojo.toS3BucketConfiguration(), icebergCatalogConfiguration = pojo.toIcebergCatalogConfiguration(), - // When running in dedup mode, we need to process everything in serial, - // so that we don't overwrite newer records with older records. - // For the sake of simplicity, just set workers to 1 regardless of - // sync mode. 
- numProcessRecordsWorkers = 1, - numProcessBatchWorkers = 1, ) } } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDirectLoader.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDirectLoader.kt new file mode 100644 index 0000000000000..e4e94c062ac42 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDirectLoader.kt @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_data_lake + +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.MapperPipeline +import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil +import io.airbyte.cdk.load.write.DirectLoader +import io.airbyte.cdk.load.write.DirectLoaderFactory +import io.airbyte.cdk.load.write.StreamStateStore +import io.github.oshai.kotlinlogging.KotlinLogging +import jakarta.inject.Singleton +import org.apache.iceberg.Schema +import org.apache.iceberg.Table +import org.apache.iceberg.data.Record +import org.apache.iceberg.io.BaseTaskWriter + +@Singleton +class S3DataLakeDirectLoaderFactory( + private val catalog: DestinationCatalog, + private val config: S3DataLakeConfiguration, + private val streamStateStore: StreamStateStore, + private val icebergTableWriterFactory: IcebergTableWriterFactory, + private val icebergUtil: IcebergUtil, +) : DirectLoaderFactory { + private val log = KotlinLogging.logger {} + + override val inputPartitions: Int = config.numProcessRecordsWorkers + + override fun create( + streamDescriptor: DestinationStream.Descriptor, + part: Int + ): S3DataLakeDirectLoader { + log.info { "Creating direct loader for stream $streamDescriptor" } + + val state = streamStateStore.get(streamDescriptor)!! 
+ val stream = catalog.getStream(streamDescriptor) + val writer = + icebergTableWriterFactory.create( + table = state.table, + generationId = icebergUtil.constructGenerationIdSuffix(stream), + importType = stream.importType, + schema = state.schema + ) + + return S3DataLakeDirectLoader( + batchSize = config.recordBatchSizeBytes, + stream = stream, + table = state.table, + schema = state.schema, + stagingBranchName = DEFAULT_STAGING_BRANCH, + writer = writer, + icebergUtil = icebergUtil, + pipeline = IcebergParquetPipelineFactory().create(stream) + ) + } +} + +class S3DataLakeDirectLoader( + private val stream: DestinationStream, + private val table: Table, + private val schema: Schema, + private val stagingBranchName: String, + private val batchSize: Long, + private val writer: BaseTaskWriter, + private val icebergUtil: IcebergUtil, + private val pipeline: MapperPipeline +) : DirectLoader { + private val log = KotlinLogging.logger {} + private var dataSize = 0L + + companion object { + val commitLock: Any = Any() + } + + override fun accept(record: DestinationRecordAirbyteValue): DirectLoader.DirectLoadResult { + val icebergRecord = + icebergUtil.toRecord( + record = record, + stream = stream, + tableSchema = schema, + pipeline = pipeline + ) + writer.write(icebergRecord) + + dataSize += record.serializedSizeBytes // TODO: use icebergRecord.size() instead? + if (dataSize < batchSize) { + return DirectLoader.Incomplete + } + + finish() + + return DirectLoader.Complete + } + + override fun finish() { + log.info { "Finishing writing to $stagingBranchName" } + val writeResult = writer.complete() + if (writeResult.deleteFiles().isNotEmpty()) { + val delta = table.newRowDelta().toBranch(stagingBranchName) + writeResult.dataFiles().forEach { delta.addRows(it) } + writeResult.deleteFiles().forEach { delta.addDeletes(it) } + synchronized(commitLock) { delta.commit() } + } else { + val append = table.newAppend().toBranch(stagingBranchName) + writeResult.dataFiles().forEach { append.appendFile(it) } + synchronized(commitLock) { append.commit() } + } + log.info { "Finished writing records to $stagingBranchName" } + } + + override fun close() { + log.info { "Closing writer for $stagingBranchName" } + writer.close() + } +} diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePartitioner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePartitioner.kt new file mode 100644 index 0000000000000..dff24d58eb124 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePartitioner.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
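Purely to illustrate the contract the loader above relies on (the real driver lives inside the CDK pipeline and is not connector code): roughly how one batch on one partition is driven, with a fresh loader per batch and close() always called. The helper name is hypothetical and assumes the same package as the factory.

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.write.DirectLoader

fun sketchDriveOneBatch(
    factory: S3DataLakeDirectLoaderFactory,
    stream: DestinationStream.Descriptor,
    part: Int,
    records: Iterator<DestinationRecordAirbyteValue>,
) {
    val loader = factory.create(stream, part)
    try {
        while (records.hasNext()) {
            // accept() flushes internally (via finish()) once batchSize is reached and
            // returns Complete; the driver then discards this loader and starts the next
            // batch with a fresh one for any remaining records.
            if (loader.accept(records.next()) == DirectLoader.Complete) return
        }
        // End of input before the loader reported Complete: force the staged work out.
        loader.finish()
    } finally {
        loader.close() // cleanup only; never forwards work
    }
}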
+ */ + +package io.airbyte.integrations.destination.s3_data_lake + +import io.airbyte.cdk.load.command.Dedupe +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.pipeline.InputPartitioner +import jakarta.inject.Singleton +import kotlin.math.abs +import kotlin.random.Random + +@Singleton +class S3DataLakePartitioner(catalog: DestinationCatalog) : InputPartitioner { + private val streamToPrimaryKeyFieldNames = + catalog.streams.associate { stream -> + stream.descriptor to + when (stream.importType) { + is Dedupe -> (stream.importType as Dedupe).primaryKey + else -> null + } + } + private val random = Random(System.currentTimeMillis()) + + override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int { + if (numParts == 1) { + return 0 + } + + streamToPrimaryKeyFieldNames[record.stream]?.let { primaryKey -> + val primaryKeyValues = + primaryKey.map { it.map { key -> (record.data as ObjectValue).values[key] } } + val hash = primaryKeyValues.hashCode() + /** abs(MIN_VALUE) == MIN_VALUE, so we need to handle this case separately */ + if (hash == Int.MIN_VALUE) { + return 0 + } + return abs(primaryKeyValues.hashCode()) % numParts + } + ?: run { + return abs(random.nextInt()) % numParts + } + } +} diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt index 0ec48f8406ac5..b17e334867c29 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt @@ -7,16 +7,13 @@ package io.airbyte.integrations.destination.s3_data_lake import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory -import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue -import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.toolkits.iceberg.parquet.ColumnTypeChangeBehavior import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableCleaner -import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil import io.airbyte.cdk.load.write.StreamLoader +import io.airbyte.cdk.load.write.StreamStateStore import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.iceberg.Schema @@ -30,11 +27,11 @@ class S3DataLakeStreamLoader( private val icebergConfiguration: S3DataLakeConfiguration, override val stream: DestinationStream, private val icebergTableSynchronizer: IcebergTableSynchronizer, - private val icebergTableWriterFactory: IcebergTableWriterFactory, private val s3DataLakeUtil: S3DataLakeUtil, private val icebergUtil: IcebergUtil, private val stagingBranchName: String, - 
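A small illustration of the partitioning rule above (hypothetical helper and values): records sharing a primary key always map to the same partition, which is what lets the S3 data lake destination raise numProcessRecordsWorkers above 1 without risking out-of-order writes for deduped keys, while non-dedup streams are spread randomly.

import kotlin.math.abs

// Same hash-and-modulo rule as S3DataLakePartitioner, reduced to a flat key for clarity.
fun sketchPartitionForKey(primaryKeyValues: List<Any?>, numParts: Int): Int {
    val hash = primaryKeyValues.hashCode()
    if (hash == Int.MIN_VALUE) return 0 // abs(Int.MIN_VALUE) is still negative
    return abs(hash) % numParts
}

// For a given key the result is stable, so an insert and a later update for "user-42"
// land on the same DirectLoader partition and are applied in arrival order:
// sketchPartitionForKey(listOf("user-42"), 2) == sketchPartitionForKey(listOf("user-42"), 2)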
private val mainBranchName: String + private val mainBranchName: String, + private val streamStateStore: StreamStateStore, ) : StreamLoader { private lateinit var table: Table private lateinit var targetSchema: Schema @@ -86,47 +83,13 @@ class S3DataLakeStreamLoader( "branch $DEFAULT_STAGING_BRANCH already exists for stream ${stream.descriptor}" } } - } - override suspend fun processRecords( - records: Iterator, - totalSizeBytes: Long, - endOfStream: Boolean - ): Batch { - icebergTableWriterFactory - .create( + val state = + S3DataLakeStreamState( table = table, - generationId = icebergUtil.constructGenerationIdSuffix(stream), - importType = stream.importType, schema = targetSchema, ) - .use { writer -> - logger.info { "Writing records to branch $stagingBranchName" } - records.forEach { record -> - val icebergRecord = - icebergUtil.toRecord( - record = record, - stream = stream, - tableSchema = targetSchema, - pipeline = pipeline, - ) - writer.write(icebergRecord) - } - val writeResult = writer.complete() - if (writeResult.deleteFiles().isNotEmpty()) { - val delta = table.newRowDelta().toBranch(stagingBranchName) - writeResult.dataFiles().forEach { delta.addRows(it) } - writeResult.deleteFiles().forEach { delta.addDeletes(it) } - delta.commit() - } else { - val append = table.newAppend().toBranch(stagingBranchName) - writeResult.dataFiles().forEach { append.appendFile(it) } - append.commit() - } - logger.info { "Finished writing records to $stagingBranchName" } - } - - return SimpleBatch(Batch.State.COMPLETE) + streamStateStore.put(stream.descriptor, state) } override suspend fun close(streamFailure: StreamProcessingFailed?) { @@ -143,6 +106,7 @@ class S3DataLakeStreamLoader( table.refresh() computeOrExecuteSchemaUpdate().pendingUpdate?.commit() table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit() + if (stream.minimumGenerationId > 0) { logger.info { "Detected a minimum generation ID (${stream.minimumGenerationId}). Preparing to delete obsolete generation IDs." diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamState.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamState.kt new file mode 100644 index 0000000000000..590d11ce3d9ae --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamState.kt @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
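With `processRecords` removed, the stream loader's remaining per-stream output is the resolved `Table` and target `Schema`, which `start()` now publishes through the `StreamStateStore` keyed by `stream.descriptor`, presumably so the direct-loader factory shown earlier can read them back instead of re-resolving the table. A minimal sketch of that handoff pattern follows; the store and state types below are simplified stand-ins, not the CDK's classes.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-ins for the CDK/connector types, to illustrate the handoff only.
data class StreamDescriptor(val namespace: String?, val name: String)
data class IcebergStreamState(val tableName: String, val schemaColumns: List<String>)

class StreamStateStoreSketch<S : Any> {
    private val states = ConcurrentHashMap<StreamDescriptor, S>()
    fun put(descriptor: StreamDescriptor, state: S) { states[descriptor] = state }
    fun get(descriptor: StreamDescriptor): S =
        states[descriptor] ?: error("No state registered for $descriptor")
}

fun main() {
    val store = StreamStateStoreSketch<IcebergStreamState>()
    val users = StreamDescriptor(namespace = null, name = "users")

    // StreamLoader.start(): resolve the table once and publish it for the load phase.
    store.put(users, IcebergStreamState("db.users", listOf("id", "name", "_airbyte_extracted_at")))

    // DirectLoaderFactory.create(): read the published state instead of re-opening the catalog.
    val state = store.get(users)
    println("creating writer for ${state.tableName} with ${state.schemaColumns.size} columns")
}
```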
+ */ + +package io.airbyte.integrations.destination.s3_data_lake + +import org.apache.iceberg.Schema +import org.apache.iceberg.Table + +class S3DataLakeStreamState( + val table: Table, + val schema: Schema, +) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt index c1e42d7dd76da..bc7e7d795d10d 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt @@ -9,23 +9,23 @@ import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator -import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader +import io.airbyte.cdk.load.write.StreamStateStore import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import javax.inject.Singleton import org.apache.iceberg.catalog.TableIdentifier @Singleton class S3DataLakeWriter( - private val icebergTableWriterFactory: IcebergTableWriterFactory, private val icebergConfiguration: S3DataLakeConfiguration, private val s3DataLakeUtil: S3DataLakeUtil, private val icebergUtil: IcebergUtil, private val icebergTableSynchronizer: IcebergTableSynchronizer, private val catalog: DestinationCatalog, private val tableIdGenerator: TableIdGenerator, + private val streamStateStore: StreamStateStore ) : DestinationWriter { override suspend fun setup() { super.setup() @@ -64,11 +64,11 @@ class S3DataLakeWriter( icebergConfiguration, stream, icebergTableSynchronizer, - icebergTableWriterFactory, s3DataLakeUtil, icebergUtil, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName, + streamStateStore = streamStateStore, ) } } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePerformanceTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePerformanceTest.kt index 64e052af801a1..3833c3aa802be 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePerformanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePerformanceTest.kt @@ -14,7 +14,7 @@ class S3DataLakePerformanceTest : BasicPerformanceTest( configContents = Files.readString(S3DataLakeTestUtil.GLUE_CONFIG_PATH), configSpecClass = S3DataLakeSpecification::class.java, - defaultRecordsToInsert = 500_000, + defaultRecordsToInsert = 1_000_000, micronautProperties = S3DataLakeTestUtil.getAwsAssumeRoleCredentials().asMicronautProperties(), ) diff --git 
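The writer now only wires the shared `StreamStateStore` into each stream loader; the per-batch Iceberg commits happen inside the direct loaders, which serialize them through the shared `commitLock` shown earlier, presumably so that partitions finishing batches at the same time do not race each other's table commits. A small, self-contained sketch of that serialization pattern is below; the body of `commitBatch` is a stand-in for the real append/row-delta commit.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Stand-in for the shared lock in S3DataLakeDirectLoader's companion object.
object CommitLock

var commitCount = 0 // guarded by CommitLock

fun commitBatch(files: List<String>) {
    // Only one in-process commit at a time; the real code commits an Iceberg
    // append or row delta to the staging branch inside this block.
    synchronized(CommitLock) {
        commitCount += 1
        println("commit #$commitCount: ${files.joinToString()}")
    }
}

fun main() {
    val pool = Executors.newFixedThreadPool(4)
    repeat(8) { i -> pool.execute { commitBatch(listOf("part-$i.parquet")) } }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
}
```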
a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt index 5cd1b9d98ce2f..c88d26d72e345 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt @@ -27,10 +27,11 @@ import io.airbyte.cdk.load.toolkits.iceberg.parquet.ColumnTypeChangeBehavior import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergSuperTypeFinder import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator -import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil +import io.airbyte.cdk.load.write.StreamStateStore import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import io.mockk.every +import io.mockk.impl.annotations.MockK import io.mockk.just import io.mockk.mockk import io.mockk.runs @@ -45,9 +46,17 @@ import org.apache.iceberg.io.CloseableIterable import org.apache.iceberg.types.Type.PrimitiveType import org.apache.iceberg.types.Types import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test internal class S3DataLakeStreamLoaderTest { + @MockK(relaxed = true) + private lateinit var streamStateStore: StreamStateStore + + @BeforeEach + fun setup() { + every { streamStateStore.put(any(), any()) } returns Unit + } @Test fun testCreateStreamLoader() { @@ -106,7 +115,6 @@ internal class S3DataLakeStreamLoaderTest { ), Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), ) - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() val awsConfiguration: AWSAccessKeyConfiguration = mockk { every { accessKeyId } returns "access-key" every { secretAccessKey } returns "secret-access-key" @@ -152,11 +160,11 @@ internal class S3DataLakeStreamLoaderTest { IcebergTypesComparator(), IcebergSuperTypeFinder(IcebergTypesComparator()), ), - icebergTableWriterFactory, s3DataLakeUtil, icebergUtil, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = "main", + streamStateStore = streamStateStore, ) assertNotNull(streamLoader) } @@ -183,7 +191,6 @@ internal class S3DataLakeStreamLoaderTest { Schema( Types.NestedField.of(2, true, "name", Types.StringType.get()), ) - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() val awsConfiguration: AWSAccessKeyConfiguration = mockk { every { accessKeyId } returns "access-key" every { secretAccessKey } returns "secret-access-key" @@ -252,11 +259,11 @@ internal class S3DataLakeStreamLoaderTest { IcebergTypesComparator(), IcebergSuperTypeFinder(IcebergTypesComparator()), ), - icebergTableWriterFactory, s3DataLakeUtil, icebergUtil, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = "main", + streamStateStore = streamStateStore, ) runBlocking { streamLoader.start() } @@ -335,7 +342,6 @@ internal class S3DataLakeStreamLoaderTest { Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), ) val icebergSchema = Schema(columns, 
emptySet()) - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() val awsConfiguration: AWSAccessKeyConfiguration = mockk { every { accessKeyId } returns "access-key" every { secretAccessKey } returns "secret-access-key" @@ -404,11 +410,11 @@ internal class S3DataLakeStreamLoaderTest { IcebergTypesComparator(), IcebergSuperTypeFinder(IcebergTypesComparator()), ), - icebergTableWriterFactory, s3DataLakeUtil, icebergUtil, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = "main", + streamStateStore = streamStateStore, ) runBlocking { streamLoader.start() } @@ -444,7 +450,6 @@ internal class S3DataLakeStreamLoaderTest { syncId = 1, ) val icebergConfiguration: S3DataLakeConfiguration = mockk() - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() val s3DataLakeUtil: S3DataLakeUtil = mockk() val icebergUtil: IcebergUtil = mockk { every { toIcebergSchema(any(), any()) } answers @@ -461,11 +466,11 @@ internal class S3DataLakeStreamLoaderTest { IcebergTypesComparator(), IcebergSuperTypeFinder(IcebergTypesComparator()), ), - icebergTableWriterFactory, s3DataLakeUtil, icebergUtil, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = "main", + streamStateStore = streamStateStore, ) assertEquals( diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt index 89d85bab3b9d3..cd3b491edaab3 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt @@ -20,7 +20,7 @@ import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue import io.airbyte.cdk.load.data.TimestampWithTimezoneValue -import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory +import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineTest import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.Meta import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT @@ -215,7 +215,7 @@ internal class S3DataLakeUtilTest { emittedAtMs = System.currentTimeMillis(), meta = Meta(), ) - val pipeline = ParquetMapperPipelineFactory().create(airbyteStream) + val pipeline = ParquetMapperPipelineTest().create(airbyteStream) val columns = mutableListOf( Types.NestedField.required(1, "id", Types.IntegerType.get()), @@ -267,7 +267,7 @@ internal class S3DataLakeUtilTest { emittedAtMs = System.currentTimeMillis(), meta = Meta(), ) - val pipeline = ParquetMapperPipelineFactory().create(airbyteStream) + val pipeline = ParquetMapperPipelineTest().create(airbyteStream) val columns = mutableListOf( Types.NestedField.required(1, "id", Types.IntegerType.get()), @@ -314,7 +314,7 @@ internal class S3DataLakeUtilTest { emittedAtMs = System.currentTimeMillis(), meta = Meta(), ) - val pipeline = ParquetMapperPipelineFactory().create(airbyteStream) + val pipeline = ParquetMapperPipelineTest().create(airbyteStream) val columns = mutableListOf( Types.NestedField.required(1, "id", Types.IntegerType.get()), @@ -366,7 +366,6 @@ internal class S3DataLakeUtilTest { icebergCatalogConfiguration = 
icebergCatalogConfiguration, s3BucketConfiguration = s3BucketConfiguration, numProcessRecordsWorkers = 1, - numProcessBatchWorkers = 1, ) val catalogProperties = s3DataLakeUtil.toCatalogProperties(config = configuration) assertEquals(ICEBERG_CATALOG_TYPE_NESSIE, catalogProperties[ICEBERG_CATALOG_TYPE]) @@ -458,7 +457,7 @@ internal class S3DataLakeUtilTest { minimumGenerationId = 1, syncId = 1, ) - val pipeline = ParquetMapperPipelineFactory().create(stream) + val pipeline = ParquetMapperPipelineTest().create(stream) val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) assertEquals(primaryKeys.toSet(), schema.identifierFieldNames()) assertEquals(6, schema.columns().size) @@ -488,7 +487,7 @@ internal class S3DataLakeUtilTest { minimumGenerationId = 1, syncId = 1, ) - val pipeline = ParquetMapperPipelineFactory().create(stream) + val pipeline = ParquetMapperPipelineTest().create(stream) val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) assertEquals(emptySet(), schema.identifierFieldNames()) assertEquals(6, schema.columns().size) diff --git a/docs/integrations/destinations/s3-data-lake.md b/docs/integrations/destinations/s3-data-lake.md index 502424e315ba7..a250902c637ec 100644 --- a/docs/integrations/destinations/s3-data-lake.md +++ b/docs/integrations/destinations/s3-data-lake.md @@ -157,6 +157,7 @@ drop all table versions. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------| +| 0.3.14 | 2025-02-14 | [\#53241](https://github.com/airbytehq/airbyte/pull/53241) | New CDK interface; perf improvements, skip initial record staging | | 0.3.13 | 2025-02-14 | [\#53697](https://github.com/airbytehq/airbyte/pull/53697) | Internal refactor | | 0.3.12 | 2025-02-12 | [\#53170](https://github.com/airbytehq/airbyte/pull/53170) | Improve documentation, tweak error handling of invalid schema evolution | | 0.3.11 | 2025-02-12 | [\#53216](https://github.com/airbytehq/airbyte/pull/53216) | Support arbitrary schema change in overwrite / truncate refresh / clear sync |