Skip to content

Commit

Permalink
[DEVEX-222] Refactored new AppendToStreamAsync method to have simplif…
Browse files Browse the repository at this point in the history
…ied syntax

Now they don't require all the fancy, but redundant in most cases Stream Positions, Directions, EventData etc.
  • Loading branch information
oskardudycz committed Feb 6, 2025
1 parent 2bbb89b commit c9f9305
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 91 deletions.
162 changes: 102 additions & 60 deletions src/Kurrent.Client/Streams/KurrentClient.Append.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using Google.Protobuf;
using EventStore.Client.Streams;
Expand All @@ -21,76 +20,42 @@ public partial class KurrentClient {
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="expectedState">The expected <see cref="StreamState"/> of the stream to append to.</param>
/// <param name="events">Messages to append to the stream.</param>
/// <param name="configureOperationOptions">An <see cref="Action{KurrentClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="deadline"></param>
/// <param name="userCredentials">The <see cref="UserCredentials"/> for the operation.</param>
/// <param name="messages">Messages to append to the stream.</param>
/// <param name="options">Optional settings for the append operation, e.g. expected stream position for optimistic concurrency check</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<IWriteResult> AppendToStreamAsync(
string streamName,
StreamState expectedState,
IEnumerable<object> events,
// TODO: I don't like those numerous options, but I'd prefer to tackle that in a dedicated PR
Action<KurrentClientOperationOptions>? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
IEnumerable<Message> messages,
AppendToStreamOptions options,
CancellationToken cancellationToken = default
) {
var serializationContext = new MessageSerializationContext(
streamName,
Settings.Serialization.DefaultContentType
);
var eventsData = _messageSerializer.Serialize(events.Select(e => Message.From(e)), serializationContext);

return AppendToStreamAsync(
streamName,
expectedState,
eventsData,
configureOperationOptions,
deadline,
userCredentials,
cancellationToken
);
}

/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="expectedRevision">The expected <see cref="StreamRevision"/> of the stream to append to.</param>
/// <param name="events">Messages to append to the stream.</param>
/// <param name="configureOperationOptions">An <see cref="Action{KurrentClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="deadline"></param>
/// <param name="userCredentials">The <see cref="UserCredentials"/> for the operation.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<IWriteResult> AppendToStreamAsync(
string streamName,
StreamRevision expectedRevision,
IEnumerable<object> events,
// TODO: I don't like those numerous options, but I'd prefer to tackle that in a dedicated PR
Action<KurrentClientOperationOptions>? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var serializationContext = new MessageSerializationContext(
streamName,
Settings.Serialization.DefaultContentType
);
var eventsData = _messageSerializer.Serialize(events.Select(e => Message.From(e)), serializationContext);

return AppendToStreamAsync(
streamName,
expectedRevision,
eventsData,
configureOperationOptions,
deadline,
userCredentials,
cancellationToken
);
var eventsData = _messageSerializer.Serialize(messages, serializationContext);

return options.StreamRevision.HasValue
? AppendToStreamAsync(
streamName,
options.StreamRevision.Value,
eventsData,
options.ConfigureOperationOptions,
options.Deadline,
options.UserCredentials,
cancellationToken
)
: AppendToStreamAsync(
streamName,
options.StreamState ?? StreamState.Any,
eventsData,
options.ConfigureOperationOptions,
options.Deadline,
options.UserCredentials,
cancellationToken
);
}

/// <summary>
Expand Down Expand Up @@ -526,4 +491,81 @@ public void Dispose() {
}
}
}

public static class KurrentClientAppendToStreamExtensions {
/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="client"></param>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="messages">Messages to append to the stream.</param>
/// <param name="options">Optional settings for the append operation, e.g. expected stream position for optimistic concurrency check</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public static Task<IWriteResult> AppendToStreamAsync(
this KurrentClient client,
string streamName,
IEnumerable<Message> messages,
CancellationToken cancellationToken = default
)
=> client.AppendToStreamAsync(
streamName,
messages,
new AppendToStreamOptions(),
cancellationToken
);

/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="client"></param>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="messages">Messages to append to the stream.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public static Task<IWriteResult> AppendToStreamAsync(
this KurrentClient client,
string streamName,
IEnumerable<object> messages,
CancellationToken cancellationToken = default
)
=> client.AppendToStreamAsync(
streamName,
messages.Select(m => Message.From(m)),
new AppendToStreamOptions(),
cancellationToken
);

/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="client"></param>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="messages">Messages to append to the stream.</param>
/// <param name="options">Optional settings for the append operation, e.g. expected stream position for optimistic concurrency check</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public static Task<IWriteResult> AppendToStreamAsync(
this KurrentClient client,
string streamName,
IEnumerable<object> messages,
AppendToStreamOptions options,
CancellationToken cancellationToken = default
)
=> client.AppendToStreamAsync(
streamName,
messages.Select(m => Message.From(m)),
options,
cancellationToken
);
}

// TODO: In the follow up PR merge StreamState and StreamRevision into a one thing
public class AppendToStreamOptions {
public StreamState? StreamState { get; set; }
public StreamRevision? StreamRevision { get; set; }
public Action<KurrentClientOperationOptions>? ConfigureOperationOptions { get; set; }
public TimeSpan? Deadline { get; set; }
public UserCredentials? UserCredentials { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ public async Task appends_with_revision_serializes_using_default_json_serializat

var events = GenerateEvents();

var writeResult = await Fixture.Streams.AppendToStreamAsync(
stream,
StreamRevision.None,
events
);
var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, events);

Assert.Equal(new(0), writeResult.NextExpectedStreamRevision);

Expand All @@ -31,32 +27,7 @@ public async Task appends_with_revision_serializes_using_default_json_serializat
Assert.Equal(events.First(), resolvedEvent.DeserializedEvent);
}

[RetryFact]
public async Task appends_with_stream_state_serializes_using_default_json_serialization() {
var stream = $"{Fixture.GetStreamName()}_{StreamState.Any}";

var events = GenerateEvents();

var writeResult = await Fixture.Streams.AppendToStreamAsync(
stream,
StreamState.Any,
events
);

Assert.Equal(new(0), writeResult.NextExpectedStreamRevision);

var resolvedEvents = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start)
.ToListAsync();

Assert.Single(resolvedEvents);

var resolvedEvent = resolvedEvents.Single();

Assert.NotNull(resolvedEvent.DeserializedEvent);
Assert.Equal(events.First(), resolvedEvent.DeserializedEvent);
}

private List<UserRegistered> GenerateEvents(int count = 1) =>
List<UserRegistered> GenerateEvents(int count = 1) =>
Enumerable.Range(0, count)
.Select(
_ => new UserRegistered(
Expand Down

0 comments on commit c9f9305

Please sign in to comment.