Skip to content

Commit

Permalink
[DEVEX-222] Refactored SerializationContext into MessageSerializerWra…
Browse files Browse the repository at this point in the history
…pper to make clearer responsibilities

Actually, it's just wrapping message serializer based on the client settings.
  • Loading branch information
oskardudycz committed Jan 29, 2025
1 parent 6446ea2 commit 1e65b62
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 95 deletions.
4 changes: 2 additions & 2 deletions src/Kurrent.Client/Core/ResolvedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ public static ResolvedEvent From(
EventRecord @event,
EventRecord? link,
ulong? commitPosition,
SerializationContext serializationContext
MessageSerializerWrapper _messageSerializer
) {
var originalEvent = link ?? @event;
return serializationContext.TryDeserialize(originalEvent, out var deserialized)
return _messageSerializer.TryDeserialize(originalEvent, out var deserialized)
? new ResolvedEvent(@event, link, deserialized, commitPosition)
: new ResolvedEvent(@event, link, commitPosition);
}
Expand Down
66 changes: 51 additions & 15 deletions src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,78 @@

namespace EventStore.Client.Serialization;

using static ContentTypeExtensions;

public interface IMessageSerializer {
public EventData Serialize(Message value, MessageSerializationContext context);

public bool TryDeserialize(EventRecord messageRecord, out object? deserialized);
}

