Skip to content

Commit

Permalink
Finalize ProcessorConsumer implementation for a clean processing of t…
Browse files Browse the repository at this point in the history
…asks, including:

- clean use of coroutines for batch processing
- clean behavior when calling scope canceled: all ongoing messages are processed
- clean behavior when error and excpetion are thrown
  • Loading branch information
geomagilles committed Oct 7, 2024
1 parent 126931a commit 2c2aafa
Show file tree
Hide file tree
Showing 39 changed files with 227 additions and 1,465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,9 @@ internal class ClientDispatcher(
consumer.startAsync(
subscription = MainSubscription(ClientTopic),
entity = emitterName.toString(),
handler = { message, _ -> responseFlow.emit(message) },
beforeDlq = null,
concurrency = 1,
process = { message, _ -> responseFlow.emit(message) },
beforeDlq = null,
)
}
// asynchronously listen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ internal val mockedProducer = mockk<InMemoryInfiniticProducer> {
internal val mockedConsumer = mockk<InMemoryInfiniticConsumer> {
coEvery {
with(capture(scopeSlot)) {
startAsync(any<Subscription<*>>(), "$clientNameTest", any(), any(), 1)
startAsync(any<Subscription<*>>(), "$clientNameTest", 1, any(), any())
}
} answers {
scopeSlot.captured.launch { delay(Long.MAX_VALUE) }
Expand Down Expand Up @@ -230,9 +230,9 @@ internal class InfiniticClientTests : StringSpec(
mockedConsumer.start(
MainSubscription(ClientTopic),
"$clientNameTest",
1,
any(),
any(),
1,
)
}
}
Expand Down Expand Up @@ -514,9 +514,9 @@ internal class InfiniticClientTests : StringSpec(
mockedConsumer.startAsync(
MainSubscription(ClientTopic),
"$clientNameTest",
1,
any(),
any(),
1,
)
}
}
Expand All @@ -530,9 +530,9 @@ internal class InfiniticClientTests : StringSpec(
mockedConsumer.startAsync(
MainSubscription(ClientTopic),
"$clientNameTest",
1,
any(),
any(),
1,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ interface InfiniticConsumer {
* @param S The type of the messages to be consumed.
* @param subscription The subscription from which to consume messages.
* @param entity The entity associated with this consumer. (typically a service name or workflow name)
* @param handler The function to handle each consumed message and its publishing time.
* @param process The function to handle each consumed message and its publishing time.
* @param beforeDlq An optional function to be executed before sending the message to the dead-letter queue (DLQ).
* @param concurrency The number of concurrent message handlers to be used.
*/
context(CoroutineScope)
suspend fun <S : Message> startAsync(
subscription: Subscription<S>,
entity: String,
handler: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
concurrency: Int,
getBatchConfig: (suspend (S) -> Result<BatchConfig?>)? = null,
handlerBatch: (suspend (List<S>, List<MillisInstant>) -> Unit)? = null
process: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
batchConfig: (suspend (S) -> BatchConfig?)? = null,
batchProcess: (suspend (List<S>, List<MillisInstant>) -> Unit)? = null
): Job

/**
Expand All @@ -62,26 +62,26 @@ interface InfiniticConsumer {
* @param S The type of the messages to be consumed.
* @param subscription The subscription from which to consume messages.
* @param entity The entity associated with this consumer. (typically a service name or workflow name)
* @param handler The function to handle each consumed message and its publishing time.
* @param process The function to handle each consumed message and its publishing time.
* @param beforeDlq An optional function to be executed before sending the message to the dead-letter queue (DLQ).
* @param concurrency The number of concurrent message handlers to be used.
*/
context(CoroutineScope)
suspend fun <S : Message> start(
subscription: Subscription<S>,
entity: String,
handler: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
concurrency: Int,
getBatchConfig: (suspend (S) -> Result<BatchConfig?>)? = null,
handlerBatch: (suspend (List<S>, List<MillisInstant>) -> Unit)? = null
process: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
batchConfig: (suspend (S) -> BatchConfig?)? = null,
batchProcess: (suspend (List<S>, List<MillisInstant>) -> Unit)? = null
): Unit = startAsync(
subscription,
entity,
handler,
beforeDlq,
concurrency,
getBatchConfig,
handlerBatch,
process,
beforeDlq,
batchConfig,
batchProcess,
).join()
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ interface TransportConsumer<T : TransportMessage> {
fun acknowledgeAsync(message: T): CompletableFuture<Unit>
fun negativeAcknowledgeAsync(message: T): CompletableFuture<Unit>

fun acknowledgeAsync(messages: List<T>): CompletableFuture<Unit>
fun negativeAcknowledgeAsync(messages: List<T>): CompletableFuture<Unit>

suspend fun acknowledge(message: T): Unit = acknowledgeAsync(message).await()
suspend fun negativeAcknowledge(message: T): Unit = negativeAcknowledgeAsync(message).await()

suspend fun acknowledge(messages: List<T>): Unit = acknowledgeAsync(messages).await()
suspend fun negativeAcknowledge(messages: List<T>): Unit =
negativeAcknowledgeAsync(messages).await()
}

This file was deleted.

Loading

0 comments on commit 2c2aafa

Please sign in to comment.