Skip to content

Commit

Permalink
add test queue version and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
condron committed Jun 24, 2024
1 parent f245057 commit e0fed2d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/ReactiveDomain.Testing/Specifications/TestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public sealed class TestQueue : IHandle<IMessage>, IDisposable
private readonly HashSet<Type> _handledTypes = new HashSet<Type>();
private readonly bool _trackTypes;
private readonly ConcurrentDictionary<Guid, ManualResetEventSlim> _idWatchList = new ConcurrentDictionary<Guid, ManualResetEventSlim>();

/// <summary>
/// The queue of messages.
/// </summary>
public ConcurrentMessageQueue<IMessage> Messages { get; }

private long _cleaning = 0;
private long _queueVersion = 0; //queue version is incremented on each clean
private readonly IDisposable _subscription;

/// <summary>
Expand Down Expand Up @@ -64,14 +64,16 @@ public void Dispose()
_subscription?.Dispose();
Clear();
}
private bool EnsureReady() {
private bool EnsureReady()
{
if (_disposed) { throw new ObjectDisposedException(nameof(TestQueue)); }
if (Interlocked.Read(ref _cleaning) != 0)
{
SpinWait.SpinUntil(() => Interlocked.Read(ref _cleaning) == 0, 250);
}
if (Interlocked.Read(ref _cleaning) != 0) {
throw new TimeoutException("Test Queue not ready, still clearing queues");
if (Interlocked.Read(ref _cleaning) != 0)
{
throw new TimeoutException("Test Queue not ready, timeout clearing queues");
}
return true;
}
Expand All @@ -80,7 +82,7 @@ public void Handle(IMessage message)
EnsureReady();
var msgType = message.GetType();
if (_isFiltered && !MessageTypeFilter.Any(t => t.IsAssignableFrom(msgType))) { return; }

Messages.Enqueue(message);

if (_trackTypes) { _handledTypes.Add(msgType); }
Expand All @@ -98,6 +100,7 @@ public void Clear()
{
try
{
Interlocked.Increment(ref _queueVersion);
Interlocked.Exchange(ref _cleaning, 1); //It's ok to clean an extra message on the race condition
while (!Messages.IsEmpty)
Messages.TryDequeue(out var _);
Expand Down Expand Up @@ -128,17 +131,19 @@ public void WaitFor<T>(TimeSpan timeout) where T : IMessage
public void WaitForMultiple<T>(uint num, TimeSpan timeout) where T : IMessage
{
EnsureReady();
var version = Interlocked.Read(ref _queueVersion);
if (!_trackTypes) { throw new InvalidOperationException("Type tracking is disabled for this instance."); }

var startTime = Environment.TickCount; //returns MS since machine start
var endTime = startTime + (int)timeout.TotalMilliseconds;

var delay = 1;
//Evaluating the entire queue is a bit heavy, but is required to support waiting on base types, interfaces, etc.
//calling ToList allows the concurrent queue to handle grabbing a snapshot of the collection
while (Messages.ToList().Count(x => x is T) < num)
{
if (_disposed) { throw new ObjectDisposedException(nameof(TestQueue)); }

if (Interlocked.Read(ref _queueVersion) != version) { throw new InvalidOperationException("Test queue Cleared!");}
var now = Environment.TickCount;
if ((endTime - now) <= 0) { throw new TimeoutException(); }

Expand All @@ -155,6 +160,7 @@ public void WaitForMultiple<T>(uint num, TimeSpan timeout) where T : IMessage
public void WaitForMsgId(Guid id, TimeSpan timeout)
{
EnsureReady();
var version = Interlocked.Read(ref _queueVersion);
var deadline = DateTime.Now + timeout;
try
{
Expand All @@ -175,10 +181,12 @@ public void WaitForMsgId(Guid id, TimeSpan timeout)

if (Messages.ToArray().Any(m => m.MsgId == id)) { waithandle.Set(); }

//wait here to see if the message handler triggers the wait handle we added
while (!waithandle.Wait(10))
{
if (DateTime.Now > deadline) { throw new TimeoutException($"Msg with ID {id} failed to arrive within {timeout}."); }
if (_disposed) { throw new ObjectDisposedException(nameof(TestQueue)); }
if (Interlocked.Read(ref _queueVersion) != version) { throw new InvalidOperationException("Test queue Cleared!"); }
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/ReactiveDomain.Testing/Specifications/TestQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ public void can_timeout_waiting_for_message_of_type()
tq.AssertEmpty();
}
}
[Fact]
public void waiting_for_message_throws_on_clear()
{
using (var tq = new TestQueue(_dispatcher, new[] { typeof(Event), typeof(Command) }))
{
Task.Delay(5).ContinueWith(t => tq.Clear());
// Don't publish anything
Assert.Throws<InvalidOperationException>(() => tq.WaitFor<TestEvent>(TimeSpan.FromMilliseconds(100)));
tq.AssertEmpty();
}
}
[Fact]
public void waiting_for_id_throws_on_clear()
{
using (var tq = new TestQueue(_dispatcher, new[] { typeof(Event), typeof(Command) }))
{
Task.Delay(5).ContinueWith(t => tq.Clear());
// Don't publish anything
Assert.Throws<InvalidOperationException>(() => tq.WaitForMsgId(Guid.NewGuid(),TimeSpan.FromMilliseconds(100)));
tq.AssertEmpty();
}
}

[Fact]
public void can_wait_for_a_specific_message()
Expand Down

0 comments on commit e0fed2d

Please sign in to comment.