public class MessageSerializer(
ContentType contentType,
ISerializer serializer,
ISerializer jsonSerializer,
IMessageTypeResolutionStrategy messageTypeResolutionStrategy
) : IMessageSerializer {
readonly string _messageContentType = contentType.ToMessageContentType();

public record MessageSerializationContext(
string StreamName,
ContentType ContentType
) {
public string CategoryName =>
// TODO: This is dangerous, as separator can be changed in database settings
StreamName.Split('-').FirstOrDefault() ?? "no_stream_category";
}

public static class MessageSerializerExtensions {
public static EventData[] Serialize(
this IMessageSerializer serializer,
IEnumerable<Message> messages,
MessageSerializationContext context
) {
return messages.Select(m => serializer.Serialize(m, context)).ToArray();
}
}

public class MessageSerializer(SchemaRegistry schemaRegistry) : IMessageSerializer {
readonly ISerializer _jsonSerializer =
schemaRegistry.GetSerializer(ContentType.Json);

readonly IMessageTypeResolutionStrategy _messageTypeResolutionStrategy =
schemaRegistry.MessageTypeResolutionStrategy;

public EventData Serialize(Message message, MessageSerializationContext serializationContext) {
var (data, metadata, eventId) = message;
var eventType = messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext);
var serializedData = serializer.Serialize(data);
var serializedMetadata = metadata != null ? jsonSerializer.Serialize(metadata) : ReadOnlyMemory<byte>.Empty;

var eventType = _messageTypeResolutionStrategy
.ResolveTypeName(message, serializationContext);

var serializedData = schemaRegistry
.GetSerializer(serializationContext.ContentType)
.Serialize(data);

var serializedMetadata = metadata != null
? _jsonSerializer.Serialize(metadata)
: ReadOnlyMemory<byte>.Empty;

var metadataWithSerialization = serializedMetadata
.InjectSerializationMetadata(SerializationMetadata.From(data.GetType()))
.ToArray();

return new EventData(
eventId,
eventType,
serializedData,
serializedMetadata.InjectSerializationMetadata(SerializationMetadata.From(data.GetType())).ToArray(),
_messageContentType
metadataWithSerialization,
serializationContext.ContentType.ToMessageContentType()
);
}

public bool TryDeserialize(EventRecord messageRecord, out object? deserialized) {
if (!messageTypeResolutionStrategy.TryResolveClrType(messageRecord, out var clrType)) {
if (!schemaRegistry
.MessageTypeResolutionStrategy
.TryResolveClrType(messageRecord, out var clrType)) {
deserialized = null;
return false;
}

deserialized = serializer.Deserialize(messageRecord.Data, clrType!);
deserialized = schemaRegistry
.GetSerializer(FromMessageContentType(messageRecord.ContentType))
.Deserialize(messageRecord.Data, clrType!);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client;
using EventStore.Client.Serialization;

namespace Kurrent.Client.Core.Serialization;

Expand Down Expand Up @@ -44,50 +45,36 @@ public void Deconstruct(out object data, out object? metadata, out Uuid eventId)
}
}

public record MessageSerializationContext(
string StreamName,
ContentType? ContentType = null
) {
public string CategoryName =>
// TODO: This is dangerous, as separator can be changed in database settings
StreamName.Split('-').FirstOrDefault() ?? "no_stream_category";
}

public record SerializationContext(
SchemaRegistry SchemaRegistry,
ContentType DefaultContentType,
AutomaticDeserialization AutomaticDeserialization
) {
public EventData[] Serialize(IEnumerable<Message> messages, MessageSerializationContext context) {
if (AutomaticDeserialization == AutomaticDeserialization.Disabled)
public class MessageSerializerWrapper(
IMessageSerializer messageSerializer,
AutomaticDeserialization automaticDeserialization
): IMessageSerializer {
public EventData Serialize(Message value, MessageSerializationContext context) {
if (automaticDeserialization == AutomaticDeserialization.Disabled)
throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled");

var serializer = SchemaRegistry.GetSerializer(context.ContentType ?? DefaultContentType);

return messages.Select(m => serializer.Serialize(m, context)).ToArray();
return messageSerializer.Serialize(value, context);
}

#if NET48
public bool TryDeserialize(EventRecord eventRecord, out object? deserialized) {
#else
public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out object? deserialized) {
#endif
if (AutomaticDeserialization == AutomaticDeserialization.Disabled) {
if (automaticDeserialization == AutomaticDeserialization.Disabled) {
deserialized = null;
return false;
}

return SchemaRegistry
.GetSerializer(FromMessageContentType(eventRecord.ContentType))
return messageSerializer
.TryDeserialize(eventRecord, out deserialized);
}

public static SerializationContext From(KurrentClientSerializationSettings? settings = null) {
public static MessageSerializerWrapper From(KurrentClientSerializationSettings? settings = null) {
settings ??= new KurrentClientSerializationSettings();

return new SerializationContext(
SchemaRegistry.From(settings),
settings.DefaultContentType,
return new MessageSerializerWrapper(
new MessageSerializer(SchemaRegistry.From(settings)),
settings.AutomaticDeserialization
);
}
Expand Down
18 changes: 9 additions & 9 deletions src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,30 @@ public static string ToMessageContentType(this ContentType contentType) =>

// TODO: We need to discuss how to include the full Schema Registry code here
public class SchemaRegistry(
IDictionary<ContentType, MessageSerializer> serializers
IDictionary<ContentType, ISerializer> serializers,
IMessageTypeResolutionStrategy messageTypeResolutionStrategy
) {
public IMessageSerializer GetSerializer(ContentType schemaType) =>
public IMessageTypeResolutionStrategy MessageTypeResolutionStrategy { get; } = messageTypeResolutionStrategy;

public ISerializer GetSerializer(ContentType schemaType) =>
serializers[schemaType];

public static SchemaRegistry From(KurrentClientSerializationSettings settings) {
var jsonSerializer = settings.JsonSerializer ?? new SystemTextJsonSerializer();
var bytesSerializer = settings.BytesSerializer ?? new SystemTextJsonSerializer();

var messageTypeResolutionStrategy = new MessageTypeResolutionStrategyWrapper(
MessageTypeMapper.Instance,
settings.MessageTypeResolutionStrategy ?? new DefaultMessageTypeResolutionStrategy()
);

var serializers = new Dictionary<ContentType, MessageSerializer> {
var serializers = new Dictionary<ContentType, ISerializer> {
{
ContentType.Json,
new MessageSerializer(ContentType.Json, jsonSerializer, jsonSerializer, messageTypeResolutionStrategy)
settings.JsonSerializer ?? new SystemTextJsonSerializer()
}, {
ContentType.Bytes,
new MessageSerializer(ContentType.Bytes, bytesSerializer, jsonSerializer, messageTypeResolutionStrategy)
settings.BytesSerializer ?? new SystemTextJsonSerializer()
}
};

return new SchemaRegistry(serializers);
return new SchemaRegistry(serializers, messageTypeResolutionStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
using EventStore.Client.PersistentSubscriptions;
using EventStore.Client.Diagnostics;
using Grpc.Core;
using Kurrent.Client.Core.Serialization;
using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions;
using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase;
using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext;

namespace EventStore.Client {
public class SubscribeToPersistentSubscriptionOptions {
Expand Down Expand Up @@ -199,11 +199,7 @@ public PersistentSubscriptionResult SubscribeToStream(
new() { Options = readOptions },
Settings,
options.UserCredentials,
new SerializationContext(
_schemaRegistry,
Settings.Serialization.DefaultContentType,
Settings.Serialization.AutomaticDeserialization
),
MessageSerializerWrapper.From(Settings.Serialization),
cancellationToken
);
}
Expand Down Expand Up @@ -303,7 +299,7 @@ internal PersistentSubscriptionResult(
ReadReq request,
KurrentClientSettings settings,
UserCredentials? userCredentials,
SerializationContext serializationContext,
MessageSerializerWrapper _messageSerializer,
CancellationToken cancellationToken
) {
StreamName = streamName;
Expand Down Expand Up @@ -341,7 +337,7 @@ async Task PumpMessages() {
response.SubscriptionConfirmation.SubscriptionId
),
Event => new PersistentSubscriptionMessage.Event(
ConvertToResolvedEvent(response, serializationContext),
ConvertToResolvedEvent(response, _messageSerializer),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
_ => null
Expand Down Expand Up @@ -449,7 +445,7 @@ public Task Nack(

static ResolvedEvent ConvertToResolvedEvent(
ReadResp response,
SerializationContext serializationContext
MessageSerializerWrapper _messageSerializer
) =>
ResolvedEvent.From(
ConvertToEventRecord(response.Event.Event)!,
Expand All @@ -458,7 +454,7 @@ SerializationContext serializationContext
ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
_ => null
},
serializationContext
_messageSerializer
);

Task AckInternal(params Uuid[] eventIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace EventStore.Client {
public partial class KurrentPersistentSubscriptionsClient {
readonly SchemaRegistry _schemaRegistry;
readonly SerializationContext _defaultSerializationContext;
readonly MessageSerializerWrapper _messageSerializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Kurrent.Client.Core.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext;

namespace EventStore.Client {
/// <summary>
Expand Down Expand Up @@ -40,14 +39,7 @@ public KurrentPersistentSubscriptionsClient(KurrentClientSettings? settings) : b
_log = Settings.LoggerFactory?.CreateLogger<KurrentPersistentSubscriptionsClient>()
?? new NullLogger<KurrentPersistentSubscriptionsClient>();

var serializationSettings = settings?.Serialization ?? KurrentClientSerializationSettings.Default();

_schemaRegistry = SchemaRegistry.From(serializationSettings);
_defaultSerializationContext = new SerializationContext(
_schemaRegistry,
serializationSettings.DefaultContentType,
serializationSettings.AutomaticDeserialization
);
_messageSerializer = MessageSerializerWrapper.From(settings?.Serialization);
}

private static string UrlEncode(string s) {
Expand Down
15 changes: 11 additions & 4 deletions src/Kurrent.Client/Streams/KurrentClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using EventStore.Client.Diagnostics;
using EventStore.Client.Serialization;
using Kurrent.Client.Core.Serialization;
using Kurrent.Diagnostics;
using Kurrent.Diagnostics.Telemetry;
Expand Down Expand Up @@ -37,8 +38,11 @@ public Task<IWriteResult> AppendToStreamAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var serializationContext = new MessageSerializationContext(streamName);
var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext);
var serializationContext = new MessageSerializationContext(
streamName,
Settings.Serialization.DefaultContentType
);
var eventsData = _messageSerializer.Serialize(events.Select(e => new Message(e)), serializationContext);

return AppendToStreamAsync(
streamName,
Expand Down Expand Up @@ -72,8 +76,11 @@ public Task<IWriteResult> AppendToStreamAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var serializationContext = new MessageSerializationContext(streamName);
var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext);
var serializationContext = new MessageSerializationContext(
streamName,
Settings.Serialization.DefaultContentType
);
var eventsData = _messageSerializer.Serialize(events.Select(e => new Message(e)), serializationContext);

return AppendToStreamAsync(
streamName,
Expand Down
Loading

0 comments on commit 1e65b62

Please sign in to comment.