Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance test specifications and support classes for testing read models #157

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,20 @@ protected virtual void Dispose(bool disposing)
}
_disposed = true;
}

/// <summary>
/// Applies a message synchronously to the read model while ensuring that the <see cref="ReaderLock"/>
/// is respected and bypasses both the queue and listeners. This is primarily useful in tests.
/// </summary>
/// <param name="message">The message to apply.</param>
public void DirectApply(IMessage message) { DequeueMessage(message); }
public void Handle(Message message) { ((IHandle<IMessage>)_queue).Handle(message); }
public void Handle(IMessage message) { ((IHandle<IMessage>)_queue).Handle(message); }
/// <summary>
/// Publishes a message onto the read model's internal queue.
/// This bypasses the Listeners while ensuring that the <see cref="ReaderLock"/>
/// is respected. All messages will be processed in order from the queue thread.
/// </summary>
/// <param name="message">The message to publish.</param>
public void Publish(IMessage message) { ((IPublisher)_queue).Publish(message); }
}
}
3 changes: 2 additions & 1 deletion src/ReactiveDomain.Testing/AssertEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static void IsOrBecomesTrue(Func<bool> func, int? timeout = null, string
var delay = 1;
while (true)
{
if (EvaluateAfterDelay(func, TimeSpan.FromMilliseconds(delay)))
if (func())
{
result = true;
break;
Expand All @@ -129,6 +129,7 @@ public static void IsOrBecomesTrue(Func<bool> func, int? timeout = null, string
delay = delay << 1;
}
delay = Math.Min(delay, endTime - now);
Thread.Sleep(delay);
}
Assert.True(result, msg ?? "");
}
Expand Down
170 changes: 119 additions & 51 deletions src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using ReactiveDomain.Foundation;
using ReactiveDomain.Messaging;
using System;

namespace ReactiveDomain.Testing
{
/// <summary>
/// An empty configured connection that produces null repositories, readers, etc.
/// Implements <see cref="IConfiguredConnection"/>.
/// </summary>
public class NullConfiguredConnection : IConfiguredConnection
{
/// <summary>
/// Gets a <see cref="NullConnection"/>.
/// </summary>
public IStreamStoreConnection Connection => new NullConnection();

/// <summary>
/// Gets a standard stream name builder
/// </summary>
public IStreamNameBuilder StreamNamer => new PrefixedCamelCaseStreamNameBuilder();

/// <summary>
/// Gets a default Json message serializer.
/// </summary>
public IEventSerializer Serializer => new JsonMessageSerializer();

/// <summary>
/// Gets a <see cref="NullRepository"/>.
/// </summary>
/// <param name="baseRepository">This parameter is ignored.</param>
/// <param name="caching">This parameter is ignored.</param>
/// <param name="currentPolicyUserId">This parameter is ignored.</param>
/// <returns>A <see cref="NullRepository"/>.</returns>
public ICorrelatedRepository GetCorrelatedRepository(
IRepository baseRepository = null,
bool caching = false,
Func<Guid> currentPolicyUserId = null)
{
return new NullRepository();
}

/// <summary>
/// Gets a <see cref="NullListener"/>.
/// </summary>
/// <param name="name">The name of the listener.</param>
/// <returns>A <see cref="NullListener"/></returns>
public IListener GetListener(string name)
{
return new NullListener(name);
}

/// <summary>
/// Gets a <see cref="NullListener"/>.
/// </summary>
/// <param name="name">The name of the listener.</param>
/// <returns>A <see cref="NullListener"/></returns>
public IListener GetQueuedListener(string name)
{
return new NullListener(name);
}

/// <summary>
/// Gets a <see cref="NullReader"/>.
/// </summary>
/// <param name="name">The name of the reader.</param>
/// <param name="handle">This parameter is ignored.</param>
/// <returns>A <see cref="NullListener"/></returns>
public IStreamReader GetReader(string name, Action<IMessage> handle)
{
return new NullReader(name);
}

/// <summary>
/// Gets a <see cref="NullRepository"/>.
/// </summary>
/// <param name="caching">This parameter is ignored.</param>
/// <param name="currentPolicyUserId">This parameter is ignored.</param>
/// <returns>A <see cref="NullRepository"/>.</returns>
public IRepository GetRepository(bool caching = false, Func<Guid> currentPolicyUserId = null)
{
return new NullRepository();
}
}
}
150 changes: 150 additions & 0 deletions src/ReactiveDomain.Testing/Specifications/NullConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
using ReactiveDomain.Util;
using System;

namespace ReactiveDomain.Testing
{
/// <summary>
/// An empty connection that implements <see cref="IStreamStoreConnection"/>.
/// </summary>
public class NullConnection : IStreamStoreConnection
{
/// <summary>
/// The name of the connection.
/// </summary>
public string ConnectionName => "NullConnection";

/// <summary>
/// Drops the events and returns a write result at the default version.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="expectedVersion">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
/// <param name="events">This parameter is ignored.</param>
/// <returns>A <see cref="WriteResult"/> at the default version.</returns>
public WriteResult AppendToStream(string stream, long expectedVersion, UserCredentials credentials = null, params EventData[] events)
{
return new WriteResult(0);
}

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
public void Close() { }

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
public void Connect() { }

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="expectedVersion">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
public void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
{
}

/// <summary>
/// Cleans up resources.
/// </summary>
public void Dispose()
{
}

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="expectedVersion">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
public void HardDeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
{
}

/// <summary>
/// Gets an empty stream slice.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="start">This parameter is ignored.</param>
/// <param name="count">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
/// <returns>An empty <see cref="StreamEventsSlice"/>.</returns>
public StreamEventsSlice ReadStreamBackward(string stream, long start, long count, UserCredentials credentials = null)
{
return new StreamEventsSlice(stream, 0, ReadDirection.Backward, Array.Empty<RecordedEvent>(), 0, 0, true);
}

/// <summary>
/// Gets an empty stream slice.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="start">This parameter is ignored.</param>
/// <param name="count">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
/// <returns>An empty <see cref="StreamEventsSlice"/>.</returns>
public StreamEventsSlice ReadStreamForward(string stream, long start, long count, UserCredentials credentials = null)
{
return new StreamEventsSlice(stream, 0, ReadDirection.Forward, Array.Empty<RecordedEvent>(), 0, 0, true);
}

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
/// <param name="eventAppeared">This parameter is ignored.</param>
/// <param name="subscriptionDropped">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
/// <param name="resolveLinkTos">This parameter is ignored.</param>
/// <returns>This connection.</returns>
public IDisposable SubscribeToAll(Action<RecordedEvent> eventAppeared, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null, bool resolveLinkTos = true)
{
return this;
}

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
/// <param name="from">This parameter is ignored.</param>
/// <param name="eventAppeared">This parameter is ignored.</param>
/// <param name="settings">This parameter is ignored.</param>
/// <param name="liveProcessingStarted">This parameter is ignored.</param>
/// <param name="subscriptionDropped">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
/// <param name="resolveLinkTos">This parameter is ignored.</param>
/// <returns>This connection.</returns>
public IDisposable SubscribeToAllFrom(Position from, Action<RecordedEvent> eventAppeared, CatchUpSubscriptionSettings settings = null, Action liveProcessingStarted = null, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null, bool resolveLinkTos = true)
{
return this;
}

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="eventAppeared">This parameter is ignored.</param>
/// <param name="subscriptionDropped">This parameter is ignored.</param>
/// <param name="credentials">This parameter is ignored.</param>
/// <returns>This connection.</returns>
public IDisposable SubscribeToStream(string stream, Action<RecordedEvent> eventAppeared, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null)
{
return this;
}

/// <summary>
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
/// </summary>
/// <param name="stream">This parameter is ignored.</param>
/// <param name="lastCheckpoint">This parameter is ignored.</param>
/// <param name="settings">This parameter is ignored.</param>
/// <param name="eventAppeared">This parameter is ignored.</param>
/// <param name="liveProcessingStarted">This parameter is ignored.</param>
/// <param name="subscriptionDropped">This parameter is ignored.</param>
/// <param name="credentials"></param>
/// <returns>This connection.</returns>
public IDisposable SubscribeToStreamFrom(string stream, long? lastCheckpoint, CatchUpSubscriptionSettings settings, Action<RecordedEvent> eventAppeared, Action<Unit> liveProcessingStarted = null, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null)
{
return this;
}
}
}
Loading