Skip to content

Commit

Permalink
Load-CDK/Destination-S3DataLake: DirectLoader (no spill2disk, partiti…
Browse files Browse the repository at this point in the history
…oning) (#53241)
  • Loading branch information
johnny-schmidt authored Feb 25, 2025
1 parent 91ccf4e commit b88b37b
Show file tree
Hide file tree
Showing 54 changed files with 1,870 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
}

/**
 * Looks up the stream registered under [descriptor] in `byDescriptor`.
 *
 * @param descriptor the descriptor identifying the wanted stream.
 * @return the matching [DestinationStream].
 * @throws IllegalArgumentException if `byDescriptor` has no entry for [descriptor].
 */
fun getStream(descriptor: DestinationStream.Descriptor): DestinationStream =
    byDescriptor[descriptor]
        ?: throw IllegalArgumentException("Stream not found: $descriptor")

/**
 * Builds a protocol-level [ConfiguredAirbyteCatalog] whose streams are the protocol objects of
 * each entry in [streams], in the same order.
 */
fun asProtocolObject(): ConfiguredAirbyteCatalog {
    val protocolCatalog = ConfiguredAirbyteCatalog()
    return protocolCatalog.withStreams(streams.map { stream -> stream.asProtocolObject() })
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.ChannelMessageQueue
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.pipeline.BatchUpdate
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
import io.airbyte.cdk.load.write.LoadStrategy
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
Expand Down Expand Up @@ -94,5 +101,37 @@ class SyncBeanFactory {

@Singleton
@Named("openStreamQueue")
class OpenStreamQueue : ChannelMessageQueue<DestinationStream>()
class OpenStreamQueue : ChannelMessageQueue<DestinationStream>(Channel(Channel.UNLIMITED))

/**
 * Boolean bean named `checkpointById`.
 *
 * If the client uses a new-style [LoadStrategy], then we need to checkpoint by checkpoint id
 * instead of record index.
 *
 * @param loadStrategy the client's load strategy, or null when none is available.
 * @return true exactly when [loadStrategy] is non-null.
 */
@Singleton
@Named("checkpointById")
fun isCheckpointById(loadStrategy: LoadStrategy? = null): Boolean {
    return loadStrategy != null
}

/**
 * The one record queue shared by the whole sync (all streams), split into partitions.
 *
 * The partition count comes from [LoadStrategy.inputPartitions] when a strategy is supplied and
 * falls back to a single partition otherwise. Every partition is backed by its own
 * unlimited-capacity channel.
 *
 * @param loadStrategy the client's load strategy, or null when none is available.
 */
@Singleton
@Named("recordQueue")
fun recordQueue(
    loadStrategy: LoadStrategy? = null,
): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> {
    // No strategy means no partitioning: use exactly one queue.
    val partitionCount = loadStrategy?.inputPartitions ?: 1
    return PartitionedQueue(
        Array(partitionCount) { ChannelMessageQueue(Channel(Channel.UNLIMITED)) }
    )
}

/**
 * A queue for updating batch states, which is not partitioned.
 *
 * Unlike the record queue, the backing channel is created with a fixed capacity of 100 rather
 * than [Channel.UNLIMITED].
 */
@Singleton
@Named("batchStateUpdateQueue")
fun batchStateUpdateQueue(): ChannelMessageQueue<BatchUpdate> =
    ChannelMessageQueue(Channel(100))
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ interface Batch {
PROCESSED,
STAGED,
PERSISTED,
COMPLETE
COMPLETE;

fun isPersisted(): Boolean =
when (this) {
PERSISTED,
COMPLETE -> true
else -> false
}
}

fun isPersisted(): Boolean =
when (state) {
State.PERSISTED,
State.COMPLETE -> true
else -> false
}
fun isPersisted(): Boolean = state.isPersisted()

val state: State

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ data class DestinationRecord(
Meta(
message.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
?: emptyList()
)
),
serialized.length.toLong()
)
}
}
Expand All @@ -159,6 +160,7 @@ data class DestinationRecordAirbyteValue(
val data: AirbyteValue,
val emittedAtMs: Long,
val meta: Meta?,
val serializedSizeBytes: Long = 0L
)

data class DestinationFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.airbyte.cdk.load.state.Reserved
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.channels.Channel

