Skip to content

Commit

Permalink
feature: [Host.Nats] Transport for Nats.io #286
Browse files Browse the repository at this point in the history
Signed-off-by: Damien Blanchet <[email protected]>
  • Loading branch information
dblanchetatlectra committed Aug 1, 2024
1 parent fb9aabd commit dfc2652
Show file tree
Hide file tree
Showing 19 changed files with 600 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ jobs:
mqtt_username: "(empty)"
mqtt_password: "(empty)"
mqtt_secure: false

nats_endpoint: "nats://localhost:4222"

rabbitmq_connectionstring: amqp://localhost

Expand Down
41 changes: 41 additions & 0 deletions docs/provider_nats.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Nats transport provider for SlimMessageBus <!-- omit in toc -->

Please read the [Introduction](intro.md) before reading this provider documentation.

- [Underlying Nats client](#underlying-nats-client)
- [Configuration](#configuration)
- [Message Serialization](#message-serialization)

## Underlying Nats client

This transport provider uses [Nats.Nets](https://www.nuget.org/packages/NATS.Net) client to connect to the Nats broker.

## Configuration

The configuration is arranged via the `.WithProviderNats(cfg => {})` method on the message bus builder.

```cs
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderNats(cfg =>
{
cfg.Endpoint = configuration["Nats:Endpoint"];
cfg.ClientName = configuration["Nats:ClientName"];
cfg.AuthOpts = new NatsAuthOpts()
{
Username = configuration["Nats:Username"],
Password = configuration["Nats:Password"]
};
});

mbb.AddServicesFromAssemblyContaining<PingConsumer>();
mbb.AddJsonSerializer();
});
```

The `NatsMessageBusSettings` property is used to configure the underlying [Nats.Net library client](https://github.com/nats-io/nats.net).
Please consult the Nats.Net library docs for more configuration options.

## Message Serialization

Nats offers native serialization functionality. This functionality conflicts with the serialization functionality provided by SlimMessageBus. We have chosen to leave the responsibility for serialization to SlimMessageBus and leave the default configuration of Nats serialization, which is raw serialization. This means that the message body is serialized as a byte array.
9 changes: 9 additions & 0 deletions src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ services:
- "11002:11002"
networks:
- slim

nats:
container_name: slim.nats
image: nats:2.10
ports:
- 4222:4222
networks:
- slim


networks:
slim: {}
Expand Down
15 changes: 15 additions & 0 deletions src/SlimMessageBus.Host.Nats/Config/MessageBusBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace SlimMessageBus.Host.Nats.Config;

public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder WithProviderNats(this MessageBusBuilder mbb, Action<NatsMessageBusSettings> configure)
{
if (mbb == null) throw new ArgumentNullException(nameof(mbb));
if (configure == null) throw new ArgumentNullException(nameof(configure));

var providerSettings = new NatsMessageBusSettings();
configure(providerSettings);

return mbb.WithProvider(settings => new NatsMessageBus(settings, providerSettings));
}
}
5 changes: 5 additions & 0 deletions src/SlimMessageBus.Host.Nats/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Services;

global using NATS.Client.Core;
7 changes: 7 additions & 0 deletions src/SlimMessageBus.Host.Nats/NatsHeadersExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace SlimMessageBus.Host.Nats;

public static class NatsHeadersExtensions
{
public static IReadOnlyDictionary<string, object> ToReadOnlyDictionary(this NatsHeaders headers) =>
headers == null ? new Dictionary<string, object>() : headers.ToDictionary(kvp => kvp.Key, kvp => (object) kvp.Value.ToString());
}
123 changes: 123 additions & 0 deletions src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
namespace SlimMessageBus.Host.Nats;

using Microsoft.Extensions.Primitives;

public class NatsMessageBus : MessageBusBase<NatsMessageBusSettings>
{
private readonly ILogger _logger;
private NatsConnection _connection;

public NatsMessageBus(MessageBusSettings settings, NatsMessageBusSettings providerSettings) : base(settings, providerSettings)
{
_logger = LoggerFactory.CreateLogger<NatsMessageBus>();
OnBuildProvider();
}

protected override IMessageBusSettingsValidationService ValidationService => new NatsMessageBusSettingsValidationService(Settings, ProviderSettings);

public bool IsConnected => _connection is {ConnectionState: NatsConnectionState.Open};

protected override void Build()
{
base.Build();
AddInit(CreateConnection());
}

private Task CreateConnection()
{
try
{
_connection = new NatsConnection(new NatsOpts
{
Url = ProviderSettings.Endpoint,
LoggerFactory = LoggerFactory,
AuthOpts = ProviderSettings.AuthOpts,
Verbose = false,
Name = ProviderSettings.ClientName
});
}
catch (Exception e)
{
_logger.LogError(e, "Could not initialize Nats connection: {ErrorMessage}", e.Message);
}

return Task.CompletedTask;
}

protected override Task CreateConsumers()
{
foreach (var (subject, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList()))
{
var processor = new MessageProcessor<NatsMsg<byte[]>>(consumerSettings, this, (type, message) => Serializer.Deserialize(type, message.Data), subject, this);
AddSubjectConsumer(subject, processor);
}

if (Settings.RequestResponse != null)
{
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, this, message => message.Data);
AddSubjectConsumer(Settings.RequestResponse.Path, processor);
}

return Task.CompletedTask;
}

private void AddSubjectConsumer(string subject, IMessageProcessor<NatsMsg<byte[]>> processor)
{
_logger.LogInformation("Creating consumer for {Subject}", subject);
var consumer = new NatsSubjectConsumer<byte[]>(LoggerFactory.CreateLogger<NatsSubjectConsumer<byte[]>>(), subject, _connection, processor);
AddConsumer(consumer);
}

protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore().ConfigureAwait(false);

if (_connection != null)
{
await _connection.DisposeAsync();
_connection = null;
}
}

protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string subject, IMessageBusTarget targetBus,
CancellationToken cancellationToken)
{
var messages = envelopes.Select(envelope =>
{
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);

var replyTo = envelope.Headers.TryGetValue("ReplyTo", out var replyToValue) ? replyToValue.ToString() : null;

NatsMsg<byte[]> m = new()
{
Data = messagePayload,
Subject = subject,
Headers = new NatsHeaders(),
ReplyTo = replyTo
};

foreach (var header in envelope.Headers)
{
m.Headers.Add(new KeyValuePair<string, StringValues>(header.Key, header.Value.ToString()));
}

return (Envelope: envelope, Message: m);
});

var dispatched = new List<T>(envelopes.Count);
foreach (var item in messages)
{
try
{
await _connection.PublishAsync(item.Message, cancellationToken: cancellationToken);
dispatched.Add(item.Envelope);
}
catch (Exception ex)
{
return new ProduceToTransportBulkResult<T>(dispatched, ex);
}
}

return new ProduceToTransportBulkResult<T>(dispatched, null);
}
}
8 changes: 8 additions & 0 deletions src/SlimMessageBus.Host.Nats/NatsMessageBusSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace SlimMessageBus.Host.Nats;

