diff --git a/src/Kurrent.Client/Core/ResolvedEvent.cs b/src/Kurrent.Client/Core/ResolvedEvent.cs index 903805674..88ef93b7c 100644 --- a/src/Kurrent.Client/Core/ResolvedEvent.cs +++ b/src/Kurrent.Client/Core/ResolvedEvent.cs @@ -93,10 +93,10 @@ public static ResolvedEvent From( EventRecord @event, EventRecord? link, ulong? commitPosition, - SerializationContext serializationContext + MessageSerializerWrapper _messageSerializer ) { var originalEvent = link ?? @event; - return serializationContext.TryDeserialize(originalEvent, out var deserialized) + return _messageSerializer.TryDeserialize(originalEvent, out var deserialized) ? new ResolvedEvent(@event, link, deserialized, commitPosition) : new ResolvedEvent(@event, link, commitPosition); } diff --git a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs index 5fbc8548a..b35366c6e 100644 --- a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs +++ b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs @@ -3,42 +3,78 @@ namespace EventStore.Client.Serialization; +using static ContentTypeExtensions; + public interface IMessageSerializer { public EventData Serialize(Message value, MessageSerializationContext context); public bool TryDeserialize(EventRecord messageRecord, out object? deserialized); } -public class MessageSerializer( - ContentType contentType, - ISerializer serializer, - ISerializer jsonSerializer, - IMessageTypeResolutionStrategy messageTypeResolutionStrategy -) : IMessageSerializer { - readonly string _messageContentType = contentType.ToMessageContentType(); - +public record MessageSerializationContext( + string StreamName, + ContentType ContentType +) { + public string CategoryName => + // TODO: This is dangerous, as separator can be changed in database settings + StreamName.Split('-').FirstOrDefault() ?? "no_stream_category"; +} + +public static class MessageSerializerExtensions { + public static EventData[] Serialize( + this IMessageSerializer serializer, + IEnumerable messages, + MessageSerializationContext context + ) { + return messages.Select(m => serializer.Serialize(m, context)).ToArray(); + } +} + +public class MessageSerializer(SchemaRegistry schemaRegistry) : IMessageSerializer { + readonly ISerializer _jsonSerializer = + schemaRegistry.GetSerializer(ContentType.Json); + + readonly IMessageTypeResolutionStrategy _messageTypeResolutionStrategy = + schemaRegistry.MessageTypeResolutionStrategy; + public EventData Serialize(Message message, MessageSerializationContext serializationContext) { var (data, metadata, eventId) = message; - var eventType = messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext); - var serializedData = serializer.Serialize(data); - var serializedMetadata = metadata != null ? jsonSerializer.Serialize(metadata) : ReadOnlyMemory.Empty; + + var eventType = _messageTypeResolutionStrategy + .ResolveTypeName(message, serializationContext); + + var serializedData = schemaRegistry + .GetSerializer(serializationContext.ContentType) + .Serialize(data); + + var serializedMetadata = metadata != null + ? _jsonSerializer.Serialize(metadata) + : ReadOnlyMemory.Empty; + + var metadataWithSerialization = serializedMetadata + .InjectSerializationMetadata(SerializationMetadata.From(data.GetType())) + .ToArray(); return new EventData( eventId, eventType, serializedData, - serializedMetadata.InjectSerializationMetadata(SerializationMetadata.From(data.GetType())).ToArray(), - _messageContentType + metadataWithSerialization, + serializationContext.ContentType.ToMessageContentType() ); } public bool TryDeserialize(EventRecord messageRecord, out object? deserialized) { - if (!messageTypeResolutionStrategy.TryResolveClrType(messageRecord, out var clrType)) { + if (!schemaRegistry + .MessageTypeResolutionStrategy + .TryResolveClrType(messageRecord, out var clrType)) { deserialized = null; return false; } - deserialized = serializer.Deserialize(messageRecord.Data, clrType!); + deserialized = schemaRegistry + .GetSerializer(FromMessageContentType(messageRecord.ContentType)) + .Deserialize(messageRecord.Data, clrType!); return true; } diff --git a/src/Kurrent.Client/Core/Serialization/SerializationContext.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs similarity index 62% rename from src/Kurrent.Client/Core/Serialization/SerializationContext.cs rename to src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs index f2ace008c..df6c7983d 100644 --- a/src/Kurrent.Client/Core/Serialization/SerializationContext.cs +++ b/src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using EventStore.Client; +using EventStore.Client.Serialization; namespace Kurrent.Client.Core.Serialization; @@ -44,27 +45,15 @@ public void Deconstruct(out object data, out object? metadata, out Uuid eventId) } } -public record MessageSerializationContext( - string StreamName, - ContentType? ContentType = null -) { - public string CategoryName => - // TODO: This is dangerous, as separator can be changed in database settings - StreamName.Split('-').FirstOrDefault() ?? "no_stream_category"; -} - -public record SerializationContext( - SchemaRegistry SchemaRegistry, - ContentType DefaultContentType, - AutomaticDeserialization AutomaticDeserialization -) { - public EventData[] Serialize(IEnumerable messages, MessageSerializationContext context) { - if (AutomaticDeserialization == AutomaticDeserialization.Disabled) +public class MessageSerializerWrapper( + IMessageSerializer messageSerializer, + AutomaticDeserialization automaticDeserialization +): IMessageSerializer { + public EventData Serialize(Message value, MessageSerializationContext context) { + if (automaticDeserialization == AutomaticDeserialization.Disabled) throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled"); - var serializer = SchemaRegistry.GetSerializer(context.ContentType ?? DefaultContentType); - - return messages.Select(m => serializer.Serialize(m, context)).ToArray(); + return messageSerializer.Serialize(value, context); } #if NET48 @@ -72,22 +61,20 @@ public bool TryDeserialize(EventRecord eventRecord, out object? deserialized) { #else public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out object? deserialized) { #endif - if (AutomaticDeserialization == AutomaticDeserialization.Disabled) { + if (automaticDeserialization == AutomaticDeserialization.Disabled) { deserialized = null; return false; } - return SchemaRegistry - .GetSerializer(FromMessageContentType(eventRecord.ContentType)) + return messageSerializer .TryDeserialize(eventRecord, out deserialized); } - public static SerializationContext From(KurrentClientSerializationSettings? settings = null) { + public static MessageSerializerWrapper From(KurrentClientSerializationSettings? settings = null) { settings ??= new KurrentClientSerializationSettings(); - return new SerializationContext( - SchemaRegistry.From(settings), - settings.DefaultContentType, + return new MessageSerializerWrapper( + new MessageSerializer(SchemaRegistry.From(settings)), settings.AutomaticDeserialization ); } diff --git a/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs index 3d04f5d6a..6e04dca73 100644 --- a/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs +++ b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs @@ -30,30 +30,30 @@ public static string ToMessageContentType(this ContentType contentType) => // TODO: We need to discuss how to include the full Schema Registry code here public class SchemaRegistry( - IDictionary serializers + IDictionary serializers, + IMessageTypeResolutionStrategy messageTypeResolutionStrategy ) { - public IMessageSerializer GetSerializer(ContentType schemaType) => + public IMessageTypeResolutionStrategy MessageTypeResolutionStrategy { get; } = messageTypeResolutionStrategy; + + public ISerializer GetSerializer(ContentType schemaType) => serializers[schemaType]; public static SchemaRegistry From(KurrentClientSerializationSettings settings) { - var jsonSerializer = settings.JsonSerializer ?? new SystemTextJsonSerializer(); - var bytesSerializer = settings.BytesSerializer ?? new SystemTextJsonSerializer(); - var messageTypeResolutionStrategy = new MessageTypeResolutionStrategyWrapper( MessageTypeMapper.Instance, settings.MessageTypeResolutionStrategy ?? new DefaultMessageTypeResolutionStrategy() ); - var serializers = new Dictionary { + var serializers = new Dictionary { { ContentType.Json, - new MessageSerializer(ContentType.Json, jsonSerializer, jsonSerializer, messageTypeResolutionStrategy) + settings.JsonSerializer ?? new SystemTextJsonSerializer() }, { ContentType.Bytes, - new MessageSerializer(ContentType.Bytes, bytesSerializer, jsonSerializer, messageTypeResolutionStrategy) + settings.BytesSerializer ?? new SystemTextJsonSerializer() } }; - return new SchemaRegistry(serializers); + return new SchemaRegistry(serializers, messageTypeResolutionStrategy); } } diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs index 1a6f1284f..f70f80f8e 100644 --- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs @@ -2,9 +2,9 @@ using EventStore.Client.PersistentSubscriptions; using EventStore.Client.Diagnostics; using Grpc.Core; +using Kurrent.Client.Core.Serialization; using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions; using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase; -using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext; namespace EventStore.Client { public class SubscribeToPersistentSubscriptionOptions { @@ -199,11 +199,7 @@ public PersistentSubscriptionResult SubscribeToStream( new() { Options = readOptions }, Settings, options.UserCredentials, - new SerializationContext( - _schemaRegistry, - Settings.Serialization.DefaultContentType, - Settings.Serialization.AutomaticDeserialization - ), + MessageSerializerWrapper.From(Settings.Serialization), cancellationToken ); } @@ -303,7 +299,7 @@ internal PersistentSubscriptionResult( ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials, - SerializationContext serializationContext, + MessageSerializerWrapper _messageSerializer, CancellationToken cancellationToken ) { StreamName = streamName; @@ -341,7 +337,7 @@ async Task PumpMessages() { response.SubscriptionConfirmation.SubscriptionId ), Event => new PersistentSubscriptionMessage.Event( - ConvertToResolvedEvent(response, serializationContext), + ConvertToResolvedEvent(response, _messageSerializer), response.Event.CountCase switch { ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount, _ => null @@ -449,7 +445,7 @@ public Task Nack( static ResolvedEvent ConvertToResolvedEvent( ReadResp response, - SerializationContext serializationContext + MessageSerializerWrapper _messageSerializer ) => ResolvedEvent.From( ConvertToEventRecord(response.Event.Event)!, @@ -458,7 +454,7 @@ SerializationContext serializationContext ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition, _ => null }, - serializationContext + _messageSerializer ); Task AckInternal(params Uuid[] eventIds) { diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs index 0a303356e..5e24ede62 100644 --- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs @@ -2,7 +2,6 @@ namespace EventStore.Client { public partial class KurrentPersistentSubscriptionsClient { - readonly SchemaRegistry _schemaRegistry; - readonly SerializationContext _defaultSerializationContext; + readonly MessageSerializerWrapper _messageSerializer; } } diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs index d7ef3beed..358471feb 100644 --- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs @@ -4,7 +4,6 @@ using Kurrent.Client.Core.Serialization; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext; namespace EventStore.Client { /// @@ -40,14 +39,7 @@ public KurrentPersistentSubscriptionsClient(KurrentClientSettings? settings) : b _log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger(); - var serializationSettings = settings?.Serialization ?? KurrentClientSerializationSettings.Default(); - - _schemaRegistry = SchemaRegistry.From(serializationSettings); - _defaultSerializationContext = new SerializationContext( - _schemaRegistry, - serializationSettings.DefaultContentType, - serializationSettings.AutomaticDeserialization - ); + _messageSerializer = MessageSerializerWrapper.From(settings?.Serialization); } private static string UrlEncode(string s) { diff --git a/src/Kurrent.Client/Streams/KurrentClient.Append.cs b/src/Kurrent.Client/Streams/KurrentClient.Append.cs index a7257f389..6ea0c5ea0 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Append.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Append.cs @@ -7,6 +7,7 @@ using Grpc.Core; using Microsoft.Extensions.Logging; using EventStore.Client.Diagnostics; +using EventStore.Client.Serialization; using Kurrent.Client.Core.Serialization; using Kurrent.Diagnostics; using Kurrent.Diagnostics.Telemetry; @@ -37,8 +38,11 @@ public Task AppendToStreamAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default ) { - var serializationContext = new MessageSerializationContext(streamName); - var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext); + var serializationContext = new MessageSerializationContext( + streamName, + Settings.Serialization.DefaultContentType + ); + var eventsData = _messageSerializer.Serialize(events.Select(e => new Message(e)), serializationContext); return AppendToStreamAsync( streamName, @@ -72,8 +76,11 @@ public Task AppendToStreamAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default ) { - var serializationContext = new MessageSerializationContext(streamName); - var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext); + var serializationContext = new MessageSerializationContext( + streamName, + Settings.Serialization.DefaultContentType + ); + var eventsData = _messageSerializer.Serialize(events.Select(e => new Message(e)), serializationContext); return AppendToStreamAsync( streamName, diff --git a/src/Kurrent.Client/Streams/KurrentClient.Read.cs b/src/Kurrent.Client/Streams/KurrentClient.Read.cs index 68e62358c..83bd5bfcc 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Read.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Read.cs @@ -2,9 +2,9 @@ using EventStore.Client.Serialization; using EventStore.Client.Streams; using Grpc.Core; +using Kurrent.Client.Core.Serialization; using static EventStore.Client.Streams.ReadResp; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; -using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext; namespace EventStore.Client { public partial class KurrentClient { @@ -93,7 +93,7 @@ public ReadAllStreamResult ReadAllAsync( Settings, deadline, userCredentials, - _defaultSerializationContext, + _messageSerializer, cancellationToken ); } @@ -146,7 +146,7 @@ internal ReadAllStreamResult( KurrentClientSettings settings, TimeSpan? deadline, UserCredentials? userCredentials, - SerializationContext serializationContext, + MessageSerializerWrapper _messageSerializer, CancellationToken cancellationToken ) { var callOptions = KurrentCallOptions.CreateStreaming( @@ -180,7 +180,7 @@ await _channel.Writer.WriteAsync( response.ContentCase switch { StreamNotFound => StreamMessage.NotFound.Instance, Event => new StreamMessage.Event( - ConvertToResolvedEvent(response.Event, serializationContext) + ConvertToResolvedEvent(response.Event, _messageSerializer) ), FirstStreamPosition => new StreamMessage.FirstStreamPosition( new StreamPosition(response.FirstStreamPosition) @@ -279,7 +279,7 @@ public ReadStreamResult ReadStreamAsync( Settings, deadline, userCredentials, - _defaultSerializationContext, + _messageSerializer, cancellationToken ); } @@ -356,7 +356,7 @@ internal ReadStreamResult( KurrentClientSettings settings, TimeSpan? deadline, UserCredentials? userCredentials, - SerializationContext serializationContext, + MessageSerializerWrapper _messageSerializer, CancellationToken cancellationToken ) { var callOptions = KurrentCallOptions.CreateStreaming( @@ -408,7 +408,7 @@ await _channel.Writer.WriteAsync( response.ContentCase switch { StreamNotFound => StreamMessage.NotFound.Instance, Event => new StreamMessage.Event( - ConvertToResolvedEvent(response.Event, serializationContext) + ConvertToResolvedEvent(response.Event, _messageSerializer) ), ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition( new StreamPosition(response.FirstStreamPosition) @@ -461,7 +461,7 @@ public async IAsyncEnumerator GetAsyncEnumerator( static ResolvedEvent ConvertToResolvedEvent( Types.ReadEvent readEvent, - SerializationContext serializationContext + MessageSerializerWrapper _messageSerializer ) => ResolvedEvent.From( ConvertToEventRecord(readEvent.Event)!, @@ -470,7 +470,7 @@ SerializationContext serializationContext Types.ReadEvent.PositionOneofCase.CommitPosition => readEvent.CommitPosition, _ => null }, - serializationContext + _messageSerializer ); static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) => diff --git a/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs b/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs index 15219d48c..79bca4917 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs @@ -2,6 +2,6 @@ namespace EventStore.Client { public partial class KurrentClient { - readonly SerializationContext _defaultSerializationContext; + readonly MessageSerializerWrapper _messageSerializer; } } diff --git a/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs b/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs index cfa5b5120..c6a348a2c 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs @@ -3,9 +3,8 @@ using EventStore.Client.Serialization; using EventStore.Client.Streams; using Grpc.Core; - +using Kurrent.Client.Core.Serialization; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; -using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext; namespace EventStore.Client { public partial class KurrentClient { @@ -66,7 +65,7 @@ public StreamSubscriptionResult SubscribeToAll( }, Settings, userCredentials, - _defaultSerializationContext, + _messageSerializer, cancellationToken ); @@ -125,7 +124,7 @@ public StreamSubscriptionResult SubscribeToStream( }, Settings, userCredentials, - _defaultSerializationContext, + _messageSerializer, cancellationToken ); @@ -182,7 +181,7 @@ internal StreamSubscriptionResult( ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials, - SerializationContext serializationContext, + MessageSerializerWrapper _messageSerializer, CancellationToken cancellationToken ) { _request = request; @@ -215,7 +214,7 @@ async Task PumpMessages() { StreamMessage subscriptionMessage = response.ContentCase switch { Confirmation => new StreamMessage.SubscriptionConfirmation(response.Confirmation.SubscriptionId), - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, serializationContext)), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, _messageSerializer)), FirstStreamPosition => new StreamMessage.FirstStreamPosition(new StreamPosition(response.FirstStreamPosition)), LastStreamPosition => new StreamMessage.LastStreamPosition(new StreamPosition(response.LastStreamPosition)), LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( diff --git a/src/Kurrent.Client/Streams/KurrentClient.cs b/src/Kurrent.Client/Streams/KurrentClient.cs index bb5380a5e..0229a7bab 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.cs @@ -7,7 +7,6 @@ using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using ReadReq = EventStore.Client.Streams.ReadReq; -using SerializationContext = Kurrent.Client.Core.Serialization.SerializationContext; namespace EventStore.Client { /// @@ -75,7 +74,7 @@ public KurrentClient(KurrentClientSettings? settings = null) : base(settings, Ex var serializationSettings = settings?.Serialization ?? KurrentClientSerializationSettings.Default(); - _defaultSerializationContext = SerializationContext.From(serializationSettings); + _messageSerializer = MessageSerializerWrapper.From(serializationSettings); } void SwapStreamAppender(Exception ex) =>