Skip to content

Commit

Permalink
[DEVEX-222] Added MessageSerializationContext to pass additional info…
Browse files Browse the repository at this point in the history
…rmation like category name
  • Loading branch information
oskardudycz committed Jan 29, 2025
1 parent 786caf3 commit 6446ea2
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ out var messageTypeClrTypeName
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlySpan<byte> InjectSerializationMetadata(
public static ReadOnlyMemory<byte> InjectSerializationMetadata(
this ReadOnlyMemory<byte> eventMetadata, SerializationMetadata serializationMetadata
) {
if (serializationMetadata == SerializationMetadata.None || !serializationMetadata.IsValid)
return eventMetadata.Span;
return ReadOnlyMemory<byte>.Empty;

return eventMetadata.IsEmpty
? JsonSerializer.SerializeToUtf8Bytes(serializationMetadata)
Expand Down
26 changes: 17 additions & 9 deletions src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client.Diagnostics;
using Kurrent.Client.Core.Serialization;
using Kurrent.Client.Tests.Streams.Serialization;

namespace EventStore.Client.Serialization;

public interface IMessageSerializer {
public EventData Serialize(object value);
public EventData Serialize(Message value, MessageSerializationContext context);

public bool TryDeserialize(EventRecord messageRecord, out object? deserialized);
}
Expand All @@ -16,12 +15,21 @@ public class MessageSerializer(
ISerializer jsonSerializer,
IMessageTypeResolutionStrategy messageTypeResolutionStrategy
) : IMessageSerializer {
public EventData Serialize(object value) {
var eventType = messageTypeResolutionStrategy.ResolveTypeName(value);
var bytes = serializer.Serialize(value);
var metadata = jsonSerializer.Serialize(SerializationMetadata.From(value.GetType()));

return new EventData(Uuid.NewUuid(), eventType, bytes, metadata, contentType.ToMessageContentType());
readonly string _messageContentType = contentType.ToMessageContentType();

public EventData Serialize(Message message, MessageSerializationContext serializationContext) {
var (data, metadata, eventId) = message;
var eventType = messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext);
var serializedData = serializer.Serialize(data);
var serializedMetadata = metadata != null ? jsonSerializer.Serialize(metadata) : ReadOnlyMemory<byte>.Empty;

return new EventData(
eventId,
eventType,
serializedData,
serializedMetadata.InjectSerializationMetadata(SerializationMetadata.From(data.GetType())).ToArray(),
_messageContentType
);
}

public bool TryDeserialize(EventRecord messageRecord, out object? deserialized) {
Expand Down
3 changes: 0 additions & 3 deletions src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

namespace Kurrent.Client.Tests.Streams.Serialization;

// TODO: Discuss how to proceed with that and whether to move the Schema Registry code here
// The scanning part and registration seems to be more robust there
// I used this for simplicity
public interface IMessageTypeMapper {
void AddType(Type messageType, string messageTypeName);
string? GetTypeName(Type messageType);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using Kurrent.Client.Tests.Streams.Serialization;
using EventStore.Client.Diagnostics;
using Kurrent.Client.Core.Serialization;

namespace EventStore.Client.Serialization;

public interface IMessageTypeResolutionStrategy {
string ResolveTypeName(object messageData);
string ResolveTypeName(Message message, MessageSerializationContext serializationContext);

#if NET48
bool TryResolveClrType(EventRecord messageRecord, out Type? type);
Expand All @@ -18,10 +20,10 @@ public class MessageTypeResolutionStrategyWrapper(
IMessageTypeMapper messageTypeMapper,
IMessageTypeResolutionStrategy messageTypeResolutionStrategy
) : IMessageTypeResolutionStrategy {
public string ResolveTypeName(object messageData) {
public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) {
return messageTypeMapper.GetOrAddTypeName(
messageData.GetType(),
_ => messageTypeResolutionStrategy.ResolveTypeName(messageData)
message.Data.GetType(),
_ => messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext)
);
}

Expand All @@ -43,8 +45,8 @@ public bool TryResolveClrType(EventRecord messageRecord, [NotNullWhen(true)] out

public class DefaultMessageTypeResolutionStrategy
: IMessageTypeResolutionStrategy {
public string ResolveTypeName(object messageData) =>
messageData.GetType().FullName!;
public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) =>
$"{serializationContext.CategoryName}-{JsonNamingPolicy.SnakeCaseLower.ConvertName(message.Data.GetType().Name.ToLower())}";

#if NET48
public bool TryResolveClrType(EventRecord messageRecord, out Type? type) {
Expand Down
58 changes: 51 additions & 7 deletions src/Kurrent.Client/Core/Serialization/SerializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,66 @@ namespace Kurrent.Client.Core.Serialization;

using static ContentTypeExtensions;

public readonly struct Message {
/// <summary>
/// The raw bytes of the event data.
/// </summary>
public readonly object Data;

/// <summary>
/// The raw bytes of the event metadata.
/// </summary>
public readonly object? Metadata;

/// <summary>
/// The <see cref="Uuid"/> of the event, used as part of the idempotent write check.
/// </summary>
public readonly Uuid EventId;

/// <summary>
/// Constructs a new <see cref="Message"/>.
/// </summary>
/// <param name="data">The raw bytes of the event data.</param>
/// <param name="metadata">The raw bytes of the event metadata.</param>
/// <param name="eventId">The <see cref="Uuid"/> of the event, used as part of the idempotent write check.</param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public Message(object data, object? metadata = null, Uuid? eventId = null) {
if (eventId == Uuid.Empty)
throw new ArgumentOutOfRangeException(nameof(eventId));

EventId = eventId ?? Uuid.NewUuid();
Data = data;
Metadata = metadata;
}

public void Deconstruct(out object data, out object? metadata, out Uuid eventId) {
data = Data;
metadata = Metadata;
eventId = EventId;
}
}

public record MessageSerializationContext(
string StreamName,
ContentType? ContentType = null
) {
public string CategoryName =>
// TODO: This is dangerous, as separator can be changed in database settings
StreamName.Split('-').FirstOrDefault() ?? "no_stream_category";
}

public record SerializationContext(
SchemaRegistry SchemaRegistry,
ContentType DefaultContentType,
AutomaticDeserialization AutomaticDeserialization
) {
public EventData[] Serialize(IEnumerable<object> messages) {
return Serialize(DefaultContentType, messages);
}

public EventData[] Serialize(ContentType contentType, IEnumerable<object> messages) {
public EventData[] Serialize(IEnumerable<Message> messages, MessageSerializationContext context) {
if (AutomaticDeserialization == AutomaticDeserialization.Disabled)
throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled");

var serializer = SchemaRegistry.GetSerializer(contentType);
var serializer = SchemaRegistry.GetSerializer(context.ContentType ?? DefaultContentType);

return messages.Select(serializer.Serialize).ToArray();
return messages.Select(m => serializer.Serialize(m, context)).ToArray();
}

#if NET48
Expand Down
2 changes: 1 addition & 1 deletion src/Kurrent.Client/Kurrent.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<ItemGroup Condition="'$(TargetFramework)' == 'net48'">
<!-- <PackageReference Include="System.Net.Http" Version="4.3.4"/>-->
<PackageReference Include="System.Net.Http.WinHttpHandler" Version="8.0.0"/>
<PackageReference Include="System.Text.Json" Version="8.0.5"/>
<PackageReference Include="System.Text.Json" Version="9.0.1" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0"/>
<PackageReference Include="BouncyCastle.Cryptography" Version="2.3.1"/>
</ItemGroup>
Expand Down
6 changes: 4 additions & 2 deletions src/Kurrent.Client/Streams/KurrentClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public Task<IWriteResult> AppendToStreamAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var eventsData = _defaultSerializationContext.Serialize(events);
var serializationContext = new MessageSerializationContext(streamName);
var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext);

return AppendToStreamAsync(
streamName,
Expand Down Expand Up @@ -71,7 +72,8 @@ public Task<IWriteResult> AppendToStreamAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
var eventsData = _defaultSerializationContext.Serialize(events);
var serializationContext = new MessageSerializationContext(streamName);
var eventsData = _defaultSerializationContext.Serialize(events.Select(e => new Message(e)), serializationContext);

return AppendToStreamAsync(
streamName,
Expand Down

0 comments on commit 6446ea2

Please sign in to comment.