Commit 22df859

Properly wait for Kafka to commit messages, documentation, manual tests (#78)

1 parent 1445c0d · commit 22df859

13 files changed: +293 −114 lines

build.sbt (+2)

@@ -60,6 +60,8 @@ lazy val kafka: Project = (project in file("kafka"))
       slf4j,
       logback % Test,
       "io.github.embeddedkafka" %% "embedded-kafka" % "3.6.1" % Test,
+      "org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % Test,
+      "org.apache.pekko" %% "pekko-stream" % "1.0.1" % Test,
       scalaTest
     )
   )

doc/kafka.md (+17 −1)

@@ -45,7 +45,23 @@ supervised {
 }
 ```
 
-To publish data and commit offsets of messages, basing on which the published data is computed:
+Quite often, data to be published to a topic (`topic1`) is computed based on data received from another topic
+(`topic2`). In such a case, it's possible to commit messages from `topic2` after the messages to `topic1` are
+successfully published.
+
+In order to do so, a `Source[SendPacket]` needs to be created. The definition of `SendPacket` is:
+
+```scala mdoc:compile-only
+import org.apache.kafka.clients.producer.ProducerRecord
+import ox.kafka.ReceivedMessage
+
+case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
+```
+
+The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains
+the messages based on which the data to be sent was computed. These are the received messages, as produced by a
+`KafkaSource`. When committing, for each topic-partition that appears in the received messages, the maximum offset is
+computed. For example:
 
 ```scala mdoc:compile-only
 import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaSource, ProducerSettings, SendPacket}
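The documented example is cut off above at its imports. As a rough sketch of how a `SendPacket` stream might be built and drained with the `mapPublishAndCommit` stage added in this commit (the topic name, the transformation and the helper method below are illustrative assumptions, not part of the commit):

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import ox.*
import ox.channels.*
import ox.kafka.*
import ox.kafka.KafkaStage.*

// For each message received from `topic2`, publish a derived record to `topic1`,
// and only then let its offset be committed (hypothetical helper, for illustration).
def copyAndCommit(
    incoming: Source[ReceivedMessage[String, String]], // e.g. as produced by a KafkaSource
    producerSettings: ProducerSettings[String, String]
)(using Ox, StageCapacity): Unit =
  incoming
    .map(msg => SendPacket(List(new ProducerRecord[String, String]("topic1", msg.value)), List(msg)))
    .mapPublishAndCommit(producerSettings) // publishes, then commits up to the max offset per partition
    .drain()
```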

kafka/docker-tests/docker-compose.yml (+23)

@@ -0,0 +1,23 @@
+version: '2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 22181:2181
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    depends_on:
+      - zookeeper
+    ports:
+      - 29092:29092
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
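Clients running on the host (for example the manual tests) reach this broker through the advertised `PLAINTEXT_HOST` listener on `localhost:29092`. A hedged sketch of pointing ox-kafka settings at that address; the `default`, `bootstrapServers` and `autoOffsetReset` builder calls are assumed here, not shown in this diff:

```scala
import ox.kafka.{ConsumerSettings, ProducerSettings}
import ox.kafka.ConsumerSettings.AutoOffsetReset

// Settings for a manual test against the docker-compose broker (host port 29092).
val consumerSettings = ConsumerSettings
  .default("manual-test-group")
  .bootstrapServers("localhost:29092")
  .autoOffsetReset(AutoOffsetReset.Earliest)

val producerSettings = ProducerSettings.default.bootstrapServers("localhost:29092")
```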

kafka/src/main/scala/ox/kafka/KafkaConsumerActor.scala (+5 −2)

@@ -42,14 +42,17 @@ object KafkaConsumerActor:
         case NonFatal(e) =>
           logger.error("Exception when polling for records in Kafka", e)
           results.error(e)
+          c.error(e)
           false
-      case KafkaConsumerRequest.Commit(offsets) =>
+      case KafkaConsumerRequest.Commit(offsets, result) =>
         try
           consumer.commitSync(offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap.asJava)
+          result.send(())
          true
        catch
          case NonFatal(e) =>
            logger.error("Exception when committing offsets", e)
+            result.error(e)
            c.error(e)
            false
  }

@@ -64,4 +67,4 @@ object KafkaConsumerActor:
 enum KafkaConsumerRequest[K, V]:
   case Subscribe(topics: Seq[String])
   case Poll(results: Sink[ConsumerRecords[K, V]])
-  case Commit(offsets: Map[TopicPartition, Long])
+  case Commit(offsets: Map[TopicPartition, Long], results: Sink[Unit])
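The new `results: Sink[Unit]` parameter is what lets callers wait until `commitSync` has actually completed (or failed), rather than fire-and-forget the request. A minimal sketch of that request/response pattern, mirroring how `doCommit` uses it later in this commit; `KafkaConsumerRequest` is an internal API, so this is purely illustrative:

```scala
import org.apache.kafka.common.TopicPartition
import ox.channels.{Channel, Sink}
import ox.kafka.KafkaConsumerRequest

// Ask the consumer actor to commit the given offsets, then block until it confirms.
def commitAndWait(consumer: Sink[KafkaConsumerRequest[_, _]], offsets: Map[TopicPartition, Long]): Unit =
  val commitDone = Channel[Unit]()
  consumer.send(KafkaConsumerRequest.Commit(offsets, commitDone))
  commitDone.receive() // the actor sends () after commitSync, or closes the channel with an error
```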
kafka/src/main/scala/ox/kafka/KafkaDrain.scala (+3 −56)

@@ -1,16 +1,10 @@
 package ox.kafka
 
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.TopicPartition
 import org.slf4j.LoggerFactory
 import ox.*
 import ox.channels.*
 
-import java.util.concurrent.atomic.AtomicInteger
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.concurrent.duration.*
 import scala.jdk.CollectionConverters.*
 
 object KafkaDrain:

@@ -58,54 +52,7 @@ object KafkaDrain:
    * sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
    */
  def publishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean): Source[SendPacket[K, V]] => Unit = source =>
-    val exceptions = Channel.unlimited[Throwable]
-    val toCommit = Channel[SendPacket[_, _]](128)
-
-    try
-      // starting a nested scope, so that the committer is interrupted when the main process ends
-      scoped {
-        // committer
-        fork(tapException(doCommit(toCommit)) { e =>
-          logger.error("Exception when committing offsets", e)
-          exceptions.send(e)
-        })
-
-        repeatWhile {
-          select(exceptions.receiveClause, source.receiveClause) match
-            case e: ChannelClosed.Error =>
-              logger.debug(s"Stopping publishing: upstream closed due to an error ($e).")
-              throw e.toThrowable
-            case ChannelClosed.Done =>
-              logger.debug(s"Stopping publishing: upstream done.")
-              false
-            case exceptions.Received(e) =>
-              throw e
-            case source.Received(packet) =>
-              sendPacket(producer, packet, toCommit, exceptions)
-              true
-        }
-      }
-    finally
-      if closeWhenComplete then
-        logger.debug("Closing the Kafka producer")
-        uninterruptible(producer.close())
-
-  private def sendPacket[K, V](
-      producer: KafkaProducer[K, V],
-      packet: SendPacket[K, V],
-      toCommit: Sink[SendPacket[_, _]],
-      exceptions: Sink[Throwable]
-  ): Unit =
-    val leftToSend = new AtomicInteger(packet.send.size)
-    packet.send.foreach { toSend =>
-      producer.send(
-        toSend,
-        (_: RecordMetadata, exception: Exception) => {
-          if exception == null
-          then { if leftToSend.decrementAndGet() == 0 then toCommit.send(packet) }
-          else
-            logger.error("Exception when sending record", exception)
-            exceptions.send(exception)
-        }
-      )
+    supervised {
+      import KafkaStage.*
+      source.mapPublishAndCommit(producer, closeWhenComplete).drain()
     }
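With this change, `publishAndCommit` becomes a thin adapter over the new `KafkaStage.mapPublishAndCommit`. Since it returns a function `Source[SendPacket[K, V]] => Unit`, a caller applies it to the packet stream; a small sketch, with the producer and packet source assumed to be in scope:

```scala
import org.apache.kafka.clients.producer.KafkaProducer
import ox.channels.Source
import ox.kafka.{KafkaDrain, SendPacket}

// Publish each packet's records and commit its offsets, closing the producer at the end.
def drainPackets(packets: Source[SendPacket[String, String]], producer: KafkaProducer[String, String]): Unit =
  KafkaDrain.publishAndCommit(producer, closeWhenComplete = true)(packets)
```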

kafka/src/main/scala/ox/kafka/KafkaStage.scala (+85 −48)

@@ -14,24 +14,43 @@ object KafkaStage:
   private val logger = LoggerFactory.getLogger(classOf[KafkaStage.type])
 
   extension [K, V](source: Source[ProducerRecord[K, V]])
+    /** Publish the messages using a producer created with the given `settings`.
+      *
+      * @return
+      *   A stream of published records metadata, in the order in which the [[ProducerRecord]]s are received.
+      */
     def mapPublish(settings: ProducerSettings[K, V])(using StageCapacity, Ox): Source[RecordMetadata] =
       mapPublish(settings.toProducer, closeWhenComplete = true)
 
+    /** Publish the messages using the given `producer`. The producer is closed depending on the `closeWhenComplete` flag, after all
+      * messages are published, or when an exception occurs.
+      *
+      * @return
+      *   A stream of published records metadata, in the order in which the [[ProducerRecord]]s are received.
+      */
     def mapPublish(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] =
       source.mapAsView(r => SendPacket(List(r), Nil)).mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = false)
 
   extension [K, V](source: Source[SendPacket[K, V]])
-    /** For each packet, first all messages (producer records) are sent. Then, all messages up to the offsets of the consumer messages are
-      * committed. The metadata of the published records is sent downstream.
+    /** For each packet, first all messages (producer records) from [[SendPacket.send]] are sent, using a producer created with the given
+      * `producerSettings`. Then, all messages from [[SendPacket.commit]] are committed: for each topic-partition, up to the highest
+      * observed offset.
+      *
+      * @return
+      *   A stream of published records metadata, in the order in which the [[SendPacket]]s are received.
      */
    def mapPublishAndCommit(producerSettings: ProducerSettings[K, V])(using StageCapacity, Ox): Source[RecordMetadata] =
      mapPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true)
 
-    /** For each packet, first all messages (producer records) are sent. Then, all messages up to the offsets of the consumer messages are
-      * committed. The metadata of the published records is sent downstream.
+    /** For each packet, first all messages (producer records) are sent, using the given `producer`. Then, all messages from
+      * [[SendPacket.commit]] are committed: for each topic-partition, up to the highest observed offset.
+      *
+      * The producer is closed depending on the `closeWhenComplete` flag, after all messages are published, or when an exception occurs.
      *
      * @param producer
      *   The producer that is used to send messages.
+      * @return
+      *   A stream of published records metadata, in the order in which the [[SendPacket]]s are received.
      */
    def mapPublishAndCommit(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] =
      mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = true)

@@ -40,29 +59,45 @@
       StageCapacity,
       Ox
   ): Source[RecordMetadata] =
+    // source - the upstream from which packets are received
+
+    // the result, where metadata of published records is sent in the same order, as the received packets
     val c = StageCapacity.newChannel[RecordMetadata]
+    // a helper channel to signal any exceptions that occur while publishing or committing offsets
     val exceptions = Channel.unlimited[Exception]
+    // possible out-of-order metadata of the records published from `packet.send`
     val metadata = Channel[(Long, RecordMetadata)](128)
+    // packets which are fully sent, and should be committed
     val toCommit = Channel[SendPacket[_, _]](128)
-
+    // used to reorder values received from `metadata` using the assigned sequence numbers
     val sendInSequence = SendInSequence(c)
 
     fork {
       try
-        // starting a nested scope, so that the committer is interrupted when the main process ends
+        // starting a nested scope, so that the committer is interrupted when the main process ends (when there's an exception)
        scoped {
          // committer
-          if commitOffsets then fork(tapException(doCommit(toCommit))(c.error))
+          val commitDoneSource = if commitOffsets then Source.fromFork(fork(tapException(doCommit(toCommit))(c.error))) else Source.empty
 
          repeatWhile {
            select(exceptions.receiveClause, metadata.receiveClause, source.receiveClause) match
-              case ChannelClosed.Error(r) => c.error(r); false
-              case ChannelClosed.Done => sendInSequence.drainFromThenDone(exceptions, metadata); false
+              case ChannelClosed.Error(r) => c.error(r); false
+              case ChannelClosed.Done =>
+                // waiting until all records are sent and metadata forwarded to `c`
+                sendInSequence.drainFrom(metadata, exceptions)
+                // we now know that there won't be any more offsets sent to be committed - we can complete the channel
+                toCommit.done()
+                // waiting until the commit fork is done
+                commitDoneSource.receive()
+                // completing the downstream
+                c.done()
+                // and finally winding down this scope & fork
+                false
              case exceptions.Received(e) => c.error(e); false
              case metadata.Received((s, m)) => sendInSequence.send(s, m); true
              case source.Received(packet) =>
                try
-                  sendPacket(producer, packet, sendInSequence, toCommit, exceptions, metadata)
+                  sendPacket(producer, packet, sendInSequence, toCommit, exceptions, metadata, commitOffsets)
                  true
                catch
                  case e: Exception =>

@@ -84,7 +119,8 @@
      sendInSequence: SendInSequence[RecordMetadata],
      toCommit: Sink[SendPacket[_, _]],
      exceptions: Sink[Exception],
-      metadata: Sink[(Long, RecordMetadata)]
+      metadata: Sink[(Long, RecordMetadata)],
+      commitOffsets: Boolean
  ): Unit =
    val leftToSend = new AtomicInteger(packet.send.size)
    packet.send.foreach { toSend =>

@@ -94,47 +130,48 @@
        (m: RecordMetadata, e: Exception) =>
          if e != null then exceptions.send(e)
          else {
+            // sending commit request first, as when upstream `source` is done, we need to know that all commits are
+            // scheduled in order to shut down properly
+            if commitOffsets && leftToSend.decrementAndGet() == 0 then toCommit.send(packet)
            metadata.send((sequenceNo, m))
-            if leftToSend.decrementAndGet() == 0 then toCommit.send(packet)
          }
      )
    }
 
-  /** Sends `T` elements to the given `c` sink, when elements with subsequence sequence numbers are available. */
-  private class SendInSequence[T](c: Sink[T]):
-    private var sequenceNoNext = 0L
-    private var sequenceNoToSendNext = 0L
-    private val toSend = mutable.SortedSet[(Long, T)]()(Ordering.by(_._1))
-
-    def nextSequenceNo: Long =
-      val n = sequenceNoNext
-      sequenceNoNext += 1
-      n
-
-    def send(sequenceNo: Long, v: T): Unit =
-      toSend.add((sequenceNo, v))
+/** Sends `T` elements to the given `c` sink, when elements with subsequent sequence numbers are available. Thread-unsafe. */
+private class SendInSequence[T](c: Sink[T]):
+  private var sequenceNoNext = 0L
+  private var sequenceNoToSendNext = 0L
+  private val toSend = mutable.SortedSet[(Long, T)]()(Ordering.by(_._1))
+
+  def nextSequenceNo: Long =
+    val n = sequenceNoNext
+    sequenceNoNext += 1
+    n
+
+  def send(sequenceNo: Long, v: T): Unit =
+    toSend.add((sequenceNo, v))
+    trySend()
+
+  def allSent: Boolean = sequenceNoNext == sequenceNoToSendNext
+
+  @tailrec
+  private def trySend(): Unit = toSend.headOption match
+    case Some((s, m)) if s == sequenceNoToSendNext =>
+      toSend.remove((s, m))
+      c.send(m)
+      sequenceNoToSendNext += 1
      trySend()
+    case _ => ()
 
-    def allSent: Boolean = sequenceNoNext == sequenceNoToSendNext
-
-    @tailrec
-    private def trySend(): Unit = toSend.headOption match
-      case Some((s, m)) if s == sequenceNoToSendNext =>
-        toSend.remove((s, m))
-        c.send(m)
-        sequenceNoToSendNext += 1
-        trySend()
-      case _ => ()
-
-    @tailrec
-    final def drainFromThenDone(
-        exceptions: Source[Exception],
-        incoming: Source[(Long, T)]
-    ): Unit =
-      if allSent then c.done()
-      else
-        select(exceptions.receiveClause, incoming.receiveClause) match
-          case ChannelClosed.Error(r) => c.error(r)
-          case ChannelClosed.Done => throw new IllegalStateException()
-          case exceptions.Received(e) => c.error(e)
-          case incoming.Received((s, m)) => send(s, m); drainFromThenDone(exceptions, incoming)
+  @tailrec
+  final def drainFrom(
+      incoming: Source[(Long, T)],
+      exceptions: Source[Exception]
+  ): Unit =
+    if !allSent then
+      select(exceptions.receiveClause, incoming.receiveClause) match
+        case ChannelClosed.Error(r) => c.error(r)
+        case ChannelClosed.Done => throw new IllegalStateException()
+        case exceptions.Received(e) => c.error(e)
+        case incoming.Received((s, m)) => send(s, m); drainFrom(incoming, exceptions)
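`SendInSequence` is what keeps the emitted `RecordMetadata` in packet order even though Kafka's send callbacks can complete out of order: each record is assigned a sequence number up front, and completed metadata is buffered until all earlier sequence numbers have been emitted. The reordering idea in isolation, as a plain-Scala sketch rather than the actual channel-based class:

```scala
import scala.collection.mutable

// Sketch of the reordering idea behind SendInSequence: emit values strictly in
// sequence-number order, buffering any that complete early.
final class Reorder[T](emit: T => Unit):
  private var next = 0L
  private val buffered = mutable.SortedMap[Long, T]()

  def offer(seqNo: Long, value: T): Unit =
    buffered.put(seqNo, value)
    while buffered.headOption.exists(_._1 == next) do
      buffered.remove(next).foreach(emit)
      next += 1

@main def reorderDemo(): Unit =
  val r = Reorder[String](s => println(s))
  r.offer(1, "b") // buffered: sequence number 0 hasn't been emitted yet
  r.offer(0, "a") // prints "a" then "b"
  r.offer(2, "c") // prints "c"
```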

kafka/src/main/scala/ox/kafka/kafkaOffsetCommit.scala (+11 −4)

@@ -8,18 +8,24 @@ import ox.channels.*
 import scala.collection.mutable
 import scala.concurrent.duration.*
 
-private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox) =
+private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox): Unit =
   val commitInterval = 1.second
   val ticks = Source.tick(commitInterval)
   val toCommit = mutable.Map[TopicPartition, Long]()
   var consumer: Sink[KafkaConsumerRequest[_, _]] = null // assuming all packets come from the same consumer
+  val commitDone = Channel[Unit]()
 
-  forever {
-    select(ticks, packets).orThrow match
+  repeatWhile {
+    select(ticks, packets) match
+      case ChannelClosed.Error(e) => throw e
+      case ChannelClosed.Done => false
       case () =>
         if consumer != null && toCommit.nonEmpty then
-          consumer.send(KafkaConsumerRequest.Commit(toCommit.toMap))
+          consumer.send(KafkaConsumerRequest.Commit(toCommit.toMap, commitDone))
+          // waiting for the commit to happen
+          commitDone.receive()
           toCommit.clear()
+        true
       case packet: SendPacket[_, _] =>
         packet.commit.foreach { receivedMessage =>
           if consumer == null then consumer = receivedMessage.consumer.asInstanceOf[Sink[KafkaConsumerRequest[_, _]]]

@@ -29,6 +35,7 @@ private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox) =
            case None => Some(receivedMessage.offset)
          }
        }
+      true
  }
 
 case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
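`doCommit` batches commits: it keeps only the highest offset seen per topic-partition and flushes that map at most once per tick, now waiting on `commitDone` so it cannot race ahead of the broker. The per-partition maximum on its own can be illustrated as follows (a standalone sketch with made-up offsets, not code from the commit):

```scala
import org.apache.kafka.common.TopicPartition

// Keep the highest offset observed per topic-partition, as doCommit does via updateWith.
def maxOffsets(observed: Seq[(TopicPartition, Long)]): Map[TopicPartition, Long] =
  observed.foldLeft(Map.empty[TopicPartition, Long]) { case (acc, (tp, offset)) =>
    acc.updatedWith(tp)(current => Some(math.max(current.getOrElse(offset), offset)))
  }

@main def maxOffsetsDemo(): Unit =
  val tp = new TopicPartition("topic2", 0)
  println(maxOffsets(Seq(tp -> 10L, tp -> 42L, tp -> 17L))) // Map(topic2-0 -> 42)
```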
