diff --git a/doc/integrations/kafka.md b/doc/integrations/kafka.md
index 3809f0c0..5b3901b4 100644
--- a/doc/integrations/kafka.md
+++ b/doc/integrations/kafka.md
@@ -1,4 +1,4 @@
-# Kafka sources & drains
+# Kafka flows
 
 Dependency:
 
@@ -17,7 +17,8 @@ To read from a Kafka topic, use:
 import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage}
 import ox.kafka.ConsumerSettings.AutoOffsetReset
 
-val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
+val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")
+  .autoOffsetReset(AutoOffsetReset.Earliest)
 val topic = "my_topic"
 
 val source = KafkaFlow.subscribe(settings, topic)
@@ -49,7 +50,9 @@ In order to do so, a `Flow[SendPacket]` needs to be created. The definition of `
 import org.apache.kafka.clients.producer.ProducerRecord
 import ox.kafka.ReceivedMessage
 
-case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
+case class SendPacket[K, V](
+  send: List[ProducerRecord[K, V]],
+  commit: List[ReceivedMessage[_, _]])
 ```
 
 The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains
@@ -63,7 +66,8 @@ import ox.kafka.ConsumerSettings.AutoOffsetReset
 import ox.pipe
 import org.apache.kafka.clients.producer.ProducerRecord
 
-val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
+val consumerSettings = ConsumerSettings.default("my_group")
+  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
 val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")
 val sourceTopic = "source_topic"
 val destTopic = "dest_topic"
@@ -71,7 +75,8 @@ val destTopic = "dest_topic"
 
 KafkaFlow
   .subscribe(consumerSettings, sourceTopic)
   .map(in => (in.value.toLong * 2, in))
-  .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
+  .map((value, original) =>
+    SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
   .pipe(KafkaDrain.runPublishAndCommit(producerSettings))
 ```
 
diff --git a/kafka/src/main/scala/ox/kafka/KafkaDrain.scala b/kafka/src/main/scala/ox/kafka/KafkaDrain.scala
index 978f16c6..f38d1ce5 100644
--- a/kafka/src/main/scala/ox/kafka/KafkaDrain.scala
+++ b/kafka/src/main/scala/ox/kafka/KafkaDrain.scala
@@ -10,9 +10,15 @@ import ox.flow.Flow
 object KafkaDrain:
   private val logger = LoggerFactory.getLogger(classOf[KafkaDrain.type])
 
+  /** @return
+    *   A drain, which sends all records emitted by the provided [[Flow]].
+    */
   def runPublish[K, V](settings: ProducerSettings[K, V])(using BufferCapacity): Flow[ProducerRecord[K, V]] => Unit = flow =>
     runPublish(settings.toProducer, closeWhenComplete = true)(flow)
 
+  /** @return
+    *   A drain, which sends all records emitted by the provided [[Flow]].
+    */
   def runPublish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
       BufferCapacity
   ): Flow[ProducerRecord[K, V]] => Unit =
@@ -42,9 +48,11 @@ object KafkaDrain:
       finally
         if closeWhenComplete then uninterruptible(producer.close())
       end try
+  end runPublish
+
   /** @return
-    *   A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are
-    *   sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
+    *   A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
+    *   are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
     */
   def runPublishAndCommit[K, V](producerSettings: ProducerSettings[K, V])(using BufferCapacity): Flow[SendPacket[K, V]] => Unit = flow =>
     runPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true)(flow)
@@ -52,8 +60,8 @@ object KafkaDrain:
   /** @param producer
     *   The producer that is used to send messages.
     * @return
-    *   A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are
-    *   sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
+    *   A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
+    *   are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
     */
   def runPublishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
       BufferCapacity
diff --git a/kafka/src/main/scala/ox/kafka/KafkaStage.scala b/kafka/src/main/scala/ox/kafka/KafkaStage.scala
index 31172861..aef8cd10 100644
--- a/kafka/src/main/scala/ox/kafka/KafkaStage.scala
+++ b/kafka/src/main/scala/ox/kafka/KafkaStage.scala
@@ -68,7 +68,9 @@ object KafkaStage:
       // possible out-of-order metadata of the records published from `packet.send`
       val metadata = Channel.unlimited[(Long, RecordMetadata)]
       // packets which are fully sent, and should be committed
-      val toCommit = BufferCapacity.newChannel[SendPacket[_, _]]
+      // using an unlimited buffer so that the I/O thread doesn't get blocked in producer.send callbacks; backpressure is provided
+      // by creating a buffered channel in `flow.runToChannel()` below
+      val toCommit = Channel.unlimited[SendPacket[_, _]]
       // used to reorder values received from `metadata` using the assigned sequence numbers
       val sendInSequence = SendInSequence(emit)
 
@@ -129,6 +131,7 @@ object KafkaStage:
         val leftToSend = new AtomicInteger(packet.send.size)
         packet.send.foreach { toSend =>
           val sequenceNo = sendInSequence.nextSequenceNo
+          // this will block if Kafka's buffers are full, thus limiting the number of packets that are in-flight (waiting to be sent)
           producer.send(
             toSend,
             (m: RecordMetadata, e: Exception) =>
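For context, the plain-publish drain whose scaladoc is added above is driven the same way as the `runPublishAndCommit` example in `kafka.md`: a `Flow[ProducerRecord[K, V]]` is piped into the function returned by `KafkaDrain.runPublish`. A minimal sketch, assuming a broker at `localhost:9092`; the topic name and the `Flow.fromValues` source are illustrative and not part of this change:

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import ox.flow.Flow
import ox.kafka.{KafkaDrain, ProducerSettings}
import ox.pipe

val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")

// each element is wrapped in a ProducerRecord and published to `my_topic`;
// the drain runs the flow to completion, failing if any send fails
Flow
  .fromValues("1", "2", "3")
  .map(msg => ProducerRecord[String, String]("my_topic", msg))
  .pipe(KafkaDrain.runPublish(producerSettings))
```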