interface Sized {
val sizeBytes: Long
Expand Down Expand Up @@ -52,7 +53,8 @@ data class StreamFlushEvent(
override val sizeBytes: Long = 0L
}

class DestinationStreamEventQueue : ChannelMessageQueue<Reserved<DestinationStreamEvent>>()
class DestinationStreamEventQueue :
ChannelMessageQueue<Reserved<DestinationStreamEvent>>(Channel(Channel.UNLIMITED))

/**
* A supplier of message queues to which ([ReservationManager.reserve]'d) @ [DestinationStreamEvent]
Expand Down Expand Up @@ -97,4 +99,5 @@ data class GlobalCheckpointWrapped(
*/
@Singleton
@Secondary
class CheckpointMessageQueue : ChannelMessageQueue<Reserved<CheckpointMessageWrapped>>()
class CheckpointMessageQueue :
ChannelMessageQueue<Reserved<CheckpointMessageWrapped>>(Channel(Channel.UNLIMITED))
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

interface QueueReader<T> {
suspend fun consume(): Flow<T>
fun consume(): Flow<T>
suspend fun poll(): T?
}

Expand All @@ -23,12 +23,11 @@ interface QueueWriter<T> : CloseableCoroutine {

interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
open val channel = Channel<T>(Channel.UNLIMITED)
open class ChannelMessageQueue<T>(val channel: Channel<T>) : MessageQueue<T> {
private val isClosed = AtomicBoolean(false)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
override fun consume(): Flow<T> = channel.receiveAsFlow()
override suspend fun poll(): T? = channel.tryReceive().getOrNull()
override suspend fun close() {
if (isClosed.setOnce()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import kotlinx.coroutines.channels.Channel
*/
class MultiProducerChannel<T>(
producerCount: Long,
override val channel: Channel<T>,
channel: Channel<T>,
private val name: String,
) : ChannelMessageQueue<T>() {
) : ChannelMessageQueue<T>(channel = channel) {
private val log = KotlinLogging.logger {}
private val initializedProducerCount = producerCount
private val producerCount = AtomicLong(producerCount)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.util.CloseableCoroutine
import kotlinx.coroutines.flow.Flow

/**
 * A fixed collection of [MessageQueue]s addressed by a zero-based partition index.
 *
 * @param queues one queue per partition; valid indices are `0 until queues.size`.
 */
class PartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : CloseableCoroutine {
    /** Number of partitions, i.e. the number of underlying queues. */
    val partitions: Int = queues.size

    /**
     * Returns the message flow of the queue backing [partition].
     *
     * @throws IllegalArgumentException if [partition] is outside the valid index range.
     */
    fun consume(partition: Int): Flow<T> = queueFor(partition).consume()

    /**
     * Publishes [value] to the queue backing [partition].
     *
     * @throws IllegalArgumentException if [partition] is outside the valid index range.
     */
    suspend fun publish(value: T, partition: Int) {
        queueFor(partition).publish(value)
    }

    /** Publishes [value] to every partition, one after another in index order. */
    suspend fun broadcast(value: T) {
        for (queue in queues) {
            queue.publish(value)
        }
    }

    /** Closes every underlying queue, one after another in index order. */
    override suspend fun close() {
        for (queue in queues) {
            queue.close()
        }
    }

    /** Validates [partition] and returns the queue that backs it. */
    private fun queueFor(partition: Int): MessageQueue<T> {
        require(partition in queues.indices) { "Invalid partition: $partition" }
        return queues[partition]
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.CheckpointId

/**
 * Used internally by the CDK to pass messages between steps in the loader pipeline.
 *
 * Sealed: the implementations visible in this file are [PipelineMessage] and
 * [PipelineEndOfStream].
 *
 * @param K the key type; must expose a stream descriptor via [WithStream].
 * @param T the payload type carried by [PipelineMessage].
 */
sealed interface PipelineEvent<K : WithStream, T>

/**
 * A [PipelineEvent] that carries a payload together with its key.
 *
 * @property checkpointCounts a count per [CheckpointId]. Presumably the number of records this
 * message contributes to each checkpoint — TODO confirm against the producer.
 * @property key the key of this message (a [WithStream], so it also identifies the stream).
 * @property value the payload.
 */
class PipelineMessage<K : WithStream, T>(
val checkpointCounts: Map<CheckpointId, Long>,
val key: K,
val value: T
) : PipelineEvent<K, T>

/**
 * A [PipelineEvent] that carries only a stream descriptor: no key and no value.
 *
 * We send the end message on the stream and not the key, because there's no way to partition an
 * empty message.
 *
 * @property stream descriptor of the stream this end-of-stream event refers to.
 */
class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
PipelineEvent<K, T>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

/**
 * Used internally by the CDK to implement Loaders. It is added to outputs of
 * [io.airbyte.cdk.load.pipeline.BatchAccumulator] that can ack or complete record batches. This is
 * done *when stitching the dev interface to the pipeline*, so the dev does not have to think about
 * internal state.
 */
interface WithBatchState {
/** The [Batch.State] associated with this output. */
val state: Batch.State
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.command.DestinationStream

/**
 * Used internally by the CDK to keep track of streams while still allowing for partitioning on key.
 *
 * Any type used as a pipeline key must implement this so its stream can always be recovered.
 */
interface WithStream {
/** Descriptor of the stream this object belongs to. */
val stream: DestinationStream.Descriptor
}

/**
 * The default key: partitioned by stream.
 *
 * A data class holding only the stream descriptor, so two keys are equal exactly when their
 * descriptors are equal.
 */
data class StreamKey(override val stream: DestinationStream.Descriptor) : WithStream
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.WithStream

/**
 * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
 * should never need to implement this interface.
 *
 * @param S the accumulator state threaded through [start], [accept] and [finish].
 * @param K the key type; must expose a stream descriptor via [WithStream].
 * @param T the input record type.
 * @param U the output type.
 */
interface BatchAccumulator<S, K : WithStream, T, U> {
/** Creates the initial state for [key] on partition number [part]. */
fun start(key: K, part: Int): S
/**
 * Feeds [record] into [state] and returns the next state paired with an optional output.
 * NOTE(review): a null output presumably means "nothing to emit yet" — confirm with the caller.
 */
fun accept(record: T, state: S): Pair<S, U?>
/** Produces the final (non-null) output for [state]. */
fun finish(state: S): U
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.state.CheckpointId

/**
 * Used internally by the CDK to track record ranges to ack.
 *
 * Sealed: the implementations visible in this file are [BatchStateUpdate] and [BatchEndOfStream].
 */
sealed interface BatchUpdate {
/** Descriptor of the stream this update applies to. */
val stream: DestinationStream.Descriptor
}

/**
 * A [BatchUpdate] reporting a [Batch.State] together with per-checkpoint counts.
 *
 * @property stream descriptor of the stream this update applies to.
 * @property checkpointCounts a count per [CheckpointId]. Presumably the number of records that
 * reached [state] for each checkpoint — TODO confirm against the consumer.
 * @property state the batch state being reported.
 */
data class BatchStateUpdate(
override val stream: DestinationStream.Descriptor,
val checkpointCounts: Map<CheckpointId, Long>,
val state: Batch.State,
) : BatchUpdate

/**
 * A [BatchUpdate] that carries nothing but the stream descriptor.
 * NOTE(review): going by the name, this signals that the stream has ended — confirm with the
 * consumer of the batch update queue.
 *
 * @property stream descriptor of the stream this update applies to.
 */
data class BatchEndOfStream(
override val stream: DestinationStream.Descriptor,
) : BatchUpdate
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.write.DirectLoaderFactory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

/**
 * Used internally by the CDK to implement the DirectLoader.
 *
 * Creates a single pipeline step ([DirectLoadPipelineStep]) reading from a (possibly partitioned)
 * record stream. Batch updates are written to the batchStateUpdateQueue whenever the loader
 * returns.
 * NOTE(review): the original sentence ended at "returns" with no object; what the loader must
 * return to trigger an update is not visible here — TODO complete.
 *
 * Only created when a [DirectLoaderFactory] bean is present.
 *
 * @property pipelineStep the one step that makes up this pipeline.
 */
@Singleton
@Requires(bean = DirectLoaderFactory::class)
class DirectLoadPipeline(val pipelineStep: DirectLoadPipelineStep<*>) :
LoadPipeline(listOf(pipelineStep))
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.cdk.load.write.DirectLoaderFactory
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton

/**
 * The [LoadPipelineStep] used for direct loading. Only created when a [DirectLoaderFactory] bean
 * is present.
 *
 * @property accumulator the accumulator handed to every task created by this step.
 * @property inputQueue the partitioned `recordQueue` bean; each task consumes one partition.
 * @property batchQueue the `batchStateUpdateQueue` bean, passed to tasks as their batch update
 * queue.
 * @property batchSizeOverride optional value read from
 * `airbyte.destination.core.record-batch-size-override`; when set, tasks get a
 * [RecordCountFlushStrategy] built from it.
 * @property directLoaderFactory the factory whose `inputPartitions` fixes the worker count.
 */
@Singleton
@Requires(bean = DirectLoaderFactory::class)
class DirectLoadPipelineStep<S : DirectLoader>(
    val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
    @Named("recordQueue")
    val inputQueue:
        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
    val batchSizeOverride: Long? = null,
    val directLoaderFactory: DirectLoaderFactory<S>,
) : LoadPipelineStep {
    private val log = KotlinLogging.logger {}

    /** One worker per input partition declared by the loader factory. */
    override val numWorkers: Int = directLoaderFactory.inputPartitions

    /**
     * Builds the task that consumes [partition] of the input queue.
     *
     * This step has no downstream step, so both the output partitioner and the output queue are
     * passed as null.
     */
    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
        log.info { "Creating DirectLoad pipeline step task for partition $partition" }
        val partitionInput = inputQueue.consume(partition)
        // Only install a record-count flush strategy when an override was configured.
        val flushStrategy =
            batchSizeOverride?.let { maxRecords -> RecordCountFlushStrategy(maxRecords) }
        return LoadPipelineStepTask(
            accumulator,
            partitionInput,
            batchUpdateQueue = batchQueue,
            outputPartitioner = null,
            // The typed null is kept as in the original call.
            outputQueue = null as PartitionedQueue<PipelineEvent<StreamKey, DirectLoadAccResult>>?,
            flushStrategy,
            partition
        )
    }
}
Loading

0 comments on commit b88b37b

Please sign in to comment.