Properly wait for Kafka to commit messages, documentation, manual tests #78

Merged · 5 commits · Feb 23, 2024
2 changes: 2 additions & 0 deletions build.sbt
@@ -60,6 +60,8 @@ lazy val kafka: Project = (project in file("kafka"))
slf4j,
logback % Test,
"io.github.embeddedkafka" %% "embedded-kafka" % "3.6.1" % Test,
"org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % Test,
"org.apache.pekko" %% "pekko-stream" % "1.0.1" % Test,
scalaTest
)
)
18 changes: 17 additions & 1 deletion doc/kafka.md
@@ -45,7 +45,23 @@ supervised {
}
```

To publish data and commit offsets of messages based on which the published data is computed:
Quite often, data to be published to a topic (`topic1`) is computed based on data received from another topic
(`topic2`). In such a case, it's possible to commit the messages from `topic2` only after the messages to `topic1` have
been successfully published.

In order to do so, a `Source[SendPacket]` needs to be created. The definition of `SendPacket` is:

```scala mdoc:compile-only
import org.apache.kafka.clients.producer.ProducerRecord
import ox.kafka.ReceivedMessage

case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
```

The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains
the messages based on which the data to be sent was computed. These are the received messages, as produced by a
`KafkaSource`. When committing, for each topic-partition that appears in the received messages, the maximum offset is
computed. For example:

```scala mdoc:compile-only
import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaSource, ProducerSettings, SendPacket}
// (the remainder of this example is collapsed in the diff view)
```
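For illustration only (this is not the collapsed snippet above), a pipeline that consumes `topic2`, publishes a derived record to `topic1` for each message, and commits the consumed offsets could be assembled from the pieces in this PR; the `value` accessor on `ReceivedMessage` and the topic names are assumptions:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import ox.channels.Source
import ox.kafka.{KafkaDrain, ReceivedMessage, SendPacket}

// For every message received from `topic2`, publish its value to `topic1` and
// commit the original message once the publish succeeds (a sketch; `msg.value` is assumed).
def copyTopic2ToTopic1(
    received: Source[ReceivedMessage[String, String]],
    producer: KafkaProducer[String, String]
): Unit =
  val packets: Source[SendPacket[String, String]] =
    received.mapAsView(msg => SendPacket(List(new ProducerRecord[String, String]("topic1", msg.value)), List(msg)))
  // sends each packet's records first, then commits its offsets; blocks until the upstream completes
  KafkaDrain.publishAndCommit(producer, closeWhenComplete = true)(packets)
```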
23 changes: 23 additions & 0 deletions kafka/docker-tests/docker-compose.yml
@@ -0,0 +1,23 @@
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
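With this compose file, the broker is reachable from the host through the `PLAINTEXT_HOST` listener on `localhost:29092`, so a manual test points its bootstrap servers there. A minimal sketch, assuming the `ConsumerSettings`/`ProducerSettings` builder methods (they are not part of this diff):

```scala
import ox.kafka.{ConsumerSettings, ProducerSettings}

// Settings for a manual test against the docker-compose cluster above;
// the group id and the builder methods are assumptions.
val consumerSettings = ConsumerSettings.default("manual-test-group").bootstrapServers("localhost:29092")
val producerSettings = ProducerSettings.default.bootstrapServers("localhost:29092")
```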
7 changes: 5 additions & 2 deletions kafka/src/main/scala/ox/kafka/KafkaConsumerActor.scala
@@ -42,14 +42,17 @@ object KafkaConsumerActor:
case NonFatal(e) =>
logger.error("Exception when polling for records in Kafka", e)
results.error(e)
c.error(e)
false
case KafkaConsumerRequest.Commit(offsets) =>
case KafkaConsumerRequest.Commit(offsets, result) =>
try
consumer.commitSync(offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap.asJava)
result.send(())
true
catch
case NonFatal(e) =>
logger.error("Exception when committing offsets", e)
result.error(e)
c.error(e)
false
}
@@ -64,4 +67,4 @@ object KafkaConsumerActor:
enum KafkaConsumerRequest[K, V]:
case Subscribe(topics: Seq[String])
case Poll(results: Sink[ConsumerRecords[K, V]])
case Commit(offsets: Map[TopicPartition, Long])
case Commit(offsets: Map[TopicPartition, Long], results: Sink[Unit])
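The extra `results` sink in `Commit` lets the requester block until `commitSync` has completed, which is what properly waiting for a commit means here. A minimal sketch of the request/reply round-trip; the helper name is hypothetical, and the exact location of `KafkaConsumerRequest` (top-level in `ox.kafka`, or nested in `KafkaConsumerActor`) is assumed:

```scala
import org.apache.kafka.common.TopicPartition
import ox.channels.*
import ox.kafka.*
// note: adjust the import above if KafkaConsumerRequest is nested in KafkaConsumerActor

// Hypothetical helper: ask the consumer actor to commit the given offsets and
// block until the broker has acknowledged the commit.
def commitAndWait(consumer: Sink[KafkaConsumerRequest[_, _]], offsets: Map[TopicPartition, Long]): Unit =
  val commitDone = Channel[Unit]() // reply channel, completed by the consumer actor after commitSync
  consumer.send(KafkaConsumerRequest.Commit(offsets, commitDone))
  commitDone.receive() // returns once the commit has actually happened (or the channel is closed with an error)
```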
59 changes: 3 additions & 56 deletions kafka/src/main/scala/ox/kafka/KafkaDrain.scala
@@ -1,16 +1,10 @@
package ox.kafka

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.slf4j.LoggerFactory
import ox.*
import ox.channels.*

import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

object KafkaDrain:
@@ -58,54 +52,7 @@ object KafkaDrain:
* sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
*/
def publishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean): Source[SendPacket[K, V]] => Unit = source =>
val exceptions = Channel.unlimited[Throwable]
val toCommit = Channel[SendPacket[_, _]](128)

try
// starting a nested scope, so that the committer is interrupted when the main process ends
scoped {
// committer
fork(tapException(doCommit(toCommit)) { e =>
logger.error("Exception when committing offsets", e)
exceptions.send(e)
})

repeatWhile {
select(exceptions.receiveClause, source.receiveClause) match
case e: ChannelClosed.Error =>
logger.debug(s"Stopping publishing: upstream closed due to an error ($e).")
throw e.toThrowable
case ChannelClosed.Done =>
logger.debug(s"Stopping publishing: upstream done.")
false
case exceptions.Received(e) =>
throw e
case source.Received(packet) =>
sendPacket(producer, packet, toCommit, exceptions)
true
}
}
finally
if closeWhenComplete then
logger.debug("Closing the Kafka producer")
uninterruptible(producer.close())

private def sendPacket[K, V](
producer: KafkaProducer[K, V],
packet: SendPacket[K, V],
toCommit: Sink[SendPacket[_, _]],
exceptions: Sink[Throwable]
): Unit =
val leftToSend = new AtomicInteger(packet.send.size)
packet.send.foreach { toSend =>
producer.send(
toSend,
(_: RecordMetadata, exception: Exception) => {
if exception == null
then { if leftToSend.decrementAndGet() == 0 then toCommit.send(packet) }
else
logger.error("Exception when sending record", exception)
exceptions.send(exception)
}
)
supervised {
import KafkaStage.*
source.mapPublishAndCommit(producer, closeWhenComplete).drain()
}
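`publishAndCommit` is now a thin wrapper over the `KafkaStage` extension, so the send/commit machinery lives in one place. A sketch contrasting the two entry points (parameter names are illustrative): use the drain when the metadata of the published records is not needed, and the stage when it is:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata}
import ox.*
import ox.channels.*
import ox.kafka.{KafkaDrain, SendPacket}
import ox.kafka.KafkaStage.*

// Drain variant: blocks until the upstream completes; the record metadata is discarded.
def drainPackets(packets: Source[SendPacket[String, String]], producer: KafkaProducer[String, String]): Unit =
  KafkaDrain.publishAndCommit(producer, closeWhenComplete = true)(packets)

// Stage variant: the metadata of the published records stays available to further stages.
def publishPackets(packets: Source[SendPacket[String, String]], producer: KafkaProducer[String, String])(using
    Ox,
    StageCapacity
): Source[RecordMetadata] =
  packets.mapPublishAndCommit(producer, closeWhenComplete = true)
```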
133 changes: 85 additions & 48 deletions kafka/src/main/scala/ox/kafka/KafkaStage.scala
@@ -14,24 +14,43 @@ object KafkaStage:
private val logger = LoggerFactory.getLogger(classOf[KafkaStage.type])

extension [K, V](source: Source[ProducerRecord[K, V]])
/** Publish the messages using a producer created with the given `settings`.
*
* @return
* A stream of published records metadata, in the order in which the [[ProducerRecord]]s are received.
*/
def mapPublish(settings: ProducerSettings[K, V])(using StageCapacity, Ox): Source[RecordMetadata] =
mapPublish(settings.toProducer, closeWhenComplete = true)

/** Publish the messages using the given `producer`. The producer is closed depending on the `closeWhenComplete` flag, after all
* messages are published, or when an exception occurs.
*
* @return
* A stream of published records metadata, in the order in which the [[ProducerRecord]]s are received.
*/
def mapPublish(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] =
source.mapAsView(r => SendPacket(List(r), Nil)).mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = false)

extension [K, V](source: Source[SendPacket[K, V]])
/** For each packet, first all messages (producer records) are sent. Then, all messages up to the offsets of the consumer messages are
* committed. The metadata of the published records is sent downstream.
/** For each packet, first all messages (producer records) from [[SendPacket.send]] are sent, using a producer created with the given
* `producerSettings`. Then, all messages from [[SendPacket.commit]] are committed: for each topic-partition, up to the highest
* observed offset.
*
* @return
* A stream of published records metadata, in the order in which the [[SendPacket]]s are received.
*/
def mapPublishAndCommit(producerSettings: ProducerSettings[K, V])(using StageCapacity, Ox): Source[RecordMetadata] =
mapPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true)

/** For each packet, first all messages (producer records) are sent. Then, all messages up to the offsets of the consumer messages are
* committed. The metadata of the published records is sent downstream.
/** For each packet, first all messages (producer records) are sent, using the given `producer`. Then, all messages from
* [[SendPacket.commit]] are committed: for each topic-partition, up to the highest observed offset.
*
* The producer is closed depending on the `closeWhenComplete` flag, after all messages are published, or when an exception occurs.
*
* @param producer
* The producer that is used to send messages.
* @return
* A stream of published records metadata, in the order in which the [[SendPacket]]s are received.
*/
def mapPublishAndCommit(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] =
mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = true)
@@ -40,29 +59,45 @@ object KafkaStage:
StageCapacity,
Ox
): Source[RecordMetadata] =
// source - the upstream from which packets are received

// the result, where metadata of published records is sent in the same order, as the received packets
val c = StageCapacity.newChannel[RecordMetadata]
// a helper channel to signal any exceptions that occur while publishing or committing offsets
val exceptions = Channel.unlimited[Exception]
// possible out-of-order metadata of the records published from `packet.send`
val metadata = Channel[(Long, RecordMetadata)](128)
// packets which are fully sent, and should be committed
val toCommit = Channel[SendPacket[_, _]](128)

// used to reorder values received from `metadata` using the assigned sequence numbers
val sendInSequence = SendInSequence(c)

fork {
try
// starting a nested scope, so that the committer is interrupted when the main process ends
// starting a nested scope, so that the committer is interrupted when the main process ends (when there's an exception)
scoped {
// committer
if commitOffsets then fork(tapException(doCommit(toCommit))(c.error))
val commitDoneSource = if commitOffsets then Source.fromFork(fork(tapException(doCommit(toCommit))(c.error))) else Source.empty

repeatWhile {
select(exceptions.receiveClause, metadata.receiveClause, source.receiveClause) match
case ChannelClosed.Error(r) => c.error(r); false
case ChannelClosed.Done => sendInSequence.drainFromThenDone(exceptions, metadata); false
case ChannelClosed.Error(r) => c.error(r); false
case ChannelClosed.Done =>
// waiting until all records are sent and metadata forwarded to `c`
sendInSequence.drainFrom(metadata, exceptions)
// we now know that there won't be any more offsets sent to be committed - we can complete the channel
toCommit.done()
// waiting until the commit fork is done
commitDoneSource.receive()
// completing the downstream
c.done()
// and finally winding down this scope & fork
false
case exceptions.Received(e) => c.error(e); false
case metadata.Received((s, m)) => sendInSequence.send(s, m); true
case source.Received(packet) =>
try
sendPacket(producer, packet, sendInSequence, toCommit, exceptions, metadata)
sendPacket(producer, packet, sendInSequence, toCommit, exceptions, metadata, commitOffsets)
true
catch
case e: Exception =>
@@ -84,7 +119,8 @@
sendInSequence: SendInSequence[RecordMetadata],
toCommit: Sink[SendPacket[_, _]],
exceptions: Sink[Exception],
metadata: Sink[(Long, RecordMetadata)]
metadata: Sink[(Long, RecordMetadata)],
commitOffsets: Boolean
): Unit =
val leftToSend = new AtomicInteger(packet.send.size)
packet.send.foreach { toSend =>
@@ -94,47 +130,48 @@
(m: RecordMetadata, e: Exception) =>
if e != null then exceptions.send(e)
else {
// sending commit request first, as when upstream `source` is done, we need to know that all commits are
// scheduled in order to shut down properly
if commitOffsets && leftToSend.decrementAndGet() == 0 then toCommit.send(packet)
metadata.send((sequenceNo, m))
if leftToSend.decrementAndGet() == 0 then toCommit.send(packet)
}
)
}

/** Sends `T` elements to the given `c` sink, when elements with subsequence sequence numbers are available. */
private class SendInSequence[T](c: Sink[T]):
private var sequenceNoNext = 0L
private var sequenceNoToSendNext = 0L
private val toSend = mutable.SortedSet[(Long, T)]()(Ordering.by(_._1))

def nextSequenceNo: Long =
val n = sequenceNoNext
sequenceNoNext += 1
n

def send(sequenceNo: Long, v: T): Unit =
toSend.add((sequenceNo, v))
/** Sends `T` elements to the given `c` sink, when elements with subsequent sequence numbers are available. Thread-unsafe. */
private class SendInSequence[T](c: Sink[T]):
private var sequenceNoNext = 0L
private var sequenceNoToSendNext = 0L
private val toSend = mutable.SortedSet[(Long, T)]()(Ordering.by(_._1))

def nextSequenceNo: Long =
val n = sequenceNoNext
sequenceNoNext += 1
n

def send(sequenceNo: Long, v: T): Unit =
toSend.add((sequenceNo, v))
trySend()

def allSent: Boolean = sequenceNoNext == sequenceNoToSendNext

@tailrec
private def trySend(): Unit = toSend.headOption match
case Some((s, m)) if s == sequenceNoToSendNext =>
toSend.remove((s, m))
c.send(m)
sequenceNoToSendNext += 1
trySend()
case _ => ()

def allSent: Boolean = sequenceNoNext == sequenceNoToSendNext

@tailrec
private def trySend(): Unit = toSend.headOption match
case Some((s, m)) if s == sequenceNoToSendNext =>
toSend.remove((s, m))
c.send(m)
sequenceNoToSendNext += 1
trySend()
case _ => ()

@tailrec
final def drainFromThenDone(
exceptions: Source[Exception],
incoming: Source[(Long, T)]
): Unit =
if allSent then c.done()
else
select(exceptions.receiveClause, incoming.receiveClause) match
case ChannelClosed.Error(r) => c.error(r)
case ChannelClosed.Done => throw new IllegalStateException()
case exceptions.Received(e) => c.error(e)
case incoming.Received((s, m)) => send(s, m); drainFromThenDone(exceptions, incoming)
@tailrec
final def drainFrom(
incoming: Source[(Long, T)],
exceptions: Source[Exception]
): Unit =
if !allSent then
select(exceptions.receiveClause, incoming.receiveClause) match
case ChannelClosed.Error(r) => c.error(r)
case ChannelClosed.Done => throw new IllegalStateException()
case exceptions.Received(e) => c.error(e)
case incoming.Received((s, m)) => send(s, m); drainFrom(incoming, exceptions)
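The scaladoc above guarantees that metadata is emitted in the order in which the producer records are received. A small usage sketch of `mapPublish`, assuming the `ProducerSettings.default`/`bootstrapServers` builders and the `Source.fromValues`/`toList` helpers (none of which are part of this diff):

```scala
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import ox.*
import ox.channels.*
import ox.kafka.ProducerSettings
import ox.kafka.KafkaStage.*

val metadata: List[RecordMetadata] = supervised {
  // assumed builder API; points at the docker-compose broker from this PR
  val settings = ProducerSettings.default.bootstrapServers("localhost:29092")
  Source
    .fromValues(
      new ProducerRecord[String, String]("topic1", "a"),
      new ProducerRecord[String, String]("topic1", "b"),
      new ProducerRecord[String, String]("topic1", "c")
    )
    .mapPublish(settings)
    .toList // emitted in the same order as the records above
}
```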
15 changes: 11 additions & 4 deletions kafka/src/main/scala/ox/kafka/kafkaOffsetCommit.scala
@@ -8,18 +8,24 @@ import ox.channels.*
import scala.collection.mutable
import scala.concurrent.duration.*

private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox) =
private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox): Unit =
val commitInterval = 1.second
val ticks = Source.tick(commitInterval)
val toCommit = mutable.Map[TopicPartition, Long]()
var consumer: Sink[KafkaConsumerRequest[_, _]] = null // assuming all packets come from the same consumer
val commitDone = Channel[Unit]()

forever {
select(ticks, packets).orThrow match
repeatWhile {
select(ticks, packets) match
case ChannelClosed.Error(e) => throw e
case ChannelClosed.Done => false
case () =>
if consumer != null && toCommit.nonEmpty then
consumer.send(KafkaConsumerRequest.Commit(toCommit.toMap))
consumer.send(KafkaConsumerRequest.Commit(toCommit.toMap, commitDone))
// waiting for the commit to happen
commitDone.receive()
toCommit.clear()
true
case packet: SendPacket[_, _] =>
packet.commit.foreach { receivedMessage =>
if consumer == null then consumer = receivedMessage.consumer.asInstanceOf[Sink[KafkaConsumerRequest[_, _]]]
@@ -29,6 +35,7 @@ private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox) =
case None => Some(receivedMessage.offset)
}
}
true
}

case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
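For each topic-partition appearing in the packets' `commit` lists, `doCommit` keeps only the highest offset and flushes the accumulated map at most once per `commitInterval`. A standalone sketch of that aggregation (the `topic`, `partition` and `offset` accessors on `ReceivedMessage` are assumed here):

```scala
import org.apache.kafka.common.TopicPartition
import ox.kafka.SendPacket

// For every topic-partition seen in the packets' `commit` lists, keep the highest
// offset; this mirrors the map that doCommit accumulates before sending a Commit request.
def maxOffsets(packets: Seq[SendPacket[_, _]]): Map[TopicPartition, Long] =
  packets
    .flatMap(_.commit)
    .groupMapReduce(m => new TopicPartition(m.topic, m.partition))(_.offset)(_ max _)
```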