public class NatsMessageBusSettings : HasProviderExtensions
{
public string Endpoint { get; set; }
public string ClientName { get; set; }
public NatsAuthOpts AuthOpts { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace SlimMessageBus.Host.Nats;

public class NatsMessageBusSettingsValidationService(MessageBusSettings settings, NatsMessageBusSettings providerSettings)
: DefaultMessageBusSettingsValidationService<NatsMessageBusSettings>(settings, providerSettings)
{
public override void AssertSettings()
{
base.AssertSettings();

if (ProviderSettings.ClientName is null)
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(NatsMessageBusSettings)}.{nameof(NatsMessageBusSettings.ClientName)} must be set");
}

if (ProviderSettings.Endpoint is null)
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(NatsMessageBusSettings)}.{nameof(NatsMessageBusSettings.Endpoint)} must be set");
}

if (ProviderSettings.AuthOpts is null)
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(NatsMessageBusSettings)}.{nameof(NatsMessageBusSettings.AuthOpts)} must be set");
}
}
}
34 changes: 34 additions & 0 deletions src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace SlimMessageBus.Host.Nats;

public class NatsSubjectConsumer<TType>(ILogger logger, string subject, INatsConnection connection, IMessageProcessor<NatsMsg<TType>> messageProcessor) : AbstractConsumer(logger)
{
private CancellationTokenSource _cancellationTokenSource;
private INatsSub<TType> _subscription;

protected override async Task OnStart()
{
_cancellationTokenSource = new CancellationTokenSource();
_subscription ??= await connection.SubscribeCoreAsync<TType>(subject, cancellationToken: _cancellationTokenSource.Token);

_ = Task.Run(async () =>
{
while (await _subscription.Msgs.WaitToReadAsync(_cancellationTokenSource.Token))
{
while (_subscription.Msgs.TryRead(out var msg))
{
await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
}
}
}, _cancellationTokenSource.Token);
}

protected override async Task OnStop()
{
_cancellationTokenSource?.Cancel();

if (_subscription != null)
{
await _subscription.UnsubscribeAsync().ConfigureAwait(false);
}
}
}
19 changes: 19 additions & 0 deletions src/SlimMessageBus.Host.Nats/SlimMessageBus.Host.Nats.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="../Host.Plugin.Properties.xml" />

<PropertyGroup>
<Description>Nats transport provider for SlimMessageBus</Description>
<PackageTags>Nats transport provider SlimMessageBus MessageBus bus facade messaging client</PackageTags>
<PackageReleaseNotes />
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Net" Version="2.3.2" />
</ItemGroup>

</Project>
Loading

0 comments on commit dfc2652

Please sign in to comment.