
Kafka: docs, comments, toCommit channel unlimited #243

Merged (2 commits) on Nov 18, 2024
15 changes: 10 additions & 5 deletions doc/integrations/kafka.md
@@ -1,4 +1,4 @@
-# Kafka sources & drains
+# Kafka flows

Dependency:

@@ -17,7 +17,8 @@ To read from a Kafka topic, use:
import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage}
import ox.kafka.ConsumerSettings.AutoOffsetReset

-val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
+val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")
+  .autoOffsetReset(AutoOffsetReset.Earliest)
val topic = "my_topic"

val source = KafkaFlow.subscribe(settings, topic)
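A flow created this way can then be consumed like any other ox `Flow`; a minimal, hedged sketch (the `take`/`runForeach` pipeline is illustrative, not part of this PR):

```scala
// print the values of the first ten received messages, assuming the
// `settings` and `topic` values defined above and a broker on localhost:9092
source
  .take(10)
  .runForeach(msg => println(msg.value))
```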
@@ -49,7 +50,9 @@ In order to do so, a `Flow[SendPacket]` needs to be created. The definition of `
import org.apache.kafka.clients.producer.ProducerRecord
import ox.kafka.ReceivedMessage

-case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
+case class SendPacket[K, V](
+    send: List[ProducerRecord[K, V]],
+    commit: List[ReceivedMessage[_, _]])
```

The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains the received messages whose offsets should be committed once the corresponding `send` records have been published.
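For illustration, a packet which echoes a single received message to another topic might be built as follows (`received` and `echo_topic` are hypothetical names, not part of this PR):

```scala
// `received` stands for a ReceivedMessage[String, String] obtained from
// KafkaFlow.subscribe; "echo_topic" is an illustrative destination topic
val packet = SendPacket(
  send = List(ProducerRecord[String, String]("echo_topic", received.value)),
  commit = List(received)
)
```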
@@ -63,15 +66,17 @@ import ox.kafka.ConsumerSettings.AutoOffsetReset
import ox.pipe
import org.apache.kafka.clients.producer.ProducerRecord

-val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
+val consumerSettings = ConsumerSettings.default("my_group")
+  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")
val sourceTopic = "source_topic"
val destTopic = "dest_topic"

KafkaFlow
.subscribe(consumerSettings, sourceTopic)
.map(in => (in.value.toLong * 2, in))
-  .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
+  .map((value, original) =>
+    SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
.pipe(KafkaDrain.runPublishAndCommit(producerSettings))
```
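Since `ox.pipe` is plain function application (`flow.pipe(f)` evaluates to `f(flow)`), the drain call above can equivalently be written as in the following sketch, assuming that reading of `pipe`:

```scala
// equivalent formulation without pipe, applying the drain directly to the flow
KafkaDrain.runPublishAndCommit(producerSettings)(
  KafkaFlow
    .subscribe(consumerSettings, sourceTopic)
    .map(in => (in.value.toLong * 2, in))
    .map((value, original) =>
      SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
)
```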

16 changes: 12 additions & 4 deletions kafka/src/main/scala/ox/kafka/KafkaDrain.scala
@@ -10,9 +10,15 @@ import ox.flow.Flow
object KafkaDrain:
private val logger = LoggerFactory.getLogger(classOf[KafkaDrain.type])

/** @return
* A drain, which sends all records emitted by the provided [[Flow]].
*/
def runPublish[K, V](settings: ProducerSettings[K, V])(using BufferCapacity): Flow[ProducerRecord[K, V]] => Unit = flow =>
runPublish(settings.toProducer, closeWhenComplete = true)(flow)

/** @return
* A drain, which sends all records emitted by the provided [[Flow]].
*/
def runPublish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
BufferCapacity
): Flow[ProducerRecord[K, V]] => Unit =
@@ -42,18 +48,20 @@ object KafkaDrain:
finally
if closeWhenComplete then uninterruptible(producer.close())
end try
end runPublish

/** @return
-   * A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are
-   * sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
+   * A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
+   * are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
*/
def runPublishAndCommit[K, V](producerSettings: ProducerSettings[K, V])(using BufferCapacity): Flow[SendPacket[K, V]] => Unit =
flow => runPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true)(flow)

/** @param producer
* The producer that is used to send messages.
* @return
-   * A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are
-   * sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
+   * A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
+   * are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
*/
def runPublishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
BufferCapacity
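To make the publish-only drain concrete, here is a hedged usage sketch (the topic name and values are illustrative; it relies on the default `BufferCapacity` given being in scope):

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import ox.flow.Flow
import ox.kafka.{KafkaDrain, ProducerSettings}
import ox.pipe

val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")

// publish ten records; the drain closes the producer once the flow completes
Flow
  .fromIterable(1 to 10)
  .map(n => ProducerRecord[String, String]("numbers_topic", n.toString))
  .pipe(KafkaDrain.runPublish(producerSettings))
```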
5 changes: 4 additions & 1 deletion kafka/src/main/scala/ox/kafka/KafkaStage.scala
@@ -68,7 +68,9 @@ object KafkaStage:
// possible out-of-order metadata of the records published from `packet.send`
val metadata = Channel.unlimited[(Long, RecordMetadata)]
// packets which are fully sent, and should be committed
-    val toCommit = BufferCapacity.newChannel[SendPacket[_, _]]
+    // using an unlimited buffer so that the I/O thread doesn't get blocked in producer.send callbacks; backpressure is provided
+    // by creating a buffered channel in `flow.runToChannel()` below
+    val toCommit = Channel.unlimited[SendPacket[_, _]]

// used to reorder values received from `metadata` using the assigned sequence numbers
val sendInSequence = SendInSequence(emit)
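For context on this change, a short standalone sketch of the difference between the two channel flavors involved (illustrative example, not library code):

```scala
import ox.channels.{BufferCapacity, Channel}

// bounded: send suspends once the buffer (sized by the in-scope BufferCapacity)
// fills up, which is what provides backpressure from downstream
val bounded = BufferCapacity.newChannel[Int]

// unbounded: send always completes immediately, so it is safe to call from
// callbacks that must never block, such as KafkaProducer's I/O thread
val unbounded = Channel.unlimited[Int]
unbounded.send(1) // never blocks
```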
@@ -129,6 +131,7 @@
val leftToSend = new AtomicInteger(packet.send.size)
packet.send.foreach { toSend =>
val sequenceNo = sendInSequence.nextSequenceNo
// this will block if Kafka's buffers are full, thus limiting the number of packets that are in-flight (waiting to be sent)
producer.send(
toSend,
(m: RecordMetadata, e: Exception) =>