Skip to content

Commit

Permalink
[DEVEX-222] Added serialization type and merged serialization into Se…
Browse files Browse the repository at this point in the history
…rialization Context

Previously we had DeserializationContext, but after consideration, it'll be better to merge those concepts to make easier customizations per operations (e.g. append, subscribe, etc.).
  • Loading branch information
oskardudycz committed Jan 29, 2025
1 parent cea88dc commit 4992546
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 83 deletions.
13 changes: 10 additions & 3 deletions src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ public enum AutomaticDeserialization {
Enabled = 1
}

public class KurrentClientSerializationSettings {
public ISerializer? JsonSerializer { get; set; }
public ISerializer? BytesSerializer { get; set; }
public enum SerializationType {
Json = 1,
// Protobuf = 2,
// Avro = 3,
Bytes = 4
}

public class KurrentClientSerializationSettings {
public ISerializer? JsonSerializer { get; set; }
public ISerializer? BytesSerializer { get; set; }
public SerializationType DefaultSerializationType { get; set; } = SerializationType.Json;
public AutomaticDeserialization AutomaticDeserialization { get; set; } = AutomaticDeserialization.Disabled;

public static KurrentClientSerializationSettings Default(
Expand Down
4 changes: 2 additions & 2 deletions src/Kurrent.Client/Core/ResolvedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ public static ResolvedEvent From(
EventRecord @event,
EventRecord? link,
ulong? commitPosition,
DeserializationContext deserializationContext
SerializationContext serializationContext
) {
var originalEvent = link ?? @event;
return deserializationContext.TryDeserialize(originalEvent, out var deserialized)
return serializationContext.TryDeserialize(originalEvent, out var deserialized)
? new ResolvedEvent(@event, link, deserialized, commitPosition)
: new ResolvedEvent(@event, link, commitPosition);
}
Expand Down
27 changes: 0 additions & 27 deletions src/Kurrent.Client/Core/Serialization/DeserializationContext.cs

This file was deleted.

56 changes: 56 additions & 0 deletions src/Kurrent.Client/Core/Serialization/SerializationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client;

namespace Kurrent.Client.Core.Serialization;

public record SerializationContext(
SchemaRegistry SchemaRegistry,
SerializationType DefaultSerializationType,
AutomaticDeserialization AutomaticDeserialization
) {
public EventData[] Serialize(IEnumerable<object> messages) {
return Serialize(DefaultSerializationType, messages);
}

public EventData[] Serialize(SerializationType serializationType, IEnumerable<object> messages) {
if (AutomaticDeserialization == AutomaticDeserialization.Disabled)
throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled");

var serializer = SchemaRegistry.GetSerializer((SchemaDefinitionType)(int)serializationType);

return messages.Select(
@event => {
var (bytes, typeName) = serializer.Serialize(@event);
return new EventData(Uuid.NewUuid(), typeName, bytes);
}
).ToArray();
}

#if NET48
public bool TryDeserialize(EventRecord eventRecord, out object? deserialized) {
#else
public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out object? deserialized) {
#endif
if (AutomaticDeserialization == AutomaticDeserialization.Disabled) {
deserialized = null;
return false;
}

var schemaDefinitionType = SchemaDefinitionTypeExtensions.FromContentType(eventRecord.ContentType);

deserialized = SchemaRegistry.GetSerializer(schemaDefinitionType)
.Deserialize(eventRecord.Data, eventRecord.EventType);

return deserialized != null;
}

public static SerializationContext From(KurrentClientSerializationSettings? settings = null) {
settings ??= new KurrentClientSerializationSettings();

return new SerializationContext(
SchemaRegistry.From(settings),
settings.DefaultSerializationType,
settings.AutomaticDeserialization
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Grpc.Core;
using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions;
using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase;
using DeserializationContext = Kurrent.Client.Core.Serialization.DeserializationContext;
using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext;

namespace EventStore.Client {
public class SubscribeToPersistentSubscriptionOptions {
Expand Down Expand Up @@ -199,8 +199,9 @@ public PersistentSubscriptionResult SubscribeToStream(
new() { Options = readOptions },
Settings,
options.UserCredentials,
new DeserializationContext(
new SerializationContext(
_schemaRegistry,
Settings.Serialization.DefaultSerializationType,
Settings.Serialization.AutomaticDeserialization
),
cancellationToken
Expand Down Expand Up @@ -302,7 +303,7 @@ internal PersistentSubscriptionResult(
ReadReq request,
KurrentClientSettings settings,
UserCredentials? userCredentials,
DeserializationContext deserializationContext,
SerializationContext serializationContext,
CancellationToken cancellationToken
) {
StreamName = streamName;
Expand Down Expand Up @@ -340,7 +341,7 @@ async Task PumpMessages() {
response.SubscriptionConfirmation.SubscriptionId
),
Event => new PersistentSubscriptionMessage.Event(
ConvertToResolvedEvent(response, deserializationContext),
ConvertToResolvedEvent(response, serializationContext),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
_ => null
Expand Down Expand Up @@ -448,7 +449,7 @@ public Task Nack(

static ResolvedEvent ConvertToResolvedEvent(
ReadResp response,
DeserializationContext deserializationContext
SerializationContext serializationContext
) =>
ResolvedEvent.From(
ConvertToEventRecord(response.Event.Event)!,
Expand All @@ -457,7 +458,7 @@ DeserializationContext deserializationContext
ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
_ => null
},
deserializationContext
serializationContext
);

Task AckInternal(params Uuid[] eventIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
namespace EventStore.Client {
public partial class KurrentPersistentSubscriptionsClient {
readonly SchemaRegistry _schemaRegistry;
readonly DeserializationContext _defaultDeserializationContext;
readonly SerializationContext _defaultSerializationContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Kurrent.Client.Core.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using DeserializationContext = Kurrent.Client.Core.Serialization.DeserializationContext;
using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext;

namespace EventStore.Client {
/// <summary>
Expand Down Expand Up @@ -43,8 +43,9 @@ public KurrentPersistentSubscriptionsClient(KurrentClientSettings? settings) : b
var serializationSettings = settings?.Serialization ?? KurrentClientSerializationSettings.Default();

_schemaRegistry = SchemaRegistry.From(serializationSettings);
_defaultDeserializationContext = new DeserializationContext(
_defaultSerializationContext = new SerializationContext(
_schemaRegistry,
serializationSettings.DefaultSerializationType,
serializationSettings.AutomaticDeserialization
);
}
Expand Down
22 changes: 4 additions & 18 deletions src/Kurrent.Client/Streams/KurrentClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,12 @@ public Task<IWriteResult> AppendToStreamAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var serializer = _schemaRegistry.GetSerializer(SchemaDefinitionType.Json);
var eventsData = _defaultSerializationContext.Serialize(events);

var serializedEvents = events.Select(
@event => {
var (bytes, typeName) = serializer.Serialize(@event);
return new EventData(Uuid.NewUuid(), typeName, bytes);
}
).AsEnumerable();

return AppendToStreamAsync(
streamName,
expectedState,
serializedEvents,
eventsData,
configureOperationOptions,
deadline,
userCredentials,
Expand Down Expand Up @@ -78,19 +71,12 @@ public Task<IWriteResult> AppendToStreamAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var serializer = _schemaRegistry.GetSerializer(SchemaDefinitionType.Json);
var eventsData = _defaultSerializationContext.Serialize(events);

var serializedEvents = events.Select(
@event => {
var (bytes, typeName) = serializer.Serialize(@event);
return new EventData(Uuid.NewUuid(), typeName, bytes);
}
).AsEnumerable();

return AppendToStreamAsync(
streamName,
expectedRevision,
serializedEvents,
eventsData,
configureOperationOptions,
deadline,
userCredentials,
Expand Down
22 changes: 11 additions & 11 deletions src/Kurrent.Client/Streams/KurrentClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp;
using static EventStore.Client.Streams.ReadResp.ContentOneofCase;
using DeserializationContext = Kurrent.Client.Core.Serialization.DeserializationContext;
using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext;

namespace EventStore.Client {
public partial class KurrentClient {
Expand Down Expand Up @@ -93,7 +93,7 @@ public ReadAllStreamResult ReadAllAsync(
Settings,
deadline,
userCredentials,
new DeserializationContext(_schemaRegistry, Settings.Serialization.AutomaticDeserialization),
_defaultSerializationContext,
cancellationToken
);
}
Expand Down Expand Up @@ -146,7 +146,7 @@ internal ReadAllStreamResult(
KurrentClientSettings settings,
TimeSpan? deadline,
UserCredentials? userCredentials,
DeserializationContext deserializationContext,
SerializationContext serializationContext,
CancellationToken cancellationToken
) {
var callOptions = KurrentCallOptions.CreateStreaming(
Expand All @@ -173,14 +173,14 @@ async Task PumpMessages() {
var callInvoker = await selectCallInvoker(linkedCancellationToken).ConfigureAwait(false);
var client = new Streams.Streams.StreamsClient(callInvoker);
using var call = client.Read(request, callOptions);

await foreach (var response in call.ResponseStream.ReadAllAsync(linkedCancellationToken)
.ConfigureAwait(false)) {
await _channel.Writer.WriteAsync(
response.ContentCase switch {
StreamNotFound => StreamMessage.NotFound.Instance,
Event => new StreamMessage.Event(
ConvertToResolvedEvent(response.Event, deserializationContext)
ConvertToResolvedEvent(response.Event, serializationContext)
),
FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)
Expand Down Expand Up @@ -279,7 +279,7 @@ public ReadStreamResult ReadStreamAsync(
Settings,
deadline,
userCredentials,
new DeserializationContext(_schemaRegistry, Settings.Serialization.AutomaticDeserialization),
_defaultSerializationContext,
cancellationToken
);
}
Expand Down Expand Up @@ -356,7 +356,7 @@ internal ReadStreamResult(
KurrentClientSettings settings,
TimeSpan? deadline,
UserCredentials? userCredentials,
DeserializationContext deserializationContext,
SerializationContext serializationContext,
CancellationToken cancellationToken
) {
var callOptions = KurrentCallOptions.CreateStreaming(
Expand Down Expand Up @@ -408,7 +408,7 @@ await _channel.Writer.WriteAsync(
response.ContentCase switch {
StreamNotFound => StreamMessage.NotFound.Instance,
Event => new StreamMessage.Event(
ConvertToResolvedEvent(response.Event, deserializationContext)
ConvertToResolvedEvent(response.Event, serializationContext)
),
ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)
Expand Down Expand Up @@ -460,8 +460,8 @@ public async IAsyncEnumerator<ResolvedEvent> GetAsyncEnumerator(
}

static ResolvedEvent ConvertToResolvedEvent(
Types.ReadEvent readEvent,
DeserializationContext deserializationContext
Types.ReadEvent readEvent,
SerializationContext serializationContext
) =>
ResolvedEvent.From(
ConvertToEventRecord(readEvent.Event)!,
Expand All @@ -470,7 +470,7 @@ DeserializationContext deserializationContext
Types.ReadEvent.PositionOneofCase.CommitPosition => readEvent.CommitPosition,
_ => null
},
deserializationContext
serializationContext
);

static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
Expand Down
3 changes: 1 addition & 2 deletions src/Kurrent.Client/Streams/KurrentClient.Serialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace EventStore.Client {
public partial class KurrentClient {
readonly SchemaRegistry _schemaRegistry;
readonly DeserializationContext _defaultDeserializationContext;
readonly SerializationContext _defaultSerializationContext;
}
}
10 changes: 5 additions & 5 deletions src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Grpc.Core;

using static EventStore.Client.Streams.ReadResp.ContentOneofCase;
using DeserializationContext = Kurrent.Client.Core.Serialization.DeserializationContext;
using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext;

namespace EventStore.Client {
public partial class KurrentClient {
Expand Down Expand Up @@ -66,7 +66,7 @@ public StreamSubscriptionResult SubscribeToAll(
},
Settings,
userCredentials,
new DeserializationContext(_schemaRegistry, Settings.Serialization.AutomaticDeserialization),
_defaultSerializationContext,
cancellationToken
);

Expand Down Expand Up @@ -125,7 +125,7 @@ public StreamSubscriptionResult SubscribeToStream(
},
Settings,
userCredentials,
new DeserializationContext(_schemaRegistry, Settings.Serialization.AutomaticDeserialization),
_defaultSerializationContext,
cancellationToken
);

Expand Down Expand Up @@ -182,7 +182,7 @@ internal StreamSubscriptionResult(
ReadReq request,
KurrentClientSettings settings,
UserCredentials? userCredentials,
DeserializationContext deserializationContext,
SerializationContext serializationContext,
CancellationToken cancellationToken
) {
_request = request;
Expand Down Expand Up @@ -215,7 +215,7 @@ async Task PumpMessages() {
StreamMessage subscriptionMessage =
response.ContentCase switch {
Confirmation => new StreamMessage.SubscriptionConfirmation(response.Confirmation.SubscriptionId),
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, deserializationContext)),
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, serializationContext)),
FirstStreamPosition => new StreamMessage.FirstStreamPosition(new StreamPosition(response.FirstStreamPosition)),
LastStreamPosition => new StreamMessage.LastStreamPosition(new StreamPosition(response.LastStreamPosition)),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
Expand Down
Loading

0 comments on commit 4992546

Please sign in to comment.