diff --git a/Protos.cs b/Protos.cs
new file mode 100644
index 000000000..329e49dd9
--- /dev/null
+++ b/Protos.cs
@@ -0,0 +1,34 @@
+//
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: src/EventStore.Client.Common/protos
+//
+#pragma warning disable 1591, 0612, 3021
+#region Designer generated code
+
+using pb = global::Google.Protobuf;
+using pbc = global::Google.Protobuf.Collections;
+using pbr = global::Google.Protobuf.Reflection;
+using scg = global::System.Collections.Generic;
+/// Holder for reflection information generated from src/EventStore.Client.Common/protos
+public static partial class ProtosReflection {
+
+ #region Descriptor
+ /// File descriptor for src/EventStore.Client.Common/protos
+ public static pbr::FileDescriptor Descriptor {
+ get { return descriptor; }
+ }
+ private static pbr::FileDescriptor descriptor;
+
+ static ProtosReflection() {
+ byte[] descriptorData = global::System.Convert.FromBase64String(
+ string.Concat(
+ "CiNzcmMvRXZlbnRTdG9yZS5DbGllbnQuQ29tbW9uL3Byb3Rvcw=="));
+ descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
+ new pbr::FileDescriptor[] { },
+ new pbr::GeneratedClrTypeInfo(null, null, null));
+ }
+ #endregion
+
+}
+
+#endregion Designer generated code
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs
index 25635f26a..1e873ec87 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs
@@ -7,7 +7,6 @@
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
-using static EventStore.Client.Streams.ReadResp;
using static EventStore.Client.Streams.ReadResp.ContentOneofCase;
namespace EventStore.Client {
@@ -321,18 +320,11 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo
}
}
- await _channel.Writer.WriteAsync(response.ContentCase switch {
- StreamNotFound => StreamMessage.NotFound.Instance,
- Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
- ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition(
- new StreamPosition(response.FirstStreamPosition)),
- ContentOneofCase.LastStreamPosition => new StreamMessage.LastStreamPosition(
- new StreamPosition(response.LastStreamPosition)),
- LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
- new Position(response.LastAllStreamPosition.CommitPosition,
- response.LastAllStreamPosition.PreparePosition)),
- _ => StreamMessage.Unknown.Instance
- }, linkedCancellationToken).ConfigureAwait(false);
+ var messageToWrite = ConvertResponseToMessage(response);
+ messageToWrite = messageToWrite.IsStreamReadMessage() ? messageToWrite : StreamMessage.Unknown.Instance;
+ await _channel.Writer
+ .WriteAsync(messageToWrite, linkedCancellationToken)
+ .ConfigureAwait(false);
}
_channel.Writer.Complete();
@@ -413,6 +405,24 @@ private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToIt
_ => null
};
+ private static StreamMessage ConvertResponseToMessage(ReadResp response) =>
+ response.ContentCase switch {
+ Checkpoint => new StreamMessage.SubscriptionMessage.Checkpoint(
+ new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition)),
+ Confirmation => new StreamMessage.SubscriptionMessage.SubscriptionConfirmation(response.Confirmation
+ .SubscriptionId),
+ Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
+ FirstStreamPosition => new StreamMessage.FirstStreamPosition(
+ new StreamPosition(response.FirstStreamPosition)),
+ LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
+ new Position(response.LastAllStreamPosition.CommitPosition,
+ response.LastAllStreamPosition.PreparePosition)),
+ LastStreamPosition => new StreamMessage.LastStreamPosition(
+ new StreamPosition(response.LastStreamPosition)),
+ StreamNotFound => StreamMessage.NotFound.Instance,
+ _ => StreamMessage.Unknown.Instance
+ };
+
private static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) =>
new ResolvedEvent(
ConvertToEventRecord(readEvent.Event)!,
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
index 47ea634f1..01d26393b 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
@@ -1,7 +1,12 @@
using System;
+using System.Collections.Generic;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client.Streams;
+using Grpc.Core;
+using Microsoft.Extensions.Logging;
+using static EventStore.Client.SubscriptionState;
namespace EventStore.Client {
public partial class EventStoreClient {
@@ -33,6 +38,31 @@ public Task SubscribeToAllAsync(
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
filterOptions?.CheckpointReached, cancellationToken);
+
+ ///
+ /// Subscribes to all events.
+ ///
+ /// A (exclusive of) to start the subscription from.
+ /// Whether to resolve LinkTo events automatically.
+ /// The optional to apply.
+ /// The optional user credentials to perform operation with.
+ /// The optional .
+ /// An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages
+ public SubscriptionResult SubscribeToAll(
+ FromAll start, bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
+ return new SubscriptionResult(async _ => {
+ var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
+ return channelInfo.CallInvoker;
+ }, new ReadReq {
+ Options = new ReadReq.Types.Options {
+ ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
+ ResolveLinks = resolveLinkTos,
+ All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
+ Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
+ Filter = GetFilterOptions(filterOptions)!
+ }
+ }, Settings, userCredentials, cancellationToken, _log);
+ }
///
/// Subscribes to a stream from a checkpoint.
@@ -60,5 +90,234 @@ public Task SubscribeToStreamAsync(string streamName,
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);
+
+ ///
+ /// Subscribes to a stream from a checkpoint.
+ ///
+ /// A (exclusive of) to start the subscription from.
+ /// The name of the stream to read events from.
+ /// Whether to resolve LinkTo events automatically.
+ /// The optional user credentials to perform operation with.
+ /// The optional .
+ /// An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages
+ public SubscriptionResult SubscribeToStream(string streamName,
+ FromStream start, bool resolveLinkTos = false,
+ UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
+ return new SubscriptionResult(async _ => {
+ var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
+ return channelInfo.CallInvoker;
+ }, new ReadReq {
+ Options = new ReadReq.Types.Options {
+ ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
+ ResolveLinks = resolveLinkTos,
+ Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
+ Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
+ }
+ }, Settings, userCredentials, cancellationToken, _log);
+ }
+
+
+
+ ///
+ /// A class which represents current subscription state and an enumerator to consume messages
+ ///
+ public class SubscriptionResult {
+ private readonly Channel _internalChannel;
+ private readonly CancellationTokenSource _cts;
+ private int _messagesEnumerated;
+ private ILogger _log;
+ ///
+ /// The name of the stream.
+ ///
+ public string StreamName { get; }
+
+ ///
+ ///
+ ///
+ public Position StreamPosition { get; private set; }
+
+ ///
+ /// Represents subscription ID for the current subscription
+ ///
+ public string? SubscriptionId { get; private set; }
+
+ ///
+ /// Current subscription state
+ ///
+
+ public SubscriptionState SubscriptionState {
+ get {
+ if (_exceptionInternal is not null) {
+ throw _exceptionInternal;
+ }
+
+ return _subscriptionStateInternal;
+ }
+ }
+
+ private volatile SubscriptionState _subscriptionStateInternal;
+
+ private volatile Exception? _exceptionInternal;
+
+ ///
+ /// An . Do not enumerate more than once.
+ ///
+ public IAsyncEnumerable Messages {
+ get {
+ return GetMessages();
+
+ async IAsyncEnumerable GetMessages() {
+ if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) {
+ throw new InvalidOperationException("Messages may only be enumerated once.");
+ }
+
+ try {
+ await foreach (var message in _internalChannel.Reader.ReadAllAsync()
+ .ConfigureAwait(false)) {
+ if (!message.IsSubscriptionMessage()) {
+ continue;
+ }
+
+ switch (message) {
+ case StreamMessage.SubscriptionMessage.SubscriptionConfirmation(var
+ subscriptionId):
+ SubscriptionId = subscriptionId;
+ continue;
+ case StreamMessage.SubscriptionMessage.Checkpoint(var position):
+ StreamPosition = position;
+ break;
+ }
+
+ yield return message;
+ }
+ } finally {
+ Dispose();
+ }
+ }
+ }
+ }
+
+ ///
+ /// Terminates subscription
+ ///
+ public void Dispose() {
+ if (_subscriptionStateInternal == Disposed) {
+ return;
+ }
+ _subscriptionStateInternal = Disposed;
+ _cts.Cancel();
+ }
+
+ internal SubscriptionResult(Func> selectCallInvoker, ReadReq request,
+ EventStoreClientSettings settings, UserCredentials? userCredentials,
+ CancellationToken cancellationToken, ILogger log) {
+ Sanitize(request);
+ Validate(request);
+
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+
+ var callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
+ cancellationToken: _cts.Token);
+
+ _internalChannel = Channel.CreateBounded(new BoundedChannelOptions(1) {
+ SingleReader = true,
+ SingleWriter = true,
+ AllowSynchronousContinuations = true
+ });
+
+ _log = log;
+
+ StreamName = request.Options.All != null
+ ? SystemStreams.AllStream
+ : request.Options.Stream.StreamIdentifier!;
+
+ _subscriptionStateInternal = Initializing;
+
+ _ = PumpMessages(selectCallInvoker, request, callOptions);
+ }
+
+ async Task PumpMessages(Func> selectCallInvoker, ReadReq request, CallOptions callOptions) {
+ var firstMessageRead = false;
+ var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
+ var streamsClient = new Streams.Streams.StreamsClient(callInvoker);
+ try {
+ using var call = streamsClient.Read(request, callOptions);
+
+ await foreach (var response in call.ResponseStream
+ .ReadAllAsync(_cts.Token)
+ .WithCancellation(_cts.Token)
+ .ConfigureAwait(false)) {
+ if (response is null) {
+ continue;
+ }
+
+ var message = ConvertResponseToMessage(response);
+ if (!firstMessageRead) {
+ firstMessageRead = true;
+
+ if (message is not StreamMessage.SubscriptionMessage.SubscriptionConfirmation) {
+ throw new InvalidOperationException(
+ $"Subscription to {StreamName} could not be confirmed.");
+ }
+
+ _subscriptionStateInternal = Ok;
+ }
+
+ var messageToWrite = message.IsSubscriptionMessage()
+ ? message
+ : StreamMessage.Unknown.Instance;
+ await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false);
+
+ if (messageToWrite is StreamMessage.NotFound) {
+ _exceptionInternal = new StreamNotFoundException(StreamName);
+ break;
+ }
+ }
+ } catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled &&
+ ex.Status.Detail.Contains("Call canceled by the client.")) {
+ _log.LogInformation(
+ "Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
+ SubscriptionId);
+ } catch (Exception ex) {
+ if (ex is ObjectDisposedException or OperationCanceledException) {
+ _log.LogWarning(
+ ex,
+ "Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
+ SubscriptionId
+ );
+ } else {
+ _exceptionInternal = ex;
+ }
+ } finally {
+ _internalChannel.Writer.Complete();
+ }
+ }
+
+ private static void Sanitize(ReadReq request) {
+ if (request.Options.Filter == null) {
+ request.Options.NoFilter = new Empty();
+ }
+
+ request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()};
+ }
+
+ private static void Validate(ReadReq request) {
+ if (request.Options.CountOptionCase == ReadReq.Types.Options.CountOptionOneofCase.Count &&
+ request.Options.Count <= 0) {
+ throw new ArgumentOutOfRangeException("count");
+ }
+
+ var streamOptions = request.Options.Stream;
+ var allOptions = request.Options.All;
+
+ if (allOptions == null && streamOptions == null) {
+ throw new ArgumentException("No stream provided to subscribe");
+ }
+
+ if (allOptions != null && streamOptions != null) {
+ throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}");
+ }
+ }
+ }
}
}
diff --git a/src/EventStore.Client.Streams/StreamMessage.cs b/src/EventStore.Client.Streams/StreamMessage.cs
index f6b564e71..f26a19e9c 100644
--- a/src/EventStore.Client.Streams/StreamMessage.cs
+++ b/src/EventStore.Client.Streams/StreamMessage.cs
@@ -41,11 +41,44 @@ public record LastStreamPosition(StreamPosition StreamPosition) : StreamMessage;
/// The .
public record LastAllStreamPosition(Position Position) : StreamMessage;
+ ///
+ /// The base record of all subscription specific messages.
+ ///
+ public abstract record SubscriptionMessage : StreamMessage {
+
+ ///
+ /// A that represents a subscription confirmation.
+ ///
+ public record SubscriptionConfirmation(string SubscriptionId) : SubscriptionMessage;
+
+ ///
+ /// A representing position reached in subscribed stream. This message will only be received when subscribing to $all stream
+ ///
+ public record Checkpoint(Position Position) : SubscriptionMessage;
+ }
+
///
/// A that could not be identified, usually indicating a lower client compatibility level than the server supports.
///
public record Unknown : StreamMessage {
internal static readonly Unknown Instance = new();
}
+
+
+ ///
+ /// A test method that returns true if this message can be expected to be received when reading from stream; otherwise, this method returns false
+ ///
+ ///
+ public bool IsStreamReadMessage() {
+ return this is not SubscriptionMessage && this is not Ok && this is not Unknown;
+ }
+
+ ///
+ /// A test method that returns true if this message can be expected to be received when subscribing to a stream; otherwise, this method returns false
+ ///
+ ///
+ public bool IsSubscriptionMessage() {
+ return this is SubscriptionMessage || this is NotFound || this is Event;
+ }
}
}
diff --git a/src/EventStore.Client.Streams/SubscriptionState.cs b/src/EventStore.Client.Streams/SubscriptionState.cs
new file mode 100644
index 000000000..8d167887d
--- /dev/null
+++ b/src/EventStore.Client.Streams/SubscriptionState.cs
@@ -0,0 +1,19 @@
+namespace EventStore.Client {
+ ///
+ /// An enumeration representing the state of a subscription.
+ ///
+ public enum SubscriptionState {
+ ///
+ /// Subscription is initializing
+ ///
+ Initializing = 0,
+ ///
+ /// Subscription has been successfully established
+ ///
+ Ok,
+ ///
+ /// Subscription has been terminated
+ ///
+ Disposed
+ }
+}
diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs
index 67815004d..9f0f07e22 100644
--- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs
+++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs
@@ -1,11 +1,11 @@
namespace EventStore.Client.Streams.Tests.Bugs;
[Trait("Category", "Bug")]
-public class Issue_104(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) {
+public class Issue_104(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) {
[Fact]
- public async Task subscription_does_not_send_checkpoint_reached_after_disposal() {
- var streamName = Fixture.GetStreamName();
- var ignoredStreamName = $"ignore_{streamName}";
+ public async Task Callback_API_subscription_does_not_send_checkpoint_reached_after_disposal() {
+ var streamName = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var ignoredStreamName = $"ignore_{streamName}_{Guid.NewGuid()}";
var subscriptionDisposed = new TaskCompletionSource();
var eventAppeared = new TaskCompletionSource();
var checkpointReachAfterDisposed = new TaskCompletionSource();
@@ -49,4 +49,67 @@ await Fixture.Streams.AppendToStreamAsync(
var result = await Task.WhenAny(delay, checkpointReachAfterDisposed.Task);
result.ShouldBe(delay); // iow 300ms have passed without seeing checkpointReachAfterDisposed
}
-}
\ No newline at end of file
+
+ [Fact]
+ public async Task Iterator_API_subscription_does_not_send_checkpoint_reached_after_disposal() {
+ var streamName = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var ignoredStreamName = $"ignore_{streamName}_{Guid.NewGuid()}";
+ var subscriptionDisposed = new TaskCompletionSource();
+ var eventAppeared = new TaskCompletionSource();
+ var checkpointReachAfterDisposed = new TaskCompletionSource();
+
+ await Fixture.Streams.AppendToStreamAsync(streamName, StreamRevision.None, Fixture.CreateTestEvents());
+
+ var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start,false, new SubscriptionFilterOptions(StreamFilter.Prefix(streamName)));
+
+ ReadMessages(subscription, _ => {
+ eventAppeared.TrySetResult(true);
+ return Task.CompletedTask;
+ }, _ => subscriptionDisposed.TrySetResult(true), _ => {
+ if (!subscriptionDisposed.Task.IsCompleted) {
+ return Task.CompletedTask;
+ }
+
+ checkpointReachAfterDisposed.TrySetResult(true);
+ return Task.CompletedTask;
+ });
+
+ await eventAppeared.Task;
+
+ subscription.Dispose();
+ await subscriptionDisposed.Task;
+
+ await Fixture.Streams.AppendToStreamAsync(ignoredStreamName, StreamRevision.None,
+ Fixture.CreateTestEvents(50));
+
+ var delay = Task.Delay(300);
+ var result = await Task.WhenAny(delay, checkpointReachAfterDisposed.Task);
+ Assert.Equal(delay, result); // iow 300ms have passed without seeing checkpointReachAfterDisposed
+ }
+
+ async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped, Func checkpointReached) {
+ Exception? exception = null;
+ try {
+ await foreach (var message in subscription.Messages) {
+ if (message is StreamMessage.Event eventMessage) {
+ await eventAppeared(eventMessage.ResolvedEvent);
+ } else if (message is StreamMessage.SubscriptionMessage.Checkpoint checkpointMessage) {
+ await checkpointReached(checkpointMessage.Position);
+ }
+ }
+ } catch (Exception ex) {
+ exception = ex;
+ }
+
+ //allow some time for subscription cleanup and chance for exception to be raised
+ await Task.Delay(100);
+
+ try {
+ //subscription.SubscriptionState will throw exception if some problem occurred for the subscription
+ Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState);
+ subscriptionDropped(exception);
+ } catch (Exception ex) {
+ subscriptionDropped(ex);
+ }
+ }
+}
diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs
index 26f9dbd83..a031faa3a 100644
--- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs
+++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs
@@ -27,8 +27,8 @@ public Issue_2544(ITestOutputHelper output, EventStoreFixture fixture) {
[Theory]
[MemberData(nameof(TestCases))]
- public async Task subscribe_to_stream(int iteration) {
- var streamName = $"{Fixture.GetStreamName()}_{iteration}";
+ public async Task Callback_subscribe_to_stream(int iteration) {
+ var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}";
var startFrom = FromStream.Start;
async Task Subscribe() =>
@@ -48,10 +48,37 @@ await Fixture.Streams
await _completed.Task.WithTimeout();
}
+ [Theory, MemberData(nameof(TestCases))]
+ public async Task Iterator_subscribe_to_stream(int iteration) {
+ var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}";
+ var startFrom = FromStream.Start;
+
+ var subscriptionResult = Fixture.Streams.SubscribeToStream(streamName, startFrom, resolveLinkTos: false);
+
+ await AppendEvents(streamName);
+
+ await foreach (var message in subscriptionResult.Messages) {
+ if (message is not StreamMessage.Event @event) continue;
+ var e = @event.ResolvedEvent;
+ if (e.OriginalStreamId != streamName) {
+ continue;
+ }
+
+ if (_seen[e.Event.EventNumber]) {
+ throw new Exception($"Event {e.Event.EventNumber} was already seen");
+ }
+
+ _seen[e.Event.EventNumber] = true;
+ if (e.Event.EventType == "completed") {
+ break;
+ }
+ }
+ }
+
[Theory]
[MemberData(nameof(TestCases))]
- public async Task subscribe_to_all(int iteration) {
- var streamName = $"{Fixture.GetStreamName()}_{iteration}";
+ public async Task Callback_subscribe_to_all(int iteration) {
+ var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}";
var startFrom = FromAll.Start;
async Task Subscribe() =>
@@ -70,10 +97,37 @@ await Fixture.Streams
await _completed.Task.WithTimeout();
}
+ [Theory, MemberData(nameof(TestCases))]
+ public async Task Iterator_subscribe_to_all(int iteration) {
+ var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}";
+ var startFrom = FromAll.Start;
+
+ var subscriptionResult = Fixture.Streams.SubscribeToAll(startFrom, resolveLinkTos: false);
+
+ await AppendEvents(streamName);
+
+ await foreach (var message in subscriptionResult.Messages) {
+ if (message is not StreamMessage.Event @event) continue;
+ var e = @event.ResolvedEvent;
+ if (e.OriginalStreamId != streamName) {
+ continue;
+ }
+
+ if (_seen[e.Event.EventNumber]) {
+ throw new Exception($"Event {e.Event.EventNumber} was already seen");
+ }
+
+ _seen[e.Event.EventNumber] = true;
+ if (e.Event.EventType == "completed") {
+ break;
+ }
+ }
+ }
+
[Theory]
[MemberData(nameof(TestCases))]
- public async Task subscribe_to_all_filtered(int iteration) {
- var streamName = $"{Fixture.GetStreamName()}_{iteration}";
+ public async Task Callback_subscribe_to_all_filtered(int iteration) {
+ var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}";
var startFrom = FromAll.Start;
async Task Subscribe() =>
@@ -93,6 +147,35 @@ await Fixture.Streams
await _completed.Task.WithTimeout();
}
+ [Theory, MemberData(nameof(TestCases))]
+ public async Task Iterator_subscribe_to_all_filtered(int iteration) {
+ var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}";
+ var startFrom = FromAll.Start;
+
+ var subscriptionResult = Fixture.Streams
+ .SubscribeToAll(startFrom, resolveLinkTos: false,
+ new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents()));
+
+ await AppendEvents(streamName);
+
+ await foreach (var message in subscriptionResult.Messages) {
+ if (message is not StreamMessage.Event @event) continue;
+ var e = @event.ResolvedEvent;
+ if (e.OriginalStreamId != streamName) {
+ continue;
+ }
+
+ if (_seen[e.Event.EventNumber]) {
+ throw new Exception($"Event {e.Event.EventNumber} was already seen");
+ }
+
+ _seen[e.Event.EventNumber] = true;
+ if (e.Event.EventType == "completed") {
+ break;
+ }
+ }
+ }
+
async Task AppendEvents(string streamName) {
await Task.Delay(TimeSpan.FromMilliseconds(10));
@@ -168,4 +251,4 @@ Task EventAppeared(ResolvedEvent e, string streamName) {
return Task.CompletedTask;
}
-}
\ No newline at end of file
+}
diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs
index df9162130..5362c7870 100644
--- a/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs
+++ b/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs
@@ -7,7 +7,7 @@ namespace EventStore.Client.Streams.Tests.Subscriptions;
public class @reconnection(ITestOutputHelper output, ReconnectionFixture fixture) : EventStoreTests(output, fixture) {
[Theory]
[InlineData(4, 1000, 0, 15000)]
- public async Task when_the_connection_is_lost(int expectedNumberOfEvents, int reconnectDelayMs, int serviceRestartDelayMs, int testTimeoutMs) {
+ public async Task Callback_when_the_connection_is_lost(int expectedNumberOfEvents, int reconnectDelayMs, int serviceRestartDelayMs, int testTimeoutMs) {
using var cancellator = new CancellationTokenSource().With(x => x.CancelAfter(testTimeoutMs));
var streamName = Fixture.GetStreamName();
@@ -84,9 +84,9 @@ public Task ConsumeEvents(
CancellationToken cancellationToken
) {
var receivedAllEvents = new TaskCompletionSource();
-
+
var receivedEventsCount = 0;
-
+
_ = SubscribeToStream(
streamName,
checkpoint: null,
@@ -106,7 +106,7 @@ Func OnReceive() {
Log.Information("Test complete. {ReceivedEventsCount}/{ExpectedNumberOfEvents} events received.", receivedEventsCount, expectedNumberOfEvents);
receivedAllEvents.TrySetResult();
}
-
+
return Task.CompletedTask;
};
}
@@ -173,4 +173,4 @@ CancellationToken cancellationToken
if (resubscribe) _ = SubscribeToStream(stream, checkpoint, onReceive, onDrop, cancellationToken);
}
}
-}
\ No newline at end of file
+}
diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs
index 9ea4add2b..0851074dd 100644
--- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs
+++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs
@@ -546,7 +546,7 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti
}
[Fact]
- public async Task drops_when_disposed() {
+ public async Task Callback_drops_when_disposed() {
var subscriptionDropped = new TaskCompletionSource();
using var subscription = await Fixture.Streams
@@ -569,7 +569,7 @@ public async Task drops_when_disposed() {
}
[Fact]
- public async Task drops_when_subscriber_error() {
+ public async Task Callback_drops_when_subscriber_error() {
var expectedResult = SubscriptionDroppedResult.SubscriberError();
var subscriptionDropped = new TaskCompletionSource();
@@ -588,4 +588,283 @@ public async Task drops_when_subscriber_error() {
var result = await subscriptionDropped.Task.WithTimeout();
result.ShouldBe(expectedResult);
}
-}
\ No newline at end of file
+
+ [Fact]
+ public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() {
+ var dropped = new TaskCompletionSource();
+
+ var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start);
+ var testEvent = Fixture.CreateTestEvents(1).First();
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString());
+ }
+
+ subscription.Dispose();
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Null(ex);
+
+ // new event after subscription is disposed
+ await Fixture.Streams.AppendToStreamAsync($"test-{Guid.NewGuid()}", StreamState.NoStream, new[]{testEvent});
+
+ Task EventAppeared(ResolvedEvent e) {
+ return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
+ [Fact]
+ public async Task Callback_calls_subscription_dropped_when_error_processing_event() {
+ var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>();
+ var expectedException = new Exception("Error");
+
+ using var subscription = await Fixture.Streams.SubscribeToAllAsync(FromAll.Start,
+ EventAppeared, false, SubscriptionDropped)
+ .WithTimeout();
+
+ await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents());
+
+ var (reason, ex) = await dropped.Task.WithTimeout();
+
+ Assert.Equal(SubscriptionDroppedReason.SubscriberError, reason);
+ Assert.Same(expectedException, ex);
+
+ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) =>
+ Task.FromException(expectedException);
+
+ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) =>
+ dropped.SetResult((reason, ex));
+ }
+
+ [Fact]
+ public async Task Iterator_client_stops_reading_messages_when_error_processing_event() {
+ var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var dropped = new TaskCompletionSource();
+ var expectedException = new Exception("Error");
+ int numTimesCalled = 0;
+
+ var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start);
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+
+ await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents(2));
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Same(expectedException, ex);
+
+ Assert.Equal(1, numTimesCalled);
+
+ Task EventAppeared(ResolvedEvent e) {
+ numTimesCalled++;
+ return Task.FromException(expectedException);
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
+ [Fact]
+ public async Task Callback_subscribe_to_empty_database() {
+ var appeared = new TaskCompletionSource();
+ var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>();
+
+ using var subscription = await Fixture.Streams.SubscribeToAllAsync(FromAll.Start,
+ EventAppeared, false, SubscriptionDropped)
+ .WithTimeout();
+
+ Assert.False(appeared.Task.IsCompleted);
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString());
+ }
+
+ subscription.Dispose();
+
+ var (reason, ex) = await dropped.Task.WithTimeout();
+
+ Assert.Equal(SubscriptionDroppedReason.Disposed, reason);
+ Assert.Null(ex);
+
+ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) {
+ if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) {
+ appeared.TrySetResult(true);
+ }
+
+ return Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) =>
+ dropped.SetResult((reason, ex));
+ }
+
+ [Fact]
+ public async Task Iterator_subscribe_to_empty_database() {
+ var appeared = new TaskCompletionSource();
+ var dropped = new TaskCompletionSource();
+
+ var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start);
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+ Assert.False(appeared.Task.IsCompleted);
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString());
+ }
+
+ subscription.Dispose();
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Null(ex);
+
+ Task EventAppeared(ResolvedEvent e) {
+ if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) {
+ appeared.TrySetResult(true);
+ }
+
+ return Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
+ [Fact]
+ public async Task Callback_reads_all_existing_events_and_keep_listening_to_new_ones() {
+ var appeared = new TaskCompletionSource();
+ var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>();
+ var appearedEvents = new List();
+ var beforeEvents = Fixture.CreateTestEvents(10).ToArray();
+ var afterEvents = Fixture.CreateTestEvents(10).ToArray();
+
+ var allStreams = new List();
+
+ foreach (var @event in beforeEvents.Concat((afterEvents))) {
+ allStreams.Add($"stream-{@event.EventId:n}");
+ }
+
+ foreach (var @event in beforeEvents) {
+ await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
+ new[] {@event});
+ }
+
+ using var subscription = await Fixture.Streams.SubscribeToAllAsync(FromAll.Start,
+ EventAppeared, false, SubscriptionDropped)
+ .WithTimeout();
+
+ foreach (var @event in afterEvents) {
+ await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
+ new[] {@event});
+ }
+
+ await appeared.Task.WithTimeout();
+
+ Assert.Equal(beforeEvents.Concat(afterEvents).Select(x => x.EventId),
+ appearedEvents.Select(x => x.EventId));
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString());
+ }
+
+ subscription.Dispose();
+
+ var (reason, ex) = await dropped.Task.WithTimeout();
+
+ Assert.Equal(SubscriptionDroppedReason.Disposed, reason);
+ Assert.Null(ex);
+
+ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) {
+ if (allStreams.Contains(e.OriginalStreamId)) {
+ appearedEvents.Add(e.Event);
+
+ if (appearedEvents.Count >= beforeEvents.Length + afterEvents.Length) {
+ appeared.TrySetResult(true);
+ }
+ }
+
+ return Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) =>
+ dropped.SetResult((reason, ex));
+ }
+
+ [Fact]
+ public async Task Iterator_reads_all_existing_events_and_keep_listening_to_new_ones() {
+ var appeared = new TaskCompletionSource();
+ var dropped = new TaskCompletionSource();
+ var appearedEvents = new List();
+ var beforeEvents = Fixture.CreateTestEvents(10).ToArray();
+ var afterEvents = Fixture.CreateTestEvents(10).ToArray();
+
+ var allStreams = new List();
+
+ foreach (var @event in beforeEvents.Concat((afterEvents))) {
+ allStreams.Add($"stream-{@event.EventId:n}");
+ }
+
+ foreach (var @event in beforeEvents) {
+ await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
+ new[] {@event});
+ }
+
+ var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start);
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+
+ foreach (var @event in afterEvents) {
+ await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
+ new[] { @event });
+ }
+
+ await appeared.Task.WithTimeout();
+
+ Assert.Equal(beforeEvents.Concat(afterEvents).Select(x => x.EventId),
+ appearedEvents.Select(x => x.EventId));
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString());
+ }
+
+ subscription.Dispose();
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Null(ex);
+
+ Task EventAppeared(ResolvedEvent e) {
+ if (allStreams.Contains(e.OriginalStreamId)) {
+ appearedEvents.Add(e.Event);
+
+ if (appearedEvents.Count >= beforeEvents.Length + afterEvents.Length) {
+ appeared.TrySetResult(true);
+ }
+ }
+
+ return Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
+ async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) {
+ Exception? exception = null;
+ try {
+ await foreach (var message in subscription.Messages) {
+ if (message is StreamMessage.Event eventMessage) {
+ await eventAppeared(eventMessage.ResolvedEvent);
+ }
+ }
+ } catch (Exception ex) {
+ exception = ex;
+ }
+
+ //allow some time for subscription cleanup and chance for exception to be raised
+ await Task.Delay(100);
+
+ try {
+ //subscription.SubscriptionState will throw exception if some problem occurred for the subscription
+ Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState);
+ subscriptionDropped(exception);
+ } catch (Exception ex) {
+ subscriptionDropped(ex);
+ }
+ }
+}
diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs
index ca5dc122e..edc40561c 100644
--- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs
+++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs
@@ -103,7 +103,7 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti
}
[Fact]
- public async Task receives_all_events_from_non_existing_stream() {
+ public async Task Callback_receives_all_events_from_non_existing_stream() {
var streamName = Fixture.GetStreamName();
var receivedAllEvents = new TaskCompletionSource();
@@ -148,7 +148,35 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti
}
[Fact]
- public async Task allow_multiple_subscriptions_to_same_stream() {
+ public async Task Iterator_receives_all_events_from_non_existing_stream() {
+ var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var appeared = new TaskCompletionSource();
+ var dropped = new TaskCompletionSource();
+
+ var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+
+ Assert.False(appeared.Task.IsCompleted);
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString());
+ }
+
+ subscription.Dispose();
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Null(ex);
+
+ Task EventAppeared(ResolvedEvent e) {
+ appeared.TrySetResult(true);
+ return Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
+ [Fact]
+ public async Task Callback_allow_multiple_subscriptions_to_same_stream() {
var streamName = Fixture.GetStreamName();
var receivedAllEvents = new TaskCompletionSource();
@@ -180,7 +208,34 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct)
}
[Fact]
- public async Task drops_when_disposed() {
+ public async Task Iterator_allow_multiple_subscriptions_to_same_stream() {
+ var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+
+ var appeared = new TaskCompletionSource();
+
+ int appearedCount = 0;
+
+ await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents());
+
+ var s1 = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
+ ReadMessages(s1, EventAppeared, null);
+
+ var s2 = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
+ ReadMessages(s2, EventAppeared, null);
+
+ Assert.True(await appeared.Task.WithTimeout());
+
+ Task EventAppeared(ResolvedEvent e) {
+ if (++appearedCount == 2) {
+ appeared.TrySetResult(true);
+ }
+
+ return Task.CompletedTask;
+ }
+ }
+
+ [Fact]
+ public async Task Callback_drops_when_disposed() {
var streamName = Fixture.GetStreamName();
var subscriptionDropped = new TaskCompletionSource();
@@ -206,7 +261,35 @@ public async Task drops_when_disposed() {
}
[Fact]
- public async Task drops_when_subscriber_error() {
+ public async Task Iterator_drops_when_disposed() {
+ var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var dropped = new TaskCompletionSource();
+
+ var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
+ var testEvent = Fixture.CreateTestEvents(1).First();
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+
+ if (dropped.Task.IsCompleted) {
+ Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString());
+ }
+
+ subscription.Dispose();
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Null(ex);
+
+ // new event after subscription is disposed
+ await Fixture.Streams.AppendToStreamAsync(stream, StreamState.Any, new[]{testEvent});
+
+ Task EventAppeared(ResolvedEvent e) {
+ return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask;
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
+ [Fact]
+ public async Task Callback_drops_when_subscriber_error() {
var streamName = Fixture.GetStreamName();
var expectedResult = SubscriptionDroppedResult.SubscriberError();
@@ -229,6 +312,31 @@ public async Task drops_when_subscriber_error() {
result.ShouldBe(expectedResult);
}
+ [Fact]
+ public async Task Iterator_drops_when_subscriber_error() {
+ var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}";
+ var dropped = new TaskCompletionSource();
+ var expectedException = new Exception("Error");
+ int numTimesCalled = 0;
+
+ var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
+ ReadMessages(subscription, EventAppeared, SubscriptionDropped);
+
+ await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents(2));
+
+ var ex = await dropped.Task.WithTimeout();
+ Assert.Same(expectedException, ex);
+
+ Assert.Equal(1, numTimesCalled);
+
+ Task EventAppeared(ResolvedEvent e) {
+ numTimesCalled++;
+ return Task.FromException(expectedException);
+ }
+
+ void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
+ }
+
[Fact]
public async Task drops_when_stream_tombstoned() {
var streamName = Fixture.GetStreamName();
@@ -299,4 +407,28 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct)
void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) =>
subscriptionDropped.SetResult(new(reason, ex));
}
-}
\ No newline at end of file
+
+ async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped) {
+ Exception? exception = null;
+ try {
+ await foreach (var message in subscription.Messages) {
+ if (message is StreamMessage.Event eventMessage) {
+ await eventAppeared(eventMessage.ResolvedEvent);
+ }
+ }
+ } catch (Exception ex) {
+ exception = ex;
+ }
+
+ //allow some time for subscription cleanup and chance for exception to be raised
+ await Task.Delay(100);
+
+ try {
+ //subscription.SubscriptionState will throw exception if some problem occurred for the subscription
+ Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState);
+ subscriptionDropped?.Invoke(exception);
+ } catch (Exception ex) {
+ subscriptionDropped?.Invoke(ex);
+ }
+ }
+}