From 2c2aafacc182cf17144ccf1f5741858545219951 Mon Sep 17 00:00:00 2001 From: Gilles Barbier Date: Mon, 7 Oct 2024 10:15:48 +0200 Subject: [PATCH] Finalize ProcessorConsumer implementation for a clean processing of tasks, including: - clean use of coroutines for batch processing - clean behavior when calling scope canceled: all ongoing messages are processed - clean behavior when error and excpetion are thrown --- .../clients/dispatcher/ClientDispatcher.kt | 4 +- .../infinitic/clients/InfiniticClientTests.kt | 8 +- .../common/transport/InfiniticConsumer.kt | 28 +- .../common/transport/TransportConsumer.kt | 7 - .../consumers/AbstractConsumerProcessor.kt | 165 ---------- .../consumers/ConsumerSharedProcessor.kt | 296 ------------------ .../consumers/ConsumerUniqueProcessor.kt | 58 ---- .../{consumer => consumers}/OneOrMany.kt | 6 +- .../ProcessorConsumer.kt | 31 +- .../{consumer => consumers}/Result.kt | 4 +- .../{consumer => consumers}/batchBy.kt | 4 +- .../{consumer => consumers}/batchProcess.kt | 8 +- .../{consumer => consumers}/channels.kt | 6 +- .../{consumer => consumers}/collect.kt | 4 +- .../{consumer => consumers}/process.kt | 7 +- .../receiveIfNotClose.kt | 4 +- .../{consumer => consumers}/startBatching.kt | 4 +- .../{consumer => consumers}/startConsuming.kt | 4 +- .../common/transport/consumers/string.kt | 6 +- .../logged/LoggedInfiniticConsumer.kt | 16 +- .../common/utils/java/BatchUtilTests.java | 12 +- .../common/transport/consumer/fakes.kt | 132 -------- .../{consumer => consumers}/BatchByTests.kt | 2 +- .../ConsumerBatchedProcessorTests.kt | 248 --------------- .../consumers/ConsumerSharedProcessorTests.kt | 148 --------- .../consumers/ConsumerUniqueProcessorTests.kt | 144 --------- .../ProcessorConsumerTests.kt | 2 +- .../ProcessorConsumerWithBatchTests.kt | 2 +- .../StartBatchingTests.kt | 2 +- .../StartConsumingTests.kt | 2 +- .../common/transport/consumers/fakes.kt | 69 ++-- .../infinitic/tasks/executor/TaskExecutor.kt | 10 +- .../inMemory/InMemoryInfiniticConsumer.kt | 43 +-- .../inMemory/consumers/InMemoryConsumer.kt | 12 - .../pulsar/PulsarInfiniticConsumer.kt | 44 +-- .../pulsar/consumers/PulsarConsumer.kt | 8 - .../pulsar/PulsarInfiniticConsumerTests.kt | 44 +-- .../pulsar/consumers/ConsumerTests.kt | 10 +- .../io/infinitic/workers/InfiniticWorker.kt | 88 +++--- 39 files changed, 227 insertions(+), 1465 deletions(-) delete mode 100644 infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/AbstractConsumerProcessor.kt delete mode 100644 infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessor.kt delete mode 100644 infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessor.kt rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/OneOrMany.kt (89%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/ProcessorConsumer.kt (85%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/Result.kt (96%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/batchBy.kt (98%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/batchProcess.kt (94%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/channels.kt (90%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/collect.kt (96%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/process.kt (93%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/receiveIfNotClose.kt (91%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/startBatching.kt (96%) rename infinitic-common/src/main/kotlin/io/infinitic/common/transport/{consumer => consumers}/startConsuming.kt (94%) delete mode 100644 infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/fakes.kt rename infinitic-common/src/test/kotlin/io/infinitic/common/transport/{consumer => consumers}/BatchByTests.kt (98%) delete mode 100644 infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerBatchedProcessorTests.kt delete mode 100644 infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessorTests.kt delete mode 100644 infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessorTests.kt rename infinitic-common/src/test/kotlin/io/infinitic/common/transport/{consumer => consumers}/ProcessorConsumerTests.kt (99%) rename infinitic-common/src/test/kotlin/io/infinitic/common/transport/{consumer => consumers}/ProcessorConsumerWithBatchTests.kt (99%) rename infinitic-common/src/test/kotlin/io/infinitic/common/transport/{consumer => consumers}/StartBatchingTests.kt (98%) rename infinitic-common/src/test/kotlin/io/infinitic/common/transport/{consumer => consumers}/StartConsumingTests.kt (98%) diff --git a/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt b/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt index 6f7424065..b302531eb 100644 --- a/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt +++ b/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt @@ -793,9 +793,9 @@ internal class ClientDispatcher( consumer.startAsync( subscription = MainSubscription(ClientTopic), entity = emitterName.toString(), - handler = { message, _ -> responseFlow.emit(message) }, - beforeDlq = null, concurrency = 1, + process = { message, _ -> responseFlow.emit(message) }, + beforeDlq = null, ) } // asynchronously listen diff --git a/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt b/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt index ba28720bf..ccb0aeefc 100644 --- a/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt +++ b/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt @@ -160,7 +160,7 @@ internal val mockedProducer = mockk { internal val mockedConsumer = mockk { coEvery { with(capture(scopeSlot)) { - startAsync(any>(), "$clientNameTest", any(), any(), 1) + startAsync(any>(), "$clientNameTest", 1, any(), any()) } } answers { scopeSlot.captured.launch { delay(Long.MAX_VALUE) } @@ -230,9 +230,9 @@ internal class InfiniticClientTests : StringSpec( mockedConsumer.start( MainSubscription(ClientTopic), "$clientNameTest", + 1, any(), any(), - 1, ) } } @@ -514,9 +514,9 @@ internal class InfiniticClientTests : StringSpec( mockedConsumer.startAsync( MainSubscription(ClientTopic), "$clientNameTest", + 1, any(), any(), - 1, ) } } @@ -530,9 +530,9 @@ internal class InfiniticClientTests : StringSpec( mockedConsumer.startAsync( MainSubscription(ClientTopic), "$clientNameTest", + 1, any(), any(), - 1, ) } } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/InfiniticConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/InfiniticConsumer.kt index 89e0b6317..ba4aac565 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/InfiniticConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/InfiniticConsumer.kt @@ -39,7 +39,7 @@ interface InfiniticConsumer { * @param S The type of the messages to be consumed. * @param subscription The subscription from which to consume messages. * @param entity The entity associated with this consumer. (typically a service name or workflow name) - * @param handler The function to handle each consumed message and its publishing time. + * @param process The function to handle each consumed message and its publishing time. * @param beforeDlq An optional function to be executed before sending the message to the dead-letter queue (DLQ). * @param concurrency The number of concurrent message handlers to be used. */ @@ -47,11 +47,11 @@ interface InfiniticConsumer { suspend fun startAsync( subscription: Subscription, entity: String, - handler: suspend (S, MillisInstant) -> Unit, - beforeDlq: (suspend (S?, Exception) -> Unit)?, concurrency: Int, - getBatchConfig: (suspend (S) -> Result)? = null, - handlerBatch: (suspend (List, List) -> Unit)? = null + process: suspend (S, MillisInstant) -> Unit, + beforeDlq: (suspend (S?, Exception) -> Unit)?, + batchConfig: (suspend (S) -> BatchConfig?)? = null, + batchProcess: (suspend (List, List) -> Unit)? = null ): Job /** @@ -62,7 +62,7 @@ interface InfiniticConsumer { * @param S The type of the messages to be consumed. * @param subscription The subscription from which to consume messages. * @param entity The entity associated with this consumer. (typically a service name or workflow name) - * @param handler The function to handle each consumed message and its publishing time. + * @param process The function to handle each consumed message and its publishing time. * @param beforeDlq An optional function to be executed before sending the message to the dead-letter queue (DLQ). * @param concurrency The number of concurrent message handlers to be used. */ @@ -70,18 +70,18 @@ interface InfiniticConsumer { suspend fun start( subscription: Subscription, entity: String, - handler: suspend (S, MillisInstant) -> Unit, - beforeDlq: (suspend (S?, Exception) -> Unit)?, concurrency: Int, - getBatchConfig: (suspend (S) -> Result)? = null, - handlerBatch: (suspend (List, List) -> Unit)? = null + process: suspend (S, MillisInstant) -> Unit, + beforeDlq: (suspend (S?, Exception) -> Unit)?, + batchConfig: (suspend (S) -> BatchConfig?)? = null, + batchProcess: (suspend (List, List) -> Unit)? = null ): Unit = startAsync( subscription, entity, - handler, - beforeDlq, concurrency, - getBatchConfig, - handlerBatch, + process, + beforeDlq, + batchConfig, + batchProcess, ).join() } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/TransportConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/TransportConsumer.kt index 8ac840492..2b1cf9894 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/TransportConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/TransportConsumer.kt @@ -31,13 +31,6 @@ interface TransportConsumer { fun acknowledgeAsync(message: T): CompletableFuture fun negativeAcknowledgeAsync(message: T): CompletableFuture - fun acknowledgeAsync(messages: List): CompletableFuture - fun negativeAcknowledgeAsync(messages: List): CompletableFuture - suspend fun acknowledge(message: T): Unit = acknowledgeAsync(message).await() suspend fun negativeAcknowledge(message: T): Unit = negativeAcknowledgeAsync(message).await() - - suspend fun acknowledge(messages: List): Unit = acknowledgeAsync(messages).await() - suspend fun negativeAcknowledge(messages: List): Unit = - negativeAcknowledgeAsync(messages).await() } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/AbstractConsumerProcessor.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/AbstractConsumerProcessor.kt deleted file mode 100644 index 55b5b8ee4..000000000 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/AbstractConsumerProcessor.kt +++ /dev/null @@ -1,165 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumers - -import io.github.oshai.kotlinlogging.KotlinLogging -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.TransportConsumer -import io.infinitic.common.transport.TransportMessage -import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.future.await -import kotlinx.coroutines.isActive - -abstract class AbstractConsumerProcessor( - private val consumer: TransportConsumer, - private val deserialize: suspend (S) -> D, - private val process: suspend (D, MillisInstant) -> Unit, - private val beforeNegativeAcknowledgement: (suspend (S, D?, Exception) -> Unit)? -) { - protected val logger = KotlinLogging.logger {} - - /** - * Converts the asynchronous message receiving mechanism of a Consumer into a Flow of strings. - * - * The function continuously receives messages from the Consumer using its `receiveAsync` method, - * emits them to the Flow, and handles any potential exceptions. The loop breaks if a - * `CancellationException` is encountered, ensuring graceful shutdown of the flow. - */ - protected fun TransportConsumer.receiveAsFlow(): Flow = flow { - while (currentCoroutineContext().isActive) { - try { - emit(receiveAsync().await()) - } catch (e: Exception) { - logger.error(e) { "Error when receiving message from consumer" } - } - } - } - - /** - * Attempts to deserialize a given transport message. - * In case of an error, it logs the error, sends a negative acknowledgment, and returns null. - */ - protected suspend fun tryDeserialize( - message: S - ): DeserializedMessage? = try { - DeserializedMessage( - transportMessage = message, - deserialized = deserialize(message), - ) - } catch (e: Exception) { - logger.error(e) { "Error when deserializing message ${message.string}" } - negativeAcknowledge(message, null, e) - null - } - - /** - * Processes a deserialized message. - */ - protected suspend fun processSingle(message: DeserializedMessage) { - try { - process(message.deserialized, message.transportMessage.publishTime) - acknowledge(message) - } catch (e: Exception) { - logger.error(e) { "error when processing message ${message.deserialized.string}" } - negativeAcknowledge(message.transportMessage, message.deserialized, e) - } - } - - /** - * Acknowledges the deserialized message. - */ - protected suspend fun acknowledge(message: DeserializedMessage) = try { - consumer.acknowledge(message.transportMessage) - } catch (e: Exception) { - logger.warn(e) { "Error when acknowledging message ${message.string}" } - } - - /** - * Invoked before sending a negative acknowledgment for a message. - * - * This method serves as a pre-processing step prior to issuing a negative acknowledgment - * for a given transport message. It logs the message processing attempt, executes any - * registered `beforeNegativeAcknowledgement` callback, and catches any exceptions that - * occur during this step. - */ - protected suspend fun beforeNegativeAcknowledge( - transportMessage: S, - deserialized: D?, - cause: Exception - ) { - val msg by lazy { "beforeNegativeAcknowledgement for ${deserialized?.string ?: transportMessage.messageId}}" } - try { - beforeNegativeAcknowledgement?.let { - logger.debug { "Processing $msg" } - it( - transportMessage, - deserialized, - cause, - ) - logger.trace { "Processed $msg" } - } - } catch (e: Exception) { - logger.warn(e) { "Exception when processing $msg" } - } - } - - /** - * Sends a negative acknowledgment for a transport message. - * This method should not fail - */ - private suspend fun negativeAcknowledge( - transportMessage: S, - deserialized: D?, - cause: Exception - ) { - try { - beforeNegativeAcknowledge(transportMessage, deserialized, cause) - consumer.negativeAcknowledge(transportMessage) - } catch (e: Exception) { - // the message should be automatically negatively acknowledged after ackTimeout - // TODO: check ackTimeout setting - logger.warn(e) { "Error when negative acknowledging message ${deserialized?.string ?: transportMessage.messageId}" } - } - } - - /** - * A data class that holds the original transport message alongside its deserialized content. - */ - data class DeserializedMessage( - val transportMessage: S, - val deserialized: D, - ) { - // We ensure this never fails, as it is used in catch structure - override fun toString(): String = "DeserializedMessage(" + - "transportMessageId=${transportMessage.messageId}, " + - "deserialized=${deserialized.string}" + - ")" - } -} - - - - - diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessor.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessor.kt deleted file mode 100644 index fa7600982..000000000 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessor.kt +++ /dev/null @@ -1,296 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumers - -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.BatchConfig -import io.infinitic.common.transport.TransportConsumer -import io.infinitic.common.transport.TransportMessage -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flatMapMerge -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.receiveAsFlow -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext -import java.time.Instant - -class ConsumerSharedProcessor( - private val consumer: TransportConsumer, - deserialize: suspend (S) -> D, - process: suspend (D, MillisInstant) -> Unit, - beforeNegativeAcknowledgement: (suspend (S, D?, Exception) -> Unit)?, - private val getBatchConfig: (suspend (D) -> Result)? = null, - private val processBatch: (suspend (List, List) -> Unit)? = null -) : AbstractConsumerProcessor( - consumer, - deserialize, - process, - beforeNegativeAcknowledgement, -) { - init { - require((processBatch == null) == (getBatchConfig == null)) { - "${::getBatchConfig.name} and processBatch should be defined together or not at all" - } - } - - /** - * Channel for messages to process - */ - private val processChannel = Channel>() - - /** - * A map that holds channels for batching messages. - * The key is common to all messages batched together - */ - private val batchingChannels = mutableMapOf>>() - - suspend fun start(concurrency: Int) = coroutineScope { - // Start [concurrency] processors for processChannel - List(concurrency) { launch { processChannel.startProcessing() } } - - consumer - .receiveAsFlow() - .deserialize(concurrency) - .collect { message -> - val batchConfigResult = getBatchConfig?.let { it(message.deserialized) } - batchConfigResult - ?.onFailure { - // assessBatching has the responsibility to tell - // the parent workflow that this task failed, - // but the consumer is done with this message - acknowledge(message) - } - ?.onSuccess { batchConfig -> - when (batchConfig) { - // no batch config for this message, we just process it - null -> processChannel.send(MessageSingle(message)) - // we found a batch config - else -> batchingChannels.send(batchConfig, message) - } - } - // no assessBatching function, we just process the message - ?: processChannel.send(MessageSingle(message)) - } - } - - /** - * Deserializes messages emitted by a Flow using the specified level of concurrency. - * - * @param concurrency The maximum number of concurrent deserialization operations to be performed. - * @return A Flow emitting deserialized strings. - */ - @OptIn(ExperimentalCoroutinesApi::class) - private fun Flow.deserialize(concurrency: Int): Flow> = - flatMapMerge(concurrency) { message -> - flow { tryDeserialize(message)?.let { emit(it) } } - } - - /** - * Starts processing messages from the channel. - */ - private suspend fun Channel>.startProcessing() { - for (message in this) { - withContext(NonCancellable) { - when (message) { - is MessageSingle -> processSingle(message.deserialized) - is MessageBatch -> processBatch(message.deserialized) - } - } - } - } - - /** - * Processes a batch of deserialized messages. - * (This method should not fail) - */ - private suspend fun processBatch(messages: List>) { - try { - processBatch!!( - messages.map { it.deserialized }, - messages.map { it.transportMessage.publishTime }, - ) - acknowledge(messages) - } catch (e: Exception) { - logger.error(e) { "error when processing batch ${messages.map { it.transportMessage.messageId }}" } - negativeAcknowledge(messages, e) - } - } - - /** - * Acknowledges a batch of deserialized message. - * (This method should not fail) - */ - private suspend fun acknowledge(messages: List>) = try { - consumer.acknowledge(messages.map { it.transportMessage }) - } catch (e: Exception) { - logger.warn(e) { "Error when acknowledging batch messages ${messages.map { it.transportMessage.messageId }}" } - } - - /** - * Negatively acknowledges a batch of deserialized message. - * (This method should not fail) - */ - private suspend fun negativeAcknowledge( - messages: List>, - cause: Exception - ) { - try { - coroutineScope { - messages.forEach { - launch { - beforeNegativeAcknowledge(it.transportMessage, it.deserialized, cause) - } - } - } - consumer.negativeAcknowledge(messages.map { it.transportMessage }) - } catch (e: Exception) { - // the messages should be automatically negatively acknowledged after ackTimeout - // TODO: check ackTimeout setting - logger.warn(e) { "Error when negative acknowledging batch messages ${messages.map { it.transportMessage.messageId }}" } - } - } - - /** - * Sends a message to a batched processing channel based on the specified configuration. - * If needed, creates the batching channel on the fly and start listening to it - */ - context(CoroutineScope) - private suspend fun MutableMap>>.send( - config: BatchConfig, - message: DeserializedMessage - ) { - getOrPut(config.batchKey) { - // create channel and start listening to it - Channel>().also { launch { it.startBatchingAndProcessing(config) } } - }.send(message) - } - - /** - * Starts batching and processing the elements received in the channel - * using the provided batch configuration. - */ - private suspend fun Channel>.startBatchingAndProcessing(config: BatchConfig) { - receiveAsFlow() - .collectBatch(config) { batch -> - // once a batch completed, send to processChannel as a MessageBatch - processChannel.send(MessageBatch(batch)) - } - } - - /** - * Collects elements from the flow into batches based on the provided configuration and executes - * a specified action on each batch. The collection is performed within a coroutine scope. - * - * @param config The configuration that provides batch size and timeout settings for batching. - * @param action The action to be performed on each batch of elements. - */ - internal suspend fun Flow.collectBatch( - config: BatchConfig, - action: suspend (List) -> Unit - ) = coroutineScope { - require(config.maxMessages > 1) { "batch size must be > 1" } - - var nowMillis: Long = 0 - val buffer = mutableListOf() - val timeoutMillis = config.maxDuration - val bufferMutex = Mutex() - lateinit var timeoutJob: Job - - fun logBatchProcessing(batch: List) { - val duration = Instant.now().toEpochMilli() - nowMillis - logger.debug { "Processing ${batch.size} elements for ${config.batchKey} after ${duration}ms" } - } - - fun CoroutineScope.startTimeoutJob() = launch { - try { - delay(timeoutMillis.millis) - // we reach the timeout, before the batch size - val batch = bufferMutex.withLock { - ArrayList(buffer).also { buffer.clear() } - } - // just in case - if (batch.isNotEmpty()) { - logBatchProcessing(batch) - action(batch) - } - } catch (e: CancellationException) { - // Do nothing - } - } - - try { - collect { value -> - var batch: List? = null - bufferMutex.withLock { - buffer.add(value) - when (buffer.size) { - 1 -> { - // after the first element, we start the timeout job - nowMillis = Instant.now().toEpochMilli() - timeoutJob = startTimeoutJob() - } - - config.maxMessages -> { - // we reach the batch size, before the timeout - timeoutJob.cancel() - batch = ArrayList(buffer) - buffer.clear() - } - - else -> null - } - } - batch?.let { - logBatchProcessing(it) - action(it) - } - } - } catch (e: CancellationException) { - // Do nothing - } finally { - timeoutJob.cancel() - } - } - - /** - * A sealed class representing either a single message or a batch of messages. - */ - private sealed class MessageOrMessageList - - private data class MessageSingle(val deserialized: DeserializedMessage) : - MessageOrMessageList() - - private data class MessageBatch(val deserialized: List>) : - MessageOrMessageList() -} - diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessor.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessor.kt deleted file mode 100644 index c7e1d879e..000000000 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessor.kt +++ /dev/null @@ -1,58 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumers - -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.TransportConsumer -import io.infinitic.common.transport.TransportMessage -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.withContext - -class ConsumerUniqueProcessor( - private val consumer: TransportConsumer, - deserialize: suspend (S) -> D, - process: suspend (D, MillisInstant) -> Unit, - beforeNegativeAcknowledgement: (suspend (S, D?, Exception) -> Unit)? -) : AbstractConsumerProcessor( - consumer, - deserialize, - process, - beforeNegativeAcknowledgement, -) { - - suspend fun start() = consumer - .receiveAsFlow() - .collect { message -> - withContext(NonCancellable) { - tryDeserialize(message)?.let { processSingle(it) } - } - } -} - - - - - - - - diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/OneOrMany.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/OneOrMany.kt similarity index 89% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/OneOrMany.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/OneOrMany.kt index 1dfe4369c..bbdb01960 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/OneOrMany.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/OneOrMany.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers /** * Represents a structure that can hold either a single item or a collection of items. @@ -29,10 +29,10 @@ package io.infinitic.common.transport.consumer */ sealed interface OneOrMany -class One(val datum: D) : OneOrMany { +internal class One(val datum: D) : OneOrMany { override fun toString() = "One(${datum.toString()})" } -class Many(val data: List) : OneOrMany { +internal class Many(val data: List) : OneOrMany { override fun toString() = "Many($data})" } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumer.kt similarity index 85% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumer.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumer.kt index dd65ba3df..4ed98037c 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumer.kt @@ -20,8 +20,9 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers +import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.common.data.MillisInstant import io.infinitic.common.transport.BatchConfig @@ -29,13 +30,12 @@ import io.infinitic.common.transport.TransportConsumer import io.infinitic.common.transport.TransportMessage import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.future.await import kotlinx.coroutines.launch /** * A generic consumer for messages that handles * - deserialization, - * - processing, + * - single and batch processing, * - acknowledgement and negative acknowledgement. * * @param S The type of the message implementing the interface [TransportMessage]. @@ -45,8 +45,8 @@ import kotlinx.coroutines.launch * @param beforeNegativeAcknowledgement A suspend function called before negatively acknowledging a message. */ class ProcessorConsumer( - val consumer: TransportConsumer, - val beforeNegativeAcknowledgement: (suspend (Exception, S, D?) -> Unit)?, + private val consumer: TransportConsumer, + private val beforeNegativeAcknowledgement: (suspend (S, D?, Exception) -> Unit)?, ) { /** @@ -71,7 +71,6 @@ class ProcessorConsumer( batchConfig: (suspend (D) -> BatchConfig?)? = null, batchProcess: (suspend (List, List) -> Unit)? = null, ): Job = launch { - with(logger) { consumer .startConsuming() @@ -96,21 +95,23 @@ class ProcessorConsumer( } catch (e: Exception) { null } - negativeAcknowledge(it, message, d) + negativeAcknowledge(message, d, it) } } } } + context(KLogger) private suspend fun acknowledge(message: S, deserialize: D) = try { - consumer.acknowledgeAsync(message).await() + consumer.acknowledge(message) } catch (e: Exception) { logWarn(e) { "Error when acknowledging ${deserialize.string}" } } - private suspend fun negativeAcknowledge(e: Exception, message: S, deserialized: D?) { + context(KLogger) + private suspend fun negativeAcknowledge(message: S, deserialized: D?, e: Exception) { try { - beforeNegativeAcknowledgement?.let { it(e, message, deserialized) } + beforeNegativeAcknowledgement?.let { it(message, deserialized, e) } } catch (e: Exception) { logWarn(e) { "Error when calling negative acknowledgement hook for message " + @@ -118,7 +119,7 @@ class ProcessorConsumer( } } try { - consumer.negativeAcknowledgeAsync(message).await() + consumer.negativeAcknowledge(message) } catch (e: Exception) { logWarn(e) { "Error when negatively acknowledging message " + @@ -127,14 +128,6 @@ class ProcessorConsumer( } } - // No Error should come from logging errors - private val D.string: String - get() = try { - toString() - } catch (e: Exception) { - "${this::class.java.name}: Error during toString() - (${e::class.java.name}(${e.message}))" - } - private fun logWarn(e: Exception, message: () -> String) = try { logger.warn(e, message) } catch (e: Exception) { diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/Result.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/Result.kt similarity index 96% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/Result.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/Result.kt index 5dd1bffa9..4adb4db61 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/Result.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/Result.kt @@ -20,9 +20,9 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers -class Result internal constructor( +internal class Result internal constructor( private val message: M, private val value: Any? ) { diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/batchBy.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt similarity index 98% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/batchBy.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt index 03fca45ba..37de11f67 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/batchBy.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt @@ -21,7 +21,7 @@ * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.transport.BatchConfig @@ -46,7 +46,7 @@ import kotlinx.coroutines.withContext * either a [SingleMessage] or a [MultipleMessages] instance. */ context(CoroutineScope, KLogger) -fun Channel>.batchBy( +internal fun Channel>.batchBy( getBatchConfig: suspend (I) -> BatchConfig?, ): Channel>> { val callingScope: CoroutineScope = this@CoroutineScope diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/batchProcess.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchProcess.kt similarity index 94% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/batchProcess.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchProcess.kt index 86eb06f88..d83057ce3 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/batchProcess.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchProcess.kt @@ -20,10 +20,9 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger -import io.infinitic.common.transport.consumers.string import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.cancel @@ -42,7 +41,7 @@ import kotlinx.coroutines.withContext * @return A channel where the results of the processed elements will be sent. */ context(CoroutineScope, KLogger) -fun Channel>>.batchProcess( +internal fun Channel>>.batchProcess( concurrency: Int = 1, singleProcess: suspend (M, I) -> O, batchProcess: suspend (List, List) -> List, @@ -100,6 +99,9 @@ fun Channel>>.batchProcess( is One -> process(oneOrMany) is Many -> process(oneOrMany) } + } catch (e: Exception) { + warn(e) { "Exception while batch processing messages" } + throw e } catch (e: Error) { warn(e) { "Error when batch processing messages, cancelling calling scope" } callingScope.cancel() diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/channels.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/channels.kt similarity index 90% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/channels.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/channels.kt index 91bf22778..0e931e54b 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/channels.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/channels.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.exceptions.thisShouldNotHappen @@ -33,13 +33,13 @@ private val producersMutex = Mutex() private val producerCounters = mutableMapOf, Int>() context(KLogger) -suspend fun Channel<*>.addProducer() = producersMutex.withLock { +internal suspend fun Channel<*>.addProducer() = producersMutex.withLock { debug { "Adding one producer from ${producerCounters[this]} to channel ${this.hashCode()}" } producerCounters[this] = (producerCounters[this] ?: 0) + 1 } context(KLogger) -suspend fun Channel<*>.removeProducer() = producersMutex.withLock { +internal suspend fun Channel<*>.removeProducer() = producersMutex.withLock { debug { "Removing one producer from ${producerCounters[this]} from channel ${this.hashCode()}" } when (val count = producerCounters[this]) { null -> thisShouldNotHappen() diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/collect.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/collect.kt similarity index 96% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/collect.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/collect.kt index 6b5a55041..17f6adcda 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/collect.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/collect.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import kotlinx.coroutines.CoroutineScope @@ -38,7 +38,7 @@ import kotlinx.coroutines.withContext * If null, no processing is applied. */ context(CoroutineScope, KLogger) -fun Channel.collect( +internal fun Channel.collect( process: (suspend (S) -> Unit)? = null ) { val callingScope: CoroutineScope = this@CoroutineScope diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/process.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/process.kt similarity index 93% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/process.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/process.kt index 82543505b..f64034868 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/process.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/process.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import kotlinx.coroutines.CoroutineScope @@ -44,7 +44,7 @@ import kotlinx.coroutines.withContext * @return A new channel that contains the processed results. */ context(CoroutineScope, KLogger) -fun Channel>.process( +internal fun Channel>.process( concurrency: Int = 1, process: suspend (M, I) -> O, ): Channel> { @@ -73,6 +73,9 @@ fun Channel>.process( result.onFailure { outputChannel.send(result.failure(it)) } + } catch (e: Exception) { + warn(e) { "Exception while processing" } + throw e } catch (e: Error) { warn(e) { "Error while processing, cancelling calling scope" } callingScope.cancel() diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/receiveIfNotClose.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/receiveIfNotClose.kt similarity index 91% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/receiveIfNotClose.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/receiveIfNotClose.kt index 493a5120d..86d6be4de 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/receiveIfNotClose.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/receiveIfNotClose.kt @@ -20,12 +20,12 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException -suspend fun Channel.receiveIfNotClose(): M? = +internal suspend fun Channel.receiveIfNotClose(): M? = try { receive() } catch (e: ClosedReceiveChannelException) { diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/startBatching.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt similarity index 96% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/startBatching.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt index 9e488ac12..b12bc6bd6 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/startBatching.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import kotlinx.coroutines.CoroutineScope @@ -40,7 +40,7 @@ import kotlinx.coroutines.withTimeoutOrNull * @param outputChannel The channel to send the batched messages to. Defaults to a new channel. */ context(CoroutineScope, KLogger) -fun Channel>.startBatching( +internal fun Channel>.startBatching( maxMessages: Int, maxDuration: Long, outputChannel: Channel>>, diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/startConsuming.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt similarity index 94% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/startConsuming.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt index ef9aa48e5..83cd0f352 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumer/startConsuming.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.transport.TransportConsumer @@ -41,7 +41,7 @@ import kotlin.coroutines.cancellation.CancellationException * @return A channel that emits Result objects containing the original and resulting messages. */ context(CoroutineScope, KLogger) -fun TransportConsumer.startConsuming(): Channel> { +internal fun TransportConsumer.startConsuming(): Channel> { val channel = Channel>() val scope = this@CoroutineScope diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/string.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/string.kt index 6f149bc81..3e6641276 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/string.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/string.kt @@ -22,17 +22,19 @@ */ package io.infinitic.common.transport.consumers +import io.github.oshai.kotlinlogging.KLogger + /** * Extension property to safely convert any object to its io.infinitic.workers.consumers.getString representation. * * This property ensures that a call to the `toString()` method does not throw an exception * This is used in catch() sections to avoid creating a potential additional issue */ +context(KLogger) internal val Any.string: String get() = try { toString() } catch (e: Exception) { - //logger.warn(e) { "Error when calling toString()" } + warn(e) { "Error when calling toString()" } "${this::class.java.name}(${e::class.java.name}(${e.message}))" } - diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/logged/LoggedInfiniticConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/logged/LoggedInfiniticConsumer.kt index d9771f161..a56092fc1 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/logged/LoggedInfiniticConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/logged/LoggedInfiniticConsumer.kt @@ -40,15 +40,15 @@ class LoggedInfiniticConsumer( override suspend fun startAsync( subscription: Subscription, entity: String, - handler: suspend (S, MillisInstant) -> Unit, - beforeDlq: (suspend (S?, Exception) -> Unit)?, concurrency: Int, - getBatchConfig: (suspend (S) -> Result)?, - handlerBatch: (suspend (List, List) -> Unit)? + process: suspend (S, MillisInstant) -> Unit, + beforeDlq: (suspend (S?, Exception) -> Unit)?, + batchConfig: (suspend (S) -> BatchConfig?)?, + batchProcess: (suspend (List, List) -> Unit)? ): Job { val loggedHandler: suspend (S, MillisInstant) -> Unit = { message, instant -> logger.debug { formatLog(message.id(), "Processing:", message) } - handler(message, instant) + process(message, instant) logger.trace { formatLog(message.id(), "Processed:", message) } } @@ -64,11 +64,11 @@ class LoggedInfiniticConsumer( return consumer.startAsync( subscription, entity, + concurrency, loggedHandler, loggedBeforeDlq, - concurrency, - getBatchConfig, - handlerBatch, + batchConfig, + batchProcess, ) } } diff --git a/infinitic-common/src/test/java/io/infinitic/common/utils/java/BatchUtilTests.java b/infinitic-common/src/test/java/io/infinitic/common/utils/java/BatchUtilTests.java index ef77e4fc0..4fcff32ff 100644 --- a/infinitic-common/src/test/java/io/infinitic/common/utils/java/BatchUtilTests.java +++ b/infinitic-common/src/test/java/io/infinitic/common/utils/java/BatchUtilTests.java @@ -20,7 +20,6 @@ * * Licensor: infinitic.io */ - package io.infinitic.common.utils.java; import io.infinitic.annotations.Batch; @@ -44,6 +43,7 @@ public List bar(List p) { } // 2 parameters - Batched method with Collection parameter +@SuppressWarnings("unused") class FooBatch2 { public String bar(int p, int q) { return Integer.toString(p) + q; @@ -56,6 +56,7 @@ public List bar(List l) { } // 1 parameter - Batched method with Collection parameter +@SuppressWarnings("unused") class FooBatch3 { public String bar(Set p) { return p.toString(); @@ -68,6 +69,7 @@ public List bar(List> p) { } // 1 parameter - No return +@SuppressWarnings("unused") class FooBatch5 { public void bar(int p, int q) { // do nothing @@ -80,6 +82,7 @@ public void bar(List p) { } // 1 parameter - Parameter with Generic +@SuppressWarnings("unused") class FooBatch6 { public void bar(MyPair pair) { // do nothing @@ -92,6 +95,7 @@ public void bar(List> pairs) { } // Single method with parent return value +@SuppressWarnings("unused") class FooBatch7 implements FooBatch { @Override @@ -107,11 +111,13 @@ public List bar(List list) { } } +@SuppressWarnings("unused") interface FooBatch { PairInt bar(int p); } // vararg not accepted +@SuppressWarnings("unused") class FooBatchError0 { String bar(int p) { return Integer.toString(p); @@ -126,6 +132,7 @@ List bar(int... p) { } // annotation @Batch without corresponding single method with the right parameters +@SuppressWarnings("unused") class FooBatchError1 { public String bar(int p) { return Integer.toString(p); @@ -138,6 +145,7 @@ public List bar(List p, int q) { } // annotation @Batch without corresponding single method with the right parameters +@SuppressWarnings("unused") class FooBatchError2 { public String bar(int p, int q) { return Integer.toString(p) + q; @@ -150,6 +158,7 @@ public List bar(List p) { } // Not the right return type +@SuppressWarnings("unused") class FooBatchError4 { public String bar(int p, int q) { return "?"; @@ -162,6 +171,7 @@ public List bar(List p) { } // Not a List in return type +@SuppressWarnings("unused") class FooBatchError5 { public String bar(int p, int q) { return "?"; diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/fakes.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/fakes.kt deleted file mode 100644 index 7ecc9bf6a..000000000 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/fakes.kt +++ /dev/null @@ -1,132 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumer - -import io.infinitic.common.data.MillisDuration -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.BatchConfig -import io.infinitic.common.transport.TransportConsumer -import io.infinitic.common.transport.TransportMessage -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.delay -import kotlinx.coroutines.future.future -import java.util.* -import java.util.concurrent.CompletableFuture -import java.util.concurrent.atomic.AtomicInteger -import kotlin.random.Random - -internal data class IntMessage(val value: Int) : TransportMessage { - override val messageId: String = value.toString() - override val redeliveryCount: Int = Random.nextInt(3) - override val publishTime: MillisInstant = MillisInstant.now() - override fun toString(): String = value.toString() -} - -internal data class DeserializedIntMessage(val value: IntMessage) { - override fun toString(): String = value.toString() -} - -internal val receivedList = Collections.synchronizedList(mutableListOf()) -internal val deserializedList = Collections.synchronizedList(mutableListOf()) -internal val processedList = Collections.synchronizedList(mutableListOf()) -internal val acknowledgedList = Collections.synchronizedList(mutableListOf()) -internal val negativeAcknowledgedList = Collections.synchronizedList(mutableListOf()) -internal val beforeNegativeAcknowledgedList = Collections.synchronizedList(mutableListOf()) - -internal open class IntConsumer : TransportConsumer { - private val counter = AtomicInteger(0) - - protected val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - - fun reset() { - counter.set(0) - } - - override fun receiveAsync() = scope.future { - IntMessage(counter.incrementAndGet()) - .also { receivedList.add(it.value) } - } - - override fun negativeAcknowledgeAsync(messages: List): CompletableFuture = - scope.future { - delay(Random.nextLong(5)) - .also { negativeAcknowledgedList.addAll(messages.map { it.value }) } - } - - override fun acknowledgeAsync(messages: List): CompletableFuture = - scope.future { - delay(Random.nextLong(5)) - .also { acknowledgedList.addAll(messages.map { it.value }) } - } - - override fun negativeAcknowledgeAsync(message: IntMessage): CompletableFuture = - scope.future { - delay(Random.nextLong(5)) - .also { negativeAcknowledgedList.add(message.value) } - } - - override fun acknowledgeAsync(message: IntMessage): CompletableFuture = - scope.future { - delay(Random.nextLong(5)) - .also { acknowledgedList.add(message.value) } - } -} - -internal suspend fun deserialize(value: IntMessage) = DeserializedIntMessage(value).also { - println("start deserializing...$value") - delay(Random.nextLong(5)) - deserializedList.add(it.value.value) - println("end deserializing...$value") -} - - -internal suspend fun process(message: DeserializedIntMessage, publishTime: MillisInstant) { - println("start processing......${message.value.value}") - delay(Random.nextLong(100)) - println("end processing......${message.value.value}") - processedList.add(message.value.value) -} - -internal fun processBatch(batch: List, publishTimes: List) { - processedList.addAll(batch.map { it.value.value }) -} - -internal fun getBatchingConfig(value: DeserializedIntMessage): BatchConfig? { - val i = value.value.value - return when { - (i % 3) == 0 -> null - (i % 3) == 1 -> BatchConfig("1", 20, MillisDuration(1000 * 3600 * 50)) - (i % 3) == 2 -> BatchConfig("2", 20, MillisDuration(1000 * 3600 * 50)) - else -> throw IllegalStateException() - } -} - -internal fun beforeNegativeAcknowledgement( - e: Exception, - message: IntMessage, - deserialized: DeserializedIntMessage?, -) { - beforeNegativeAcknowledgedList.add(message.value) -} diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/BatchByTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt similarity index 98% rename from infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/BatchByTests.kt rename to infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt index b1c96bc12..d4bce5b55 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/BatchByTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.common.data.MillisDuration diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerBatchedProcessorTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerBatchedProcessorTests.kt deleted file mode 100644 index de1dd28f5..000000000 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerBatchedProcessorTests.kt +++ /dev/null @@ -1,248 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumers - -import io.infinitic.common.data.MillisDuration -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.fixtures.runAndCancel -import io.infinitic.common.transport.BatchConfig -import io.kotest.assertions.throwables.shouldThrow -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.shouldBe -import kotlin.random.Random - -@Suppress("UNUSED_PARAMETER") -internal class ConsumerBatchedProcessorTests : StringSpec( - { - val concurrency = Random.nextInt(2, 100) - - fun noProcess(message: DeserializedIntMessage, publishTime: MillisInstant) { - throw NoSuchMethodError() - } - - val processor = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessBatching, - ::processBatch, - ) - - beforeEach { - receivedList.clear() - deserializedList.clear() - processedList.clear() - acknowledgedList.clear() - negativeAcknowledgedList.clear() - beforeNegativeAcknowledgedList.clear() - } - - "Processor throws CancellationException when current scope is canceled" { - runAndCancel { - processor.start(concurrency) - } - } - - "On cancellation, all ongoing messages should be processed before closing" { - runAndCancel { - processor.start(concurrency) - } - - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptyList() - } - - "An Error during deserialization throws" { - - fun deserializeWithError(value: IntMessage) = DeserializedIntMessage(value).also { - if (it.value.value == 200) throw Error("Expected Error") - deserializedList.add(it.value.value) - } - - val processorWithError = ConsumerSharedProcessor( - Consumer(), - ::deserializeWithError, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessBatching, - ::processBatch, - ) - - val e = shouldThrow { processorWithError.start(concurrency) } - e.message shouldBe "Expected Error" - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptyList() - } - - "An Error during batch processing throws" { - fun processBatchWithError( - batch: List, - publishTimes: List - ) { - if (batch.map { it.value.value }.contains(200)) throw Error("Expected Error") - println("processBatch: $batch") - processedList.addAll(batch.map { it.value.value }) - } - - val processorWithError = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessBatching, - ::processBatchWithError, - ) - - val e = shouldThrow { processorWithError.start(concurrency) } - e.message shouldBe "Expected Error" - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptyList() - } - - "An exception during deserialization triggers negative acknowledgment" { - fun deserializeWitError(value: IntMessage) = DeserializedIntMessage(value).also { - if (it.value.value == 100) throw Exception("Expected Exception") - if (it.value.value == 400) throw Error("Expected Error") - deserializedList.add(it.value.value) - } - - val processorWithException = ConsumerSharedProcessor( - Consumer(), - ::deserializeWitError, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessBatching, - ::processBatch, - ) - - shouldThrow { processorWithException.start(concurrency) } - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe listOf(100) - } - - "An exception during batch processing triggers negative acknowledgment" { - - fun processBatchWithError( - batch: List, - publishTimes: List - ) { - if (batch.map { it.value.value }.contains(100)) throw Exception("Expected Exception") - if (batch.map { it.value.value }.contains(200)) throw Error("Expected Error") - println("processBatch: $batch") - processedList.addAll(batch.map { it.value.value }) - } - - val processorWithException = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessBatching, - ::processBatchWithError, - ) - - shouldThrow { processorWithException.start(concurrency) } - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList.contains(100) shouldBe true - negativeAcknowledgedList.size shouldBe 20 - } - - "Checking batching process by size" { - - fun processBatchWithChecks( - batch: List, - publishTimes: List - ) { - // checking that batches are correct - batch.map { it.value.value % 2 }.toSet().size shouldBe 1 - batch.size shouldBe 20 - - processedList.addAll(batch.map { it.value.value }) - } - - val processorWithError = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessBatching, - ::processBatchWithChecks, - ) - - runAndCancel { - processorWithError.start(concurrency) - } - - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptySet() - } - - "Checking batching by time" { - - fun assessTimeBatching(value: DeserializedIntMessage): Result { - val i = value.value.value - return when { - i == 0 -> null - (i % 2) == 0 -> BatchConfig("even", Int.MAX_VALUE, MillisDuration(10)) - (i % 2) == 1 -> BatchConfig("odd", Int.MAX_VALUE, MillisDuration(10)) - else -> throw IllegalStateException() - }.let { Result.success(it) } - } - - fun processBatchWithChecks( - batch: List, - publishTimes: List - ) { - // checking that batches are correct - batch.map { it.value.value % 2 }.toSet().size shouldBe 1 - println("batch = $batch") - processedList.addAll(batch.map { it.value.value }) - } - - val processorWithError = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::noProcess, - ::beforeNegativeAcknowledgement, - ::assessTimeBatching, - ::processBatchWithChecks, - ) - - runAndCancel { - processorWithError.start(concurrency) - } - - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptySet() - } - - }, -) diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessorTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessorTests.kt deleted file mode 100644 index ee7ccff3c..000000000 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerSharedProcessorTests.kt +++ /dev/null @@ -1,148 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumers - -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.fixtures.runAndCancel -import io.kotest.assertions.throwables.shouldThrow -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.shouldBe -import kotlin.random.Random - -internal class ConsumerSharedProcessorTests : StringSpec( - { - val concurrency = Random.nextInt(100) - - val processor = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::process, - ::beforeNegativeAcknowledgement, - ) - - beforeEach { - receivedList.clear() - deserializedList.clear() - processedList.clear() - acknowledgedList.clear() - negativeAcknowledgedList.clear() - beforeNegativeAcknowledgedList.clear() - } - - "Processor throws CancellationException when current scope is canceled" { - runAndCancel { - processor.start(concurrency) - } - } - - "On cancellation, all ongoing messages should be processed before closing" { - runAndCancel { - processor.start(concurrency) - } - - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptyList() - } - - "An Error during deserialization throws" { - fun deserializeWitError(value: IntMessage) = DeserializedIntMessage(value).also { - if (it.value.value == 200) throw Error("Expected Error") - deserializedList.add(it.value.value) - } - - val processorWithError = ConsumerSharedProcessor( - Consumer(), - ::deserializeWitError, - ::process, - ::beforeNegativeAcknowledgement, - ) - - val e = shouldThrow { processorWithError.start(concurrency) } - e.message shouldBe "Expected Error" - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptyList() - } - - "An Error during processing throws" { - fun processWithError(message: DeserializedIntMessage, publishTime: MillisInstant) { - if (message.value.value == 200) throw Error("Expected Error") - processedList.add(message.value.value) - } - - val processorWithError = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::processWithError, - ::beforeNegativeAcknowledgement, - ) - - val e = shouldThrow { processorWithError.start(concurrency) } - e.message shouldBe "Expected Error" - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe emptyList() - } - -// " An exception during deserialization triggers negative acknowledgment" { -// fun deserializeWitError(value: IntMessage) = DeserializedIntMessage(value).also { -// if (it.value.value == 100) throw Exception("Expected Exception") -// if (it.value.value == 200) throw Error("Expected Error") -// deserializedList.add(it.value.value) -// } -// -// val processorWithException = ConsumerSharedProcessor( -// Consumer(), -// ::deserializeWitError, -// ::process, -// ::beforeNegativeAcknowledgement, -// ) -// -// shouldThrow { processorWithException.start(concurrency) } -// checkAllProcessedAreAcknowledged() -// checkBeforeNegativeAcknowledged() -// negativeAcknowledgedList shouldBe listOf(100) -// } - - "An exception during processing triggers negative acknowledgment" { - fun processWithException(message: DeserializedIntMessage, publishTime: MillisInstant) { - if (message.value.value == 100) throw Exception("Expected Exception") - if (message.value.value == 200) throw Error("Expected Error") - processedList.add(message.value.value) - } - - val processorWithException = ConsumerSharedProcessor( - Consumer(), - ::deserialize, - ::processWithException, - ::beforeNegativeAcknowledgement, - ) - - shouldThrow { processorWithException.start(concurrency) } - checkAllProcessedAreAcknowledged() - checkBeforeNegativeAcknowledged() - negativeAcknowledgedList shouldBe listOf(100) - } - }, -) diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessorTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessorTests.kt deleted file mode 100644 index 49dd541fa..000000000 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ConsumerUniqueProcessorTests.kt +++ /dev/null @@ -1,144 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.transport.consumers - -import io.infinitic.common.data.MillisInstant -import io.infinitic.common.fixtures.runAndCancel -import io.kotest.assertions.throwables.shouldThrow -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.shouldBe - -internal class ConsumerUniqueProcessorTests : StringSpec( - { - val processor = ConsumerUniqueProcessor( - Consumer(), - ::deserialize, - ::process, - ::beforeNegativeAcknowledgement, - ) - - beforeEach { - receivedList.clear() - deserializedList.clear() - processedList.clear() - acknowledgedList.clear() - negativeAcknowledgedList.clear() - beforeNegativeAcknowledgedList.clear() - } - - "Processor throws CancellationException when current scope is canceled" { - runAndCancel { - processor.start() - } - } - - "On cancellation, all ongoing messages should be processed before closing" { - runAndCancel { - processor.start() - } - - processedList shouldBe acknowledgedList - } - - "An Error during deserialization throws" { - fun deserializeWitError(value: IntMessage) = DeserializedIntMessage(value).also { - if (it.value.value == 200) throw Error("Expected Error") - deserializedList.add(it.value.value) - } - - val processorWithError = ConsumerUniqueProcessor( - Consumer(), - ::deserializeWitError, - ::process, - ::beforeNegativeAcknowledgement, - ) - - val e = shouldThrow { processorWithError.start() } - e.message shouldBe "Expected Error" - acknowledgedList shouldBe (1..199).toList() - negativeAcknowledgedList shouldBe emptyList() - } - - "An Error during processing throws" { - fun processWithError(message: DeserializedIntMessage, publishTime: MillisInstant) { - if (message.value.value == 200) throw Error("Expected Error") - processedList.add(message.value.value) - } - - val processorWithError = ConsumerUniqueProcessor( - Consumer(), - ::deserialize, - ::processWithError, - ::beforeNegativeAcknowledgement, - ) - - val e = shouldThrow { processorWithError.start() } - e.message shouldBe "Expected Error" - acknowledgedList shouldBe (1..199).toList() - negativeAcknowledgedList shouldBe emptyList() - } - - "An exception during deserialization triggers negative acknowledgment" { - fun deserializeWitError(value: IntMessage) = DeserializedIntMessage(value).also { - if (it.value.value == 100) throw Exception("Expected Exception") - if (it.value.value == 200) throw Error("Expected Error") - deserializedList.add(it.value.value) - } - - val processorWithException = ConsumerUniqueProcessor( - Consumer(), - ::deserializeWitError, - ::process, - ::beforeNegativeAcknowledgement, - ) - - shouldThrow { processorWithException.start() } - acknowledgedList.size shouldBe 198 - deserializedList shouldBe acknowledgedList - negativeAcknowledgedList shouldBe listOf(100) - beforeNegativeAcknowledgedList shouldBe listOf(100) - } - - "An exception during processing triggers negative acknowledgment" { - fun processWithException(message: DeserializedIntMessage, publishTime: MillisInstant) { - if (message.value.value == 100) throw Exception("Expected Exception") - if (message.value.value == 200) throw Error("Expected Error") - processedList.add(message.value.value) - } - - val processorWithException = ConsumerUniqueProcessor( - Consumer(), - ::deserialize, - ::processWithException, - ::beforeNegativeAcknowledgement, - ) - - shouldThrow { processorWithException.start() } - deserializedList.remove(100) shouldBe true - deserializedList.remove(200) shouldBe true - deserializedList shouldBe acknowledgedList - negativeAcknowledgedList shouldBe listOf(100) - beforeNegativeAcknowledgedList shouldBe listOf(100) - } - }, -) diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumerTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumerTests.kt similarity index 99% rename from infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumerTests.kt rename to infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumerTests.kt index 6f473a7a3..b3223baa3 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumerTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumerTests.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.infinitic.common.data.MillisInstant import io.infinitic.common.fixtures.later diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumerWithBatchTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumerWithBatchTests.kt similarity index 99% rename from infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumerWithBatchTests.kt rename to infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumerWithBatchTests.kt index 434339391..38e57df11 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/ProcessorConsumerWithBatchTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/ProcessorConsumerWithBatchTests.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.infinitic.common.data.MillisInstant import io.infinitic.common.fixtures.later diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/StartBatchingTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt similarity index 98% rename from infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/StartBatchingTests.kt rename to infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt index 43cb20db0..ea3598fa9 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/StartBatchingTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.common.fixtures.later diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/StartConsumingTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt similarity index 98% rename from infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/StartConsumingTests.kt rename to infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt index 16cd26455..320b81a28 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumer/StartConsumingTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt @@ -20,7 +20,7 @@ * * Licensor: infinitic.io */ -package io.infinitic.common.transport.consumer +package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KotlinLogging import io.kotest.assertions.throwables.shouldNotThrowAny diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt index d19c6b4ce..3a303986b 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt @@ -27,9 +27,9 @@ import io.infinitic.common.data.MillisInstant import io.infinitic.common.transport.BatchConfig import io.infinitic.common.transport.TransportConsumer import io.infinitic.common.transport.TransportMessage -import io.kotest.matchers.shouldBe import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.delay import kotlinx.coroutines.future.future import java.util.* @@ -55,52 +55,45 @@ internal val acknowledgedList = Collections.synchronizedList(mutableListOf( internal val negativeAcknowledgedList = Collections.synchronizedList(mutableListOf()) internal val beforeNegativeAcknowledgedList = Collections.synchronizedList(mutableListOf()) -fun checkAllProcessedAreAcknowledged() { - // no double in processed - processedList.toSet().size shouldBe processedList.size - // all processed are acknowledged - processedList.sorted() shouldBe acknowledgedList.sorted() -} - -fun checkBeforeNegativeAcknowledged() { - // no double negative acknowledging - negativeAcknowledgedList.toSet().size shouldBe negativeAcknowledgedList.size - // all negatively acknowledged have processed beforeNegativeAcknowledge - negativeAcknowledgedList.sorted() shouldBe beforeNegativeAcknowledgedList.sorted() -} +internal open class IntConsumer : TransportConsumer { + private val counter = AtomicInteger(0) -internal class Consumer : TransportConsumer { - val counter = AtomicInteger(0) + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - private val scope = CoroutineScope(Dispatchers.IO) + fun reset() { + counter.set(0) + } override fun receiveAsync() = scope.future { IntMessage(counter.incrementAndGet()) - }.thenApply { it.also { receivedList.add(it.value) } } - - override fun negativeAcknowledgeAsync(messages: List): CompletableFuture = - scope.future { delay(Random.nextLong(5)) } - .thenApply { negativeAcknowledgedList.addAll(messages.map { it.value }) } - - override fun acknowledgeAsync(messages: List): CompletableFuture = - scope.future { delay(Random.nextLong(5)) } - .thenApply { acknowledgedList.addAll(messages.map { it.value }) } + .also { receivedList.add(it.value) } + } override fun negativeAcknowledgeAsync(message: IntMessage): CompletableFuture = - scope.future { delay(Random.nextLong(5)) } - .thenApply { negativeAcknowledgedList.add(message.value) } + scope.future { + delay(Random.nextLong(5)) + .also { negativeAcknowledgedList.add(message.value) } + } override fun acknowledgeAsync(message: IntMessage): CompletableFuture = - scope.future { delay(Random.nextLong(5)) } - .thenApply { acknowledgedList.add(message.value) } + scope.future { + delay(Random.nextLong(5)) + .also { acknowledgedList.add(message.value) } + } } -internal fun deserialize(value: IntMessage) = DeserializedIntMessage(value).also { +internal suspend fun deserialize(value: IntMessage) = DeserializedIntMessage(value).also { + println("start deserializing...$value") + delay(Random.nextLong(5)) deserializedList.add(it.value.value) + println("end deserializing...$value") } -internal fun process(message: DeserializedIntMessage, publishTime: MillisInstant) { +internal suspend fun process(message: DeserializedIntMessage, publishTime: MillisInstant) { + println("start processing......${message.value.value}") + delay(Random.nextLong(100)) + println("end processing......${message.value.value}") processedList.add(message.value.value) } @@ -108,19 +101,19 @@ internal fun processBatch(batch: List, publishTimes: Lis processedList.addAll(batch.map { it.value.value }) } -internal fun assessBatching(value: DeserializedIntMessage): Result { +internal fun getBatchingConfig(value: DeserializedIntMessage): BatchConfig? { val i = value.value.value return when { - i == 0 -> null - (i % 2) == 0 -> BatchConfig("even", 20, MillisDuration(1000 * 3600 * 50)) - (i % 2) == 1 -> BatchConfig("odd", 20, MillisDuration(1000 * 3600 * 50)) + (i % 3) == 0 -> null + (i % 3) == 1 -> BatchConfig("1", 20, MillisDuration(1000 * 3600 * 50)) + (i % 3) == 2 -> BatchConfig("2", 20, MillisDuration(1000 * 3600 * 50)) else -> throw IllegalStateException() - }.let { Result.success(it) } + } } internal fun beforeNegativeAcknowledgement( message: IntMessage, - value: DeserializedIntMessage?, + deserialized: DeserializedIntMessage?, e: Exception ) { beforeNegativeAcknowledgedList.add(message.value) diff --git a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt index 5f28bc70e..e15b7cd4d 100644 --- a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt +++ b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt @@ -102,17 +102,13 @@ class TaskExecutor( executeTasks.process() } - suspend fun getBatchConfig(msg: ServiceExecutorMessage): Result = + fun getBatchConfig(msg: ServiceExecutorMessage): BatchConfig? = when (msg) { is ExecuteTask -> msg.getBatchConfig() } - private suspend fun ExecuteTask.getBatchConfig(): Result = try { - Result.success(getInstanceAndMethod().second.getBatchConfig()) - } catch (e: Exception) { - sendTaskFailed(e, taskMeta) { "Error when retrieving the @Batch config for $this" } - Result.failure(e) - } + private fun ExecuteTask.getBatchConfig(): BatchConfig? = + getInstanceAndMethod().second.getBatchConfig() private data class TaskData( val instance: Any, diff --git a/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt b/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt index 7594b293b..2bd4b6bbf 100644 --- a/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt +++ b/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt @@ -32,8 +32,7 @@ import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.Subscription import io.infinitic.common.transport.TransportConsumer import io.infinitic.common.transport.acceptDelayed -import io.infinitic.common.transport.consumers.ConsumerSharedProcessor -import io.infinitic.common.transport.consumers.ConsumerUniqueProcessor +import io.infinitic.common.transport.consumers.ProcessorConsumer import io.infinitic.inMemory.channels.DelayedMessage import io.infinitic.inMemory.channels.InMemoryChannels import io.infinitic.inMemory.consumers.InMemoryConsumer @@ -55,11 +54,11 @@ class InMemoryInfiniticConsumer( override suspend fun startAsync( subscription: Subscription, entity: String, - handler: suspend (S, MillisInstant) -> Unit, - beforeDlq: (suspend (S?, Exception) -> Unit)?, concurrency: Int, - getBatchConfig: (suspend (S) -> Result)?, - handlerBatch: (suspend (List, List) -> Unit)? + process: suspend (S, MillisInstant) -> Unit, + beforeDlq: (suspend (S?, Exception) -> Unit)?, + batchConfig: (suspend (S) -> BatchConfig?)?, + batchProcess: (suspend (List, List) -> Unit)? ): Job { val loggedDeserialize: suspend (InMemoryTransportMessage) -> S = { message -> @@ -71,7 +70,7 @@ class InMemoryInfiniticConsumer( val loggedHandler: suspend (S, MillisInstant) -> Unit = { message, publishTime -> logger.debug { "Processing $message" } - handler(message, publishTime) + process(message, publishTime) logger.trace { "Processed $message" } } @@ -86,21 +85,13 @@ class InMemoryInfiniticConsumer( return when (subscription.withKey) { true -> { // build the consumers synchronously (but in parallel) - val consumers = coroutineScope { + val consumers: List>> = coroutineScope { List(concurrency) { async { buildConsumer(it) } }.map { it.await() } } launch { - coroutineScope { - repeat(concurrency) { - launch { - ConsumerUniqueProcessor( - consumers[it], - loggedDeserialize, - loggedHandler, - null, - ).start() - } - } + repeat(concurrency) { + val processor = ProcessorConsumer, S>(consumers[it], null) + with(processor) { startAsync(1, loggedDeserialize, loggedHandler) } } } } @@ -108,15 +99,15 @@ class InMemoryInfiniticConsumer( false -> { // build the consumer synchronously val consumer = buildConsumer() - launch { - ConsumerSharedProcessor( - consumer, + val processor = ProcessorConsumer, S>(consumer, null) + with(processor) { + startAsync( + 1, loggedDeserialize, loggedHandler, - null, - getBatchConfig, - handlerBatch, - ).start(concurrency) + batchConfig, + batchProcess, + ) } } } diff --git a/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/consumers/InMemoryConsumer.kt b/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/consumers/InMemoryConsumer.kt index 087420580..fd54fe93b 100644 --- a/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/consumers/InMemoryConsumer.kt +++ b/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/consumers/InMemoryConsumer.kt @@ -41,15 +41,9 @@ class InMemoryConsumer( InMemoryTransportMessage(channel.receive()) } - override fun negativeAcknowledgeAsync(messages: List>): CompletableFuture = - scope.future {} - override fun negativeAcknowledgeAsync(message: InMemoryTransportMessage): CompletableFuture = scope.future {} - override fun acknowledgeAsync(messages: List>): CompletableFuture = - scope.future {} - override fun acknowledgeAsync(message: InMemoryTransportMessage): CompletableFuture = scope.future {} } @@ -66,15 +60,9 @@ class InMemoryDelayedConsumer( } } - override fun negativeAcknowledgeAsync(messages: List>): CompletableFuture = - scope.future {} - override fun negativeAcknowledgeAsync(message: InMemoryTransportMessage): CompletableFuture = scope.future {} - override fun acknowledgeAsync(messages: List>): CompletableFuture = - scope.future {} - override fun acknowledgeAsync(message: InMemoryTransportMessage): CompletableFuture = scope.future {} } diff --git a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt index 35ac8714a..65cb3f68a 100644 --- a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt +++ b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt @@ -31,8 +31,7 @@ import io.infinitic.common.transport.EventListenerSubscription import io.infinitic.common.transport.InfiniticConsumer import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.Subscription -import io.infinitic.common.transport.consumers.ConsumerSharedProcessor -import io.infinitic.common.transport.consumers.ConsumerUniqueProcessor +import io.infinitic.common.transport.consumers.ProcessorConsumer import io.infinitic.pulsar.client.InfiniticPulsarClient import io.infinitic.pulsar.config.PulsarConsumerConfig import io.infinitic.pulsar.consumers.PulsarConsumer @@ -61,11 +60,11 @@ class PulsarInfiniticConsumer( override suspend fun startAsync( subscription: Subscription, entity: String, - handler: suspend (S, MillisInstant) -> Unit, - beforeDlq: (suspend (S?, Exception) -> Unit)?, concurrency: Int, - getBatchConfig: (suspend (S) -> Result)?, - handlerBatch: (suspend (List, List) -> Unit)? + process: suspend (S, MillisInstant) -> Unit, + beforeDlq: (suspend (S?, Exception) -> Unit)?, + batchConfig: (suspend (S) -> BatchConfig?)?, + batchProcess: (suspend (List, List) -> Unit)? ): Job { // Retrieve the name of the topic and of the DLQ topic @@ -93,7 +92,7 @@ class PulsarInfiniticConsumer( val loggedHandler: suspend (S, MillisInstant) -> Unit = { message, publishTime -> logger.debug { "Processing $message" } - handler(message, publishTime) + process(message, publishTime) logger.trace { "Processed $message" } } @@ -130,34 +129,25 @@ class PulsarInfiniticConsumer( List(concurrency) { async { buildConsumer(it) } }.map { it.await() } } launch { - coroutineScope { - List(concurrency) { index -> - launch { - ConsumerUniqueProcessor( - consumers[index], - loggedDeserialize, - loggedHandler, - beforeNegativeAcknowledgement, - ).start() - } - } + List(concurrency) { index -> + val processor = ProcessorConsumer(consumers[index], beforeNegativeAcknowledgement) + with(processor) { startAsync(1, loggedDeserialize, loggedHandler) } } } } - false -> { - // build the consumer synchronously + // build the unique consumer synchronously val consumer = buildConsumer() - launch { - ConsumerSharedProcessor( - consumer, + val processor = ProcessorConsumer(consumer, beforeNegativeAcknowledgement) + with(processor) { + startAsync( + concurrency, loggedDeserialize, loggedHandler, - beforeNegativeAcknowledgement, - getBatchConfig, - handlerBatch, - ).start(concurrency) + batchConfig, + batchProcess, + ) } } } diff --git a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarConsumer.kt b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarConsumer.kt index 411c71144..d1e5757aa 100644 --- a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarConsumer.kt +++ b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarConsumer.kt @@ -31,19 +31,11 @@ class PulsarConsumer( override fun receiveAsync(): CompletableFuture> = pulsarConsumer.receiveAsync().thenApply { PulsarTransportMessage(it) } - override fun negativeAcknowledgeAsync(messages: List>): CompletableFuture { - pulsarConsumer.negativeAcknowledge(messages.toPulsarMessages()) - return CompletableFuture.completedFuture(Unit) - } - override fun negativeAcknowledgeAsync(message: PulsarTransportMessage): CompletableFuture { pulsarConsumer.negativeAcknowledge(message.toPulsarMessage()) return CompletableFuture.completedFuture(Unit) } - override fun acknowledgeAsync(messages: List>): CompletableFuture = - pulsarConsumer.acknowledgeAsync(messages.toPulsarMessages()).thenApply { } - override fun acknowledgeAsync(message: PulsarTransportMessage): CompletableFuture = pulsarConsumer.acknowledgeAsync(message.toPulsarMessage()).thenApply { } } diff --git a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumerTests.kt b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumerTests.kt index 7757d263c..13d29d510 100644 --- a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumerTests.kt +++ b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumerTests.kt @@ -99,9 +99,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(ClientTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 1, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -122,9 +122,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowTagEngineTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -144,9 +144,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowStateCmdTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -166,9 +166,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowStateEngineTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -188,9 +188,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowStateTimerTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -210,9 +210,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowStateEventTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -232,9 +232,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowExecutorTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -254,9 +254,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(WorkflowExecutorEventTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -276,9 +276,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(ServiceTagEngineTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -298,9 +298,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(ServiceExecutorTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } @@ -320,9 +320,9 @@ class PulsarInfiniticConsumerTests : StringSpec( infiniticConsumer.start( subscription = MainSubscription(ServiceExecutorEventTopic), entity = name, - handler = { _, _ -> }, - beforeDlq = { _, _ -> }, concurrency = 10, + process = { _, _ -> }, + beforeDlq = { _, _ -> }, ) } diff --git a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt index bfac7109c..ba327a5d7 100644 --- a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt +++ b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt @@ -94,7 +94,7 @@ class ConsumerTests : StringSpec( coroutineScope { repeat(total) { launch { - val m = when (message) { + @Suppress("UNCHECKED_CAST") val m = when (message) { is DispatchWorkflow -> message.copy(workflowId = WorkflowId()) is ExecuteTask -> message.copy(taskId = TaskId()) else -> message @@ -147,7 +147,7 @@ class ConsumerTests : StringSpec( } } - consumer.start(subscription, entity, handler, null, 1) + consumer.start(subscription, entity, 1, handler, null) } } catch (e: CancellationException) { // do nothing @@ -191,7 +191,7 @@ class ConsumerTests : StringSpec( } } - consumer.start(subscription, entity, handler, null, 100) + consumer.start(subscription, entity, 100, handler, null) } } catch (e: CancellationException) { // do nothing @@ -235,7 +235,7 @@ class ConsumerTests : StringSpec( } } - consumer.start(subscription, entity, handler, null, 1) + consumer.start(subscription, entity, 1, handler, null) } } catch (e: CancellationException) { // do nothing @@ -280,7 +280,7 @@ class ConsumerTests : StringSpec( } } - consumer.start(subscription, entity, handler, null, 100) + consumer.start(subscription, entity, 100, handler, null) } } catch (e: CancellationException) { // do nothing diff --git a/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt b/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt index 1deeb5bf0..debb8b9b8 100644 --- a/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt +++ b/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt @@ -395,9 +395,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(ServiceTagEngineTopic), entity = config.serviceName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } } @@ -446,11 +446,11 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(ServiceExecutorTopic), entity = config.serviceName, - handler = handler, - beforeDlq = beforeDlq, concurrency = config.concurrency, - getBatchConfig = taskExecutor::getBatchConfig, - handlerBatch = handlerBatch, + process = handler, + beforeDlq = beforeDlq, + batchConfig = taskExecutor::getBatchConfig, + batchProcess = handlerBatch, ) } @@ -465,9 +465,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(ServiceExecutorRetryTopic), entity = config.serviceName, - handler = taskRetryHandler::handle, - beforeDlq = null, concurrency = config.concurrency, + process = taskRetryHandler::handle, + beforeDlq = null, ) } @@ -487,9 +487,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(ServiceExecutorEventTopic), entity = config.serviceName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } @@ -522,9 +522,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowTagEngineTopic), entity = config.workflowName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } } @@ -554,9 +554,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowStateCmdTopic), entity = config.workflowName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } @@ -580,9 +580,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowStateEngineTopic), entity = config.workflowName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } @@ -598,9 +598,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowStateTimerTopic), entity = config.workflowName, - handler = workflowStateTimerHandler::handle, - beforeDlq = null, concurrency = config.concurrency, + process = workflowStateTimerHandler::handle, + beforeDlq = null, ) } @@ -621,9 +621,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowStateEventTopic), entity = config.workflowName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } @@ -663,9 +663,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowExecutorTopic), entity = config.workflowName, - handler = handler, - beforeDlq = beforeDlq, concurrency = config.concurrency, + process = handler, + beforeDlq = beforeDlq, ) } @@ -680,9 +680,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowExecutorRetryTopic), entity = config.workflowName, - handler = taskRetryHandler::handle, - beforeDlq = null, concurrency = config.concurrency, + process = taskRetryHandler::handle, + beforeDlq = null, ) } @@ -703,9 +703,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = MainSubscription(WorkflowExecutorEventTopic), entity = config.workflowName, - handler = handler, - beforeDlq = null, concurrency = config.concurrency, + process = handler, + beforeDlq = null, ) } @@ -835,9 +835,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = subscriptionType.create(ServiceExecutorTopic, subscriptionName), entity = serviceName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } // TASK-RETRY topic @@ -845,9 +845,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = subscriptionType.create(ServiceExecutorRetryTopic, subscriptionName), entity = serviceName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } // TASK-EVENTS topic @@ -855,9 +855,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = subscriptionType.create(ServiceExecutorEventTopic, subscriptionName), entity = serviceName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } @@ -878,9 +878,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = subscriptionType.create(WorkflowExecutorTopic, subscriptionName), entity = workflowName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } // WORKFLOW-TASK-RETRY topic @@ -888,9 +888,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = subscriptionType.create(WorkflowExecutorRetryTopic, subscriptionName), entity = workflowName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } // WORKFLOW-TASK-EVENTS topic @@ -898,9 +898,9 @@ class InfiniticWorker( loggedConsumer.startAsync( subscription = subscriptionType.create(WorkflowExecutorEventTopic, subscriptionName), entity = workflowName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } return listOf(jobExecutor, jobRetry, jobEvents) @@ -918,9 +918,9 @@ class InfiniticWorker( consumer.startAsync( subscription = subscriptionType.create(WorkflowStateCmdTopic, subscriptionName), entity = workflowName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) } @@ -929,13 +929,13 @@ class InfiniticWorker( consumer.startAsync( subscription = subscriptionType.create(WorkflowStateEngineTopic, subscriptionName), entity = workflowName.toString(), - handler = { message: Message, publishedAt: MillisInstant -> + concurrency = concurrency, + process = { message: Message, publishedAt: MillisInstant -> // the event handler is not applied for WorkflowCmdMessage from clients // as the event has already been handled in the workflow-cmd topic if (message !is WorkflowStateEngineCmdMessage) handler(message, publishedAt) }, beforeDlq = logMessageSentToDLQ, - concurrency = concurrency, ) } @@ -944,9 +944,9 @@ class InfiniticWorker( consumer.startAsync( subscription = subscriptionType.create(WorkflowStateEventTopic, subscriptionName), entity = workflowName.toString(), - handler = handler, - beforeDlq = logMessageSentToDLQ, concurrency = concurrency, + process = handler, + beforeDlq = logMessageSentToDLQ, ) }