Skip to content

Commit

Permalink
[DEVEX-222] Added message type maps registration
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Jan 29, 2025
1 parent 1e65b62 commit b395583
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 56 deletions.
18 changes: 18 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class KurrentClientSerializationSettings {
public AutomaticDeserialization AutomaticDeserialization { get; set; } = AutomaticDeserialization.Disabled;
public IMessageTypeResolutionStrategy? MessageTypeResolutionStrategy { get; set; }

public IDictionary<Type, string> MessageTypeMap { get; set; } = new Dictionary<Type, string>();

public static KurrentClientSerializationSettings Default(
Action<KurrentClientSerializationSettings>? configure = null
) {
Expand Down Expand Up @@ -45,6 +47,22 @@ public KurrentClientSerializationSettings UseBytesSerializer(ISerializer seriali
return this;
}

public KurrentClientSerializationSettings RegisterMessageType<T>(string typeName) =>
RegisterMessageType(typeof(T), typeName);

public KurrentClientSerializationSettings RegisterMessageType(Type type, string typeName) {
MessageTypeMap.Add(type, typeName);

return this;
}
public KurrentClientSerializationSettings RegisterMessageTypes(IDictionary<Type, string> typeMap) {
foreach (var map in typeMap) {
MessageTypeMap.Add(map.Key, map.Value);
}

return this;
}

public KurrentClientSerializationSettings EnableAutomaticDeserialization() {
AutomaticDeserialization = AutomaticDeserialization.Enabled;

Expand Down
42 changes: 42 additions & 0 deletions src/Kurrent.Client/Core/Serialization/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using EventStore.Client;

namespace Kurrent.Client.Core.Serialization;

public readonly struct Message {
/// <summary>
/// The raw bytes of the event data.
/// </summary>
public readonly object Data;

/// <summary>
/// The raw bytes of the event metadata.
/// </summary>
public readonly object? Metadata;

/// <summary>
/// The <see cref="Uuid"/> of the event, used as part of the idempotent write check.
/// </summary>
public readonly Uuid EventId;

/// <summary>
/// Constructs a new <see cref="Message"/>.
/// </summary>
/// <param name="data">The raw bytes of the event data.</param>
/// <param name="metadata">The raw bytes of the event metadata.</param>
/// <param name="eventId">The <see cref="Uuid"/> of the event, used as part of the idempotent write check.</param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public Message(object data, object? metadata = null, Uuid? eventId = null) {
if (eventId == Uuid.Empty)
throw new ArgumentOutOfRangeException(nameof(eventId));

EventId = eventId ?? Uuid.NewUuid();
Data = data;
Metadata = metadata;
}

public void Deconstruct(out object data, out object? metadata, out Uuid eventId) {
data = Data;
metadata = Metadata;
eventId = EventId;
}
}
15 changes: 12 additions & 3 deletions src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client.Diagnostics;
using Kurrent.Client.Core.Serialization;

Expand All @@ -7,8 +8,12 @@ namespace EventStore.Client.Serialization;

public interface IMessageSerializer {
public EventData Serialize(Message value, MessageSerializationContext context);


#if NET48
public bool TryDeserialize(EventRecord messageRecord, out object? deserialized);
#else
public bool TryDeserialize(EventRecord messageRecord, [NotNullWhen(true)] out object? deserialized);
#endif
}

public record MessageSerializationContext(
Expand Down Expand Up @@ -63,8 +68,12 @@ public EventData Serialize(Message message, MessageSerializationContext serializ
serializationContext.ContentType.ToMessageContentType()
);
}


#if NET48
public bool TryDeserialize(EventRecord messageRecord, out object? deserialized) {
#else
public bool TryDeserialize(EventRecord messageRecord, [NotNullWhen(true)] out object? deserialized) {
#endif
if (!schemaRegistry
.MessageTypeResolutionStrategy
.TryResolveClrType(messageRecord, out var clrType)) {
Expand All @@ -76,6 +85,6 @@ public bool TryDeserialize(EventRecord messageRecord, out object? deserialized)
.GetSerializer(FromMessageContentType(messageRecord.ContentType))
.Deserialize(messageRecord.Data, clrType!);

return true;
return deserialized != null;
}
}
41 changes: 0 additions & 41 deletions src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,6 @@

namespace Kurrent.Client.Core.Serialization;

using static ContentTypeExtensions;

public readonly struct Message {
/// <summary>
/// The raw bytes of the event data.
/// </summary>
public readonly object Data;

/// <summary>
/// The raw bytes of the event metadata.
/// </summary>
public readonly object? Metadata;

/// <summary>
/// The <see cref="Uuid"/> of the event, used as part of the idempotent write check.
/// </summary>
public readonly Uuid EventId;

/// <summary>
/// Constructs a new <see cref="Message"/>.
/// </summary>
/// <param name="data">The raw bytes of the event data.</param>
/// <param name="metadata">The raw bytes of the event metadata.</param>
/// <param name="eventId">The <see cref="Uuid"/> of the event, used as part of the idempotent write check.</param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public Message(object data, object? metadata = null, Uuid? eventId = null) {
if (eventId == Uuid.Empty)
throw new ArgumentOutOfRangeException(nameof(eventId));

EventId = eventId ?? Uuid.NewUuid();
Data = data;
Metadata = metadata;
}

public void Deconstruct(out object data, out object? metadata, out Uuid eventId) {
data = Data;
metadata = Metadata;
eventId = EventId;
}
}

public class MessageSerializerWrapper(
IMessageSerializer messageSerializer,
AutomaticDeserialization automaticDeserialization
Expand Down
21 changes: 13 additions & 8 deletions src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

namespace Kurrent.Client.Tests.Streams.Serialization;

public interface IMessageTypeMapper {
void AddType(Type messageType, string messageTypeName);
public interface IMessageTypeRegistry {
void AddType(Type messageType, string messageTypeName);
string? GetTypeName(Type messageType);
string GetOrAddTypeName(Type clrType, Func<Type, string> getTypeName);
Type? GetClrType(string messageTypeName);
Type? GetOrAddClrType(string messageTypeName, Func<string, Type?> getClrType);
}

public class MessageTypeMapper : IMessageTypeMapper {
public static readonly MessageTypeMapper Instance = new MessageTypeMapper();
public class MessageTypeRegistry : IMessageTypeRegistry {
readonly ConcurrentDictionary<string, Type?> _typeMap = new();
readonly ConcurrentDictionary<Type, string> _typeNameMap = new();

Expand Down Expand Up @@ -63,9 +62,15 @@ public string GetOrAddTypeName(Type clrType, Func<Type, string> getTypeName) =>
}

public static class MessageTypeMapperExtensions {
public static void AddType<T>(this IMessageTypeMapper messageTypeMapper, string messageTypeName) =>
messageTypeMapper.AddType(typeof(T), messageTypeName);
public static void AddType<T>(this IMessageTypeRegistry messageTypeRegistry, string messageTypeName) =>
messageTypeRegistry.AddType(typeof(T), messageTypeName);

public static string? GetTypeName<TMessageType>(this IMessageTypeMapper messageTypeMapper) =>
messageTypeMapper.GetTypeName(typeof(TMessageType));
public static void AddTypes(this IMessageTypeRegistry messageTypeRegistry, IDictionary<Type, string> typeMap) {
foreach (var map in typeMap) {
messageTypeRegistry.AddType(map.Key, map.Value);
}
}

public static string? GetTypeName<TMessageType>(this IMessageTypeRegistry messageTypeRegistry) =>
messageTypeRegistry.GetTypeName(typeof(TMessageType));
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ public interface IMessageTypeResolutionStrategy {
}

public class MessageTypeResolutionStrategyWrapper(
IMessageTypeMapper messageTypeMapper,
IMessageTypeRegistry messageTypeRegistry,
IMessageTypeResolutionStrategy messageTypeResolutionStrategy
) : IMessageTypeResolutionStrategy {
public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) {
return messageTypeMapper.GetOrAddTypeName(
return messageTypeRegistry.GetOrAddTypeName(
message.Data.GetType(),
_ => messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext)
);
Expand All @@ -32,7 +32,7 @@ public bool TryResolveClrType(EventRecord messageRecord, out Type? type) {
#else
public bool TryResolveClrType(EventRecord messageRecord, [NotNullWhen(true)] out Type? type) {
#endif
type = messageTypeMapper.GetOrAddClrType(
type = messageTypeRegistry.GetOrAddClrType(
messageRecord.EventType,
_ => messageTypeResolutionStrategy.TryResolveClrType(messageRecord, out var resolvedType)
? resolvedType
Expand Down
5 changes: 4 additions & 1 deletion src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ public ISerializer GetSerializer(ContentType schemaType) =>
serializers[schemaType];

public static SchemaRegistry From(KurrentClientSerializationSettings settings) {
var messageTypeRegistry = new MessageTypeRegistry();
messageTypeRegistry.AddTypes(settings.MessageTypeMap);

var messageTypeResolutionStrategy = new MessageTypeResolutionStrategyWrapper(
MessageTypeMapper.Instance,
messageTypeRegistry,
settings.MessageTypeResolutionStrategy ?? new DefaultMessageTypeResolutionStrategy()
);

Expand Down

0 comments on commit b395583

Please sign in to comment.