Skip to content

Commit

Permalink
[Host.RabbitMq] Multiple consumers on same queue with varying concurr…
Browse files Browse the repository at this point in the history
…ency separated by routing key

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Dec 23, 2024
1 parent 8b19f25 commit f770702
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace SlimMessageBus.Host;

/// <summary>
/// Decorator for <see cref="IMessageProcessor{TMessage}"> that increases the amount of messages being concurrently processed.
/// Decorator profor <see cref="IMessageProcessor{TMessage}"> that increases the amount of messages being concurrentlycessed.
/// The expectation is that <see cref="IMessageProcessor{TMessage}.ProcessMessage(TMessage)"/> will be executed synchronously (in sequential order) by the caller on which we want to increase amount of concurrent transportMessage being processed.
/// </summary>
/// <typeparam name="TMessage"></typeparam>
Expand Down Expand Up @@ -87,7 +87,7 @@ public TMessage GetMessageWithException()
/// <returns></returns>
public async Task WaitAll(CancellationToken cancellationToken)
{
while (_pendingCount > 0)
while (_pendingCount > 0 && !cancellationToken.IsCancellationRequested)
{
await Task.Delay(200, cancellationToken).ConfigureAwait(false);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
namespace SlimMessageBus.Host.Test.Consumer;

public class ConcurrentMessageProcessorDecoratorTest
{
private readonly MessageBusMock _busMock;
private readonly Mock<IMessageProcessor<SomeMessage>> _messageProcessorMock;

public ConcurrentMessageProcessorDecoratorTest()
{
_busMock = new MessageBusMock();
_messageProcessorMock = new Mock<IMessageProcessor<SomeMessage>>();
}

[Fact]
public void When_Dispose_Then_CallsDisposeOnTarget()
{
// arrange
var targetDisposableMock = _messageProcessorMock.As<IDisposable>();
var subject = new ConcurrentMessageProcessorDecorator<SomeMessage>(1, NullLoggerFactory.Instance, _messageProcessorMock.Object);

// act
subject.Dispose();

// assert
targetDisposableMock.Verify(x => x.Dispose(), Times.Once);
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task When_WaitAll_Then_WaitsOnAllPendingMessageProcessToFinish(bool cancelAwait)
{
// arrange
_messageProcessorMock
.Setup(x => x.ProcessMessage(
It.IsAny<SomeMessage>(),
It.IsAny<IReadOnlyDictionary<string, object>>(),
It.IsAny<IDictionary<string, object>>(),
It.IsAny<IServiceProvider>(),
It.IsAny<CancellationToken>()))
.Returns(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
return new(null, null, null);
});

var subject = new ConcurrentMessageProcessorDecorator<SomeMessage>(1, NullLoggerFactory.Instance, _messageProcessorMock.Object);

await subject.ProcessMessage(new SomeMessage(), new Dictionary<string, object>(), default);

using var cts = new CancellationTokenSource();

if (cancelAwait)
{
cts.CancelAfter(TimeSpan.FromMilliseconds(100));
}

// act
var waitAll = () => subject.WaitAll(cts.Token);

// assert
if (cancelAwait)
{
await waitAll.Should().ThrowAsync<TaskCanceledException>();
}
else
{
await waitAll.Should().NotThrowAsync();
}
_messageProcessorMock
.Verify(x => x.ProcessMessage(
It.IsAny<SomeMessage>(),
It.IsAny<IReadOnlyDictionary<string, object>>(),
It.IsAny<IDictionary<string, object>>(),
It.IsAny<IServiceProvider>(),
It.IsAny<CancellationToken>()),
cancelAwait ? Times.Once : Times.Once);
}

[Theory]
[InlineData(10, 40)]
[InlineData(2, 40)]
public async Task When_ProcessMessage_Given_NMessagesAndConcurrencySetToC_Then_NMethodInvocationsHappenOnTargetWithCConcurrently(int concurrency, int expectedMessageCount)
{
// arrange
var subject = new ConcurrentMessageProcessorDecorator<SomeMessage>(concurrency, NullLoggerFactory.Instance, _messageProcessorMock.Object);

var currentSectionCount = 0;
var maxSectionCount = 0;
var maxSectionCountLock = new object();
var messageCount = 0;

_messageProcessorMock
.Setup(x => x.ProcessMessage(It.IsAny<SomeMessage>(), It.IsAny<IReadOnlyDictionary<string, object>>(), It.IsAny<IDictionary<string, object>>(), It.IsAny<IServiceProvider>(), It.IsAny<CancellationToken>()))
.Returns(async () =>
{
// Entering critical section
Interlocked.Increment(ref currentSectionCount);

// Simulate work
await Task.Delay(50);

Interlocked.Increment(ref messageCount);

lock (maxSectionCountLock)
{
if (currentSectionCount > maxSectionCount)
{
maxSectionCount = currentSectionCount;
}
}

// Simulate work
await Task.Delay(50);

// Leaving critical section
Interlocked.Decrement(ref currentSectionCount);
return new(null, null, null);
});

// act
var msg = new SomeMessage();
var msgHeaders = new Dictionary<string, object>();
for (var i = 0; i < expectedMessageCount; i++)
{
// executed in sequence
await subject.ProcessMessage(msg, msgHeaders, default);
}

// assert
while (subject.PendingCount > 0)
{
await Task.Delay(100);
}

messageCount.Should().Be(expectedMessageCount);
maxSectionCount.Should().Be(concurrency);
}
}

0 comments on commit f770702

Please sign in to comment.