diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs index d257eb194..c0bf1d172 100644 --- a/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs +++ b/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs @@ -115,11 +115,11 @@ out var messageTypeClrTypeName } [MethodImpl(MethodImplOptions.AggressiveInlining)] - static ReadOnlySpan InjectSerializationMetadata( + public static ReadOnlyMemory InjectSerializationMetadata( this ReadOnlyMemory eventMetadata, SerializationMetadata serializationMetadata ) { if (serializationMetadata == SerializationMetadata.None || !serializationMetadata.IsValid) - return eventMetadata.Span; + return ReadOnlyMemory.Empty; return eventMetadata.IsEmpty ? JsonSerializer.SerializeToUtf8Bytes(serializationMetadata) diff --git a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs index 3b166df36..5fbc8548a 100644 --- a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs +++ b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs @@ -1,11 +1,10 @@ -using System.Diagnostics.CodeAnalysis; +using EventStore.Client.Diagnostics; using Kurrent.Client.Core.Serialization; -using Kurrent.Client.Tests.Streams.Serialization; namespace EventStore.Client.Serialization; public interface IMessageSerializer { - public EventData Serialize(object value); + public EventData Serialize(Message value, MessageSerializationContext context); public bool TryDeserialize(EventRecord messageRecord, out object? deserialized); } @@ -16,12 +15,21 @@ public class MessageSerializer( ISerializer jsonSerializer, IMessageTypeResolutionStrategy messageTypeResolutionStrategy ) : IMessageSerializer { - public EventData Serialize(object value) { - var eventType = messageTypeResolutionStrategy.ResolveTypeName(value); - var bytes = serializer.Serialize(value); - var metadata = jsonSerializer.Serialize(SerializationMetadata.From(value.GetType())); - - return new EventData(Uuid.NewUuid(), eventType, bytes, metadata, contentType.ToMessageContentType()); + readonly string _messageContentType = contentType.ToMessageContentType(); + + public EventData Serialize(Message message, MessageSerializationContext serializationContext) { + var (data, metadata, eventId) = message; + var eventType = messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext); + var serializedData = serializer.Serialize(data); + var serializedMetadata = metadata != null ? jsonSerializer.Serialize(metadata) : ReadOnlyMemory.Empty; + + return new EventData( + eventId, + eventType, + serializedData, + serializedMetadata.InjectSerializationMetadata(SerializationMetadata.From(data.GetType())).ToArray(), + _messageContentType + ); } public bool TryDeserialize(EventRecord messageRecord, out object? deserialized) { diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs index 49fcd11a6..447d6b3b7 100644 --- a/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs +++ b/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs @@ -2,9 +2,6 @@ namespace Kurrent.Client.Tests.Streams.Serialization; -// TODO: Discuss how to proceed with that and whether to move the Schema Registry code here -// The scanning part and registration seems to be more robust there -// I used this for simplicity public interface IMessageTypeMapper { void AddType(Type messageType, string messageTypeName); string? GetTypeName(Type messageType); diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs index 8d7229451..08b76ef65 100644 --- a/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs +++ b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs @@ -1,11 +1,13 @@ using System.Diagnostics.CodeAnalysis; +using System.Text.Json; using Kurrent.Client.Tests.Streams.Serialization; using EventStore.Client.Diagnostics; +using Kurrent.Client.Core.Serialization; namespace EventStore.Client.Serialization; public interface IMessageTypeResolutionStrategy { - string ResolveTypeName(object messageData); + string ResolveTypeName(Message message, MessageSerializationContext serializationContext); #if NET48 bool TryResolveClrType(EventRecord messageRecord, out Type? type); @@ -18,10 +20,10 @@ public class MessageTypeResolutionStrategyWrapper( IMessageTypeMapper messageTypeMapper, IMessageTypeResolutionStrategy messageTypeResolutionStrategy ) : IMessageTypeResolutionStrategy { - public string ResolveTypeName(object messageData) { + public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) { return messageTypeMapper.GetOrAddTypeName( - messageData.GetType(), - _ => messageTypeResolutionStrategy.ResolveTypeName(messageData) + message.Data.GetType(), + _ => messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext) ); } @@ -43,8 +45,8 @@ public bool TryResolveClrType(EventRecord messageRecord, [NotNullWhen(true)] out public class DefaultMessageTypeResolutionStrategy : IMessageTypeResolutionStrategy { - public string ResolveTypeName(object messageData) => - messageData.GetType().FullName!; + public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) => + $"{serializationContext.CategoryName}-{JsonNamingPolicy.SnakeCaseLower.ConvertName(message.Data.GetType().Name.ToLower())}"; #if NET48 public bool TryResolveClrType(EventRecord messageRecord, out Type? type) { diff --git a/src/Kurrent.Client/Core/Serialization/SerializationContext.cs b/src/Kurrent.Client/Core/Serialization/SerializationContext.cs index 9c54b18da..f2ace008c 100644 --- a/src/Kurrent.Client/Core/Serialization/SerializationContext.cs +++ b/src/Kurrent.Client/Core/Serialization/SerializationContext.cs @@ -5,22 +5,66 @@ namespace Kurrent.Client.Core.Serialization; using static ContentTypeExtensions; +public readonly struct Message { + /// + /// The raw bytes of the event data. + /// + public readonly object Data; + + /// + /// The raw bytes of the event metadata. + /// + public readonly object? Metadata; + + /// + /// The of the event, used as part of the idempotent write check. + /// + public readonly Uuid EventId; + + /// + /// Constructs a new . + /// + /// The raw bytes of the event data. + /// The raw bytes of the event metadata. + /// The of the event, used as part of the idempotent write check. + /// + public Message(object data, object? metadata = null, Uuid? eventId = null) { + if (eventId == Uuid.Empty) + throw new ArgumentOutOfRangeException(nameof(eventId)); + + EventId = eventId ?? Uuid.NewUuid(); + Data = data; + Metadata = metadata; + } + + public void Deconstruct(out object data, out object? metadata, out Uuid eventId) { + data = Data; + metadata = Metadata; + eventId = EventId; + } +} + +public record MessageSerializationContext( + string StreamName, + ContentType? ContentType = null +) { + public string CategoryName => + // TODO: This is dangerous, as separator can be changed in database settings + StreamName.Split('-').FirstOrDefault() ?? "no_stream_category"; +} + public record SerializationContext( SchemaRegistry SchemaRegistry, ContentType DefaultContentType, AutomaticDeserialization AutomaticDeserialization ) { - public EventData[] Serialize(IEnumerable messages) { - return Serialize(DefaultContentType, messages); - } - - public EventData[] Serialize(ContentType contentType, IEnumerable messages) { + public EventData[] Serialize(IEnumerable messages, MessageSerializationContext context) { if (AutomaticDeserialization == AutomaticDeserialization.Disabled) throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled"); - var serializer = SchemaRegistry.GetSerializer(contentType); + var serializer = SchemaRegistry.GetSerializer(context.ContentType ?? DefaultContentType); - return messages.Select(serializer.Serialize).ToArray(); + return messages.Select(m => serializer.Serialize(m, context)).ToArray(); } #if NET48 diff --git a/src/Kurrent.Client/Kurrent.Client.csproj b/src/Kurrent.Client/Kurrent.Client.csproj index e6652b773..59ebc0861 100644 --- a/src/Kurrent.Client/Kurrent.Client.csproj +++ b/src/Kurrent.Client/Kurrent.Client.csproj @@ -101,7 +101,7 @@ - + diff --git a/src/Kurrent.Client/Streams/KurrentClient.Append.cs b/src/Kurrent.Client/Streams/KurrentClient.Append.cs index 76ab0d404..a7257f389 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Append.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Append.cs @@ -37,7 +37,8 @@ public Task AppendToStreamAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default ) { - var eventsData = _defaultSerializationContext.Serialize(events); + var serializationContext = new MessageSerializationContext(streamName); + var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext); return AppendToStreamAsync( streamName, @@ -71,7 +72,8 @@ public Task AppendToStreamAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default ) { - var eventsData = _defaultSerializationContext.Serialize(events); + var serializationContext = new MessageSerializationContext(streamName); + var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext); return AppendToStreamAsync( streamName,