Skip to content

Commit fa4bad4

Browse files
authored
Kafka: docs, comments, toCommit channel unlimited (#243)
1 parent b88cd3f commit fa4bad4

File tree

3 files changed

+26
-10
lines changed

3 files changed

+26
-10
lines changed

doc/integrations/kafka.md

+10-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Kafka sources & drains
1+
# Kafka flows
22

33
Dependency:
44

@@ -17,7 +17,8 @@ To read from a Kafka topic, use:
1717
import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage}
1818
import ox.kafka.ConsumerSettings.AutoOffsetReset
1919

20-
val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
20+
val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")
21+
.autoOffsetReset(AutoOffsetReset.Earliest)
2122
val topic = "my_topic"
2223

2324
val source = KafkaFlow.subscribe(settings, topic)
@@ -49,7 +50,9 @@ In order to do so, a `Flow[SendPacket]` needs to be created. The definition of `
4950
import org.apache.kafka.clients.producer.ProducerRecord
5051
import ox.kafka.ReceivedMessage
5152

52-
case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
53+
case class SendPacket[K, V](
54+
send: List[ProducerRecord[K, V]],
55+
commit: List[ReceivedMessage[_, _]])
5356
```
5457

5558
The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains
@@ -63,15 +66,17 @@ import ox.kafka.ConsumerSettings.AutoOffsetReset
6366
import ox.pipe
6467
import org.apache.kafka.clients.producer.ProducerRecord
6568

66-
val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
69+
val consumerSettings = ConsumerSettings.default("my_group")
70+
.bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
6771
val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")
6872
val sourceTopic = "source_topic"
6973
val destTopic = "dest_topic"
7074

7175
KafkaFlow
7276
.subscribe(consumerSettings, sourceTopic)
7377
.map(in => (in.value.toLong * 2, in))
74-
.map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
78+
.map((value, original) =>
79+
SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
7580
.pipe(KafkaDrain.runPublishAndCommit(producerSettings))
7681
```
7782

kafka/src/main/scala/ox/kafka/KafkaDrain.scala

+12-4
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,15 @@ import ox.flow.Flow
1010
object KafkaDrain:
1111
private val logger = LoggerFactory.getLogger(classOf[KafkaDrain.type])
1212

13+
/** @return
14+
* A drain, which sends all records emitted by the provided [[Flow]].
15+
*/
1316
def runPublish[K, V](settings: ProducerSettings[K, V])(using BufferCapacity): Flow[ProducerRecord[K, V]] => Unit = flow =>
1417
runPublish(settings.toProducer, closeWhenComplete = true)(flow)
1518

19+
/** @return
20+
* A drain, which sends all records emitted by the provided [[Flow]].
21+
*/
1622
def runPublish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
1723
BufferCapacity
1824
): Flow[ProducerRecord[K, V]] => Unit =
@@ -42,18 +48,20 @@ object KafkaDrain:
4248
finally
4349
if closeWhenComplete then uninterruptible(producer.close())
4450
end try
51+
end runPublish
52+
4553
/** @return
46-
* A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are
47-
* sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
54+
* A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
55+
* are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
4856
*/
4957
def runPublishAndCommit[K, V](producerSettings: ProducerSettings[K, V])(using BufferCapacity): Flow[SendPacket[K, V]] => Unit =
5058
flow => runPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true)(flow)
5159

5260
/** @param producer
5361
* The producer that is used to send messages.
5462
* @return
55-
* A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are
56-
* sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
63+
* A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
64+
* are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
5765
*/
5866
def runPublishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
5967
BufferCapacity

kafka/src/main/scala/ox/kafka/KafkaStage.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ object KafkaStage:
6868
// possible out-of-order metadata of the records published from `packet.send`
6969
val metadata = Channel.unlimited[(Long, RecordMetadata)]
7070
// packets which are fully sent, and should be committed
71-
val toCommit = BufferCapacity.newChannel[SendPacket[_, _]]
71+
// using an unlimited buffer so that the I/O thread doesn't get blocked in producer.send callbacks; backpressure is provided
72+
// by creating a buffered channel in `flow.runToChannel()` below
73+
val toCommit = Channel.unlimited[SendPacket[_, _]]
7274

7375
// used to reorder values received from `metadata` using the assigned sequence numbers
7476
val sendInSequence = SendInSequence(emit)
@@ -129,6 +131,7 @@ object KafkaStage:
129131
val leftToSend = new AtomicInteger(packet.send.size)
130132
packet.send.foreach { toSend =>
131133
val sequenceNo = sendInSequence.nextSequenceNo
134+
// this will block if Kafka's buffers are full, thus limiting the number of packets that are in-flight (waiting to be sent)
132135
producer.send(
133136
toSend,
134137
(m: RecordMetadata, e: Exception) =>

0 commit comments

Comments
 (0)