Skip to content

Commit

Permalink
Fix implementation of InMemoryChannels for multiple clients (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles authored Dec 11, 2023
1 parent ea298f4 commit e0ee95e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package io.infinitic.inMemory

import io.infinitic.common.clients.messages.ClientMessage
import io.infinitic.common.data.ClientName
import io.infinitic.common.data.MillisDuration
import io.infinitic.common.messages.Message
import io.infinitic.common.tasks.data.ServiceName
Expand All @@ -37,7 +38,7 @@ import java.util.concurrent.ConcurrentHashMap
class InMemoryChannels : AutoCloseable {

override fun close() {
clientChannel.close()
clientChannels.values.forEach { it.close() }
taskTagChannels.values.forEach { it.close() }
workflowTagChannels.values.forEach { it.close() }
taskExecutorChannels.values.forEach { it.close() }
Expand All @@ -49,7 +50,8 @@ class InMemoryChannels : AutoCloseable {
}

// Client channel
private val clientChannel = Channel<ClientMessage>()
private val clientChannels =
ConcurrentHashMap<ClientName, Channel<ClientMessage>>()

// Channel for TaskTagMessages
private val taskTagChannels =
Expand Down Expand Up @@ -83,7 +85,8 @@ class InMemoryChannels : AutoCloseable {
private val delayedWorkflowTaskExecutorChannels =
ConcurrentHashMap<WorkflowName, Channel<DelayedMessage<TaskExecutorMessage>>>()

fun forClient() = clientChannel
fun forClient(clientName: ClientName): Channel<ClientMessage> =
clientChannels.getOrPut(clientName) { Channel(Channel.UNLIMITED) }

fun forTaskTag(serviceName: ServiceName): Channel<TaskTagMessage> =
taskTagChannels.getOrPut(serviceName) { Channel(Channel.UNLIMITED) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ class InMemoryInfiniticConsumer(private val channels: InMemoryChannels) : Infini
beforeDlq: (suspend (ClientMessage, Exception) -> Unit)?,
clientName: ClientName
): CompletableFuture<Unit> {
val channel = channels.forClient()
val channel = channels.forClient(clientName)
logger.info { "Channel ${channel.id}: Starting Client consumer for $clientName with concurrency = 1" }
return startAsync(handler, beforeDlq, channel)
return startAsync(handler, beforeDlq, channel, 1)
}

override fun startTaskTagConsumerAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,27 @@ class InMemoryInfiniticProducerAsync(private val channels: InMemoryChannels) :

// Coroutine scope used to receive messages
private val producingScope = CoroutineScope(Dispatchers.IO)

override var name = DEFAULT_NAME

override fun sendAsync(message: ClientMessage) = sendAsync(
override fun sendAsync(
message: ClientMessage
): CompletableFuture<Unit> = sendAsync(
message,
channels.forClient(),
channels.forClient(message.recipientName),
)

override fun sendAsync(message: WorkflowTagMessage) = sendAsync(
override fun sendAsync(
message: WorkflowTagMessage
): CompletableFuture<Unit> = sendAsync(
message,
channels.forWorkflowTag(message.workflowName),
)

override fun sendAsync(
message: WorkflowEngineMessage,
after: MillisDuration
) = when {
): CompletableFuture<Unit> = when {
after > 0 -> sendAsync(
DelayedMessage(message, after),
channels.forDelayedWorkflowEngine(message.workflowName),
Expand All @@ -72,15 +76,17 @@ class InMemoryInfiniticProducerAsync(private val channels: InMemoryChannels) :
)
}

override fun sendAsync(message: TaskTagMessage) = sendAsync(
override fun sendAsync(
message: TaskTagMessage
) = sendAsync(
message,
channels.forTaskTag(message.serviceName),
)

override fun sendAsync(
message: TaskExecutorMessage,
after: MillisDuration
) = when {
): CompletableFuture<Unit> = when {
message.isWorkflowTask() -> when (message) {
is ExecuteTask -> when {
after > 0 -> sendAsync(
Expand Down

0 comments on commit e0ee95e

Please sign in to comment.