-
Notifications
You must be signed in to change notification settings - Fork 78
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: [Host.Nats] Transport for Nats.io #286
Signed-off-by: Damien Blanchet <[email protected]>
- Loading branch information
1 parent
fb9aabd
commit ee8d7b2
Showing
19 changed files
with
605 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# Nats transport provider for SlimMessageBus <!-- omit in toc --> | ||
|
||
Please read the [Introduction](intro.md) before reading this provider documentation. | ||
|
||
- [Underlying Nats client](#underlying-nats-client) | ||
- [Configuration](#configuration) | ||
- [Message Serialization](#message-serialization) | ||
|
||
## Underlying Nats client | ||
|
||
This transport provider uses [Nats.Nets](https://www.nuget.org/packages/NATS.Net) client to connect to the Nats broker. | ||
|
||
## Configuration | ||
|
||
The configuration is arranged via the `.WithProviderNats(cfg => {})` method on the message bus builder. | ||
|
||
```cs | ||
services.AddSlimMessageBus(mbb => | ||
{ | ||
mbb.WithProviderNats(cfg => | ||
{ | ||
cfg.Endpoint = configuration["Nats:Endpoint"]; | ||
cfg.ClientName = configuration["Nats:ClientName"]; | ||
cfg.AuthOpts = new NatsAuthOpts() | ||
{ | ||
Username = configuration["Nats:Username"], | ||
Password = configuration["Nats:Password"] | ||
}; | ||
}); | ||
|
||
mbb.AddServicesFromAssemblyContaining<PingConsumer>(); | ||
mbb.AddJsonSerializer(); | ||
}); | ||
``` | ||
|
||
The `NatsMessageBusSettings` property is used to configure the underlying [Nats.Net library client](https://github.com/nats-io/nats.net). | ||
Please consult the Nats.Net library docs for more configuration options. | ||
|
||
## Message Serialization | ||
|
||
Nats offers native serialization functionality. This functionality conflicts with the serialization functionality provided by SlimMessageBus. We have chosen to leave the responsibility for serialization to SlimMessageBus and leave the default configuration of Nats serialization, which is raw serialization. This means that the message body is serialized as a byte array. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
src/SlimMessageBus.Host.Nats/Config/MessageBusBuilderExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
namespace SlimMessageBus.Host.Nats.Config; | ||
|
||
public static class MessageBusBuilderExtensions | ||
{ | ||
public static MessageBusBuilder WithProviderNats(this MessageBusBuilder mbb, Action<NatsMessageBusSettings> configure) | ||
{ | ||
if (mbb == null) throw new ArgumentNullException(nameof(mbb)); | ||
if (configure == null) throw new ArgumentNullException(nameof(configure)); | ||
|
||
var providerSettings = new NatsMessageBusSettings(); | ||
configure(providerSettings); | ||
|
||
return mbb.WithProvider(settings => new NatsMessageBus(settings, providerSettings)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
global using Microsoft.Extensions.Logging; | ||
|
||
global using SlimMessageBus.Host.Services; | ||
|
||
global using NATS.Client.Core; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
namespace SlimMessageBus.Host.Nats; | ||
|
||
public static class NatsHeadersExtensions | ||
{ | ||
public static IReadOnlyDictionary<string, object> ToReadOnlyDictionary(this NatsHeaders headers) => | ||
headers == null ? new Dictionary<string, object>() : headers.ToDictionary(kvp => kvp.Key, kvp => (object) kvp.Value.ToString()); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
namespace SlimMessageBus.Host.Nats; | ||
|
||
using Microsoft.Extensions.Primitives; | ||
|
||
public class NatsMessageBus : MessageBusBase<NatsMessageBusSettings> | ||
{ | ||
private readonly ILogger _logger; | ||
private NatsConnection _connection; | ||
|
||
public NatsMessageBus(MessageBusSettings settings, NatsMessageBusSettings providerSettings) : base(settings, providerSettings) | ||
{ | ||
_logger = LoggerFactory.CreateLogger<NatsMessageBus>(); | ||
OnBuildProvider(); | ||
} | ||
|
||
protected override IMessageBusSettingsValidationService ValidationService => new NatsMessageBusSettingsValidationService(Settings, ProviderSettings); | ||
|
||
public bool IsConnected => _connection is {ConnectionState: NatsConnectionState.Open}; | ||
|
||
protected override void Build() | ||
{ | ||
base.Build(); | ||
AddInit(CreateConnection()); | ||
} | ||
|
||
private Task CreateConnection() | ||
{ | ||
try | ||
{ | ||
_connection = new NatsConnection(new NatsOpts | ||
{ | ||
Url = ProviderSettings.Endpoint, | ||
LoggerFactory = LoggerFactory, | ||
AuthOpts = ProviderSettings.AuthOpts, | ||
Verbose = false, | ||
Name = ProviderSettings.ClientName | ||
}); | ||
} | ||
catch (Exception e) | ||
{ | ||
_logger.LogError(e, "Could not initialize Nats connection: {ErrorMessage}", e.Message); | ||
} | ||
|
||
return Task.CompletedTask; | ||
} | ||
|
||
protected override Task CreateConsumers() | ||
{ | ||
foreach (var (subject, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList())) | ||
{ | ||
var processor = new MessageProcessor<NatsMsg<byte[]>>(consumerSettings, this, (type, message) => Serializer.Deserialize(type, message.Data), subject, this); | ||
AddSubjectConsumer(subject, processor); | ||
} | ||
|
||
if (Settings.RequestResponse != null) | ||
{ | ||
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, this, message => message.Data); | ||
AddSubjectConsumer(Settings.RequestResponse.Path, processor); | ||
} | ||
|
||
return Task.CompletedTask; | ||
} | ||
|
||
private void AddSubjectConsumer(string subject, IMessageProcessor<NatsMsg<byte[]>> processor) | ||
{ | ||
_logger.LogInformation("Creating consumer for {Subject}", subject); | ||
var consumer = new NatsSubjectConsumer<byte[]>(LoggerFactory.CreateLogger<NatsSubjectConsumer<byte[]>>(), subject, _connection, processor); | ||
AddConsumer(consumer); | ||
} | ||
|
||
protected override async ValueTask DisposeAsyncCore() | ||
{ | ||
await base.DisposeAsyncCore().ConfigureAwait(false); | ||
|
||
if (_connection != null) | ||
{ | ||
await _connection.DisposeAsync(); | ||
_connection = null; | ||
} | ||
} | ||
|
||
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, | ||
CancellationToken cancellationToken) | ||
{ | ||
var messages = envelopes.Select(envelope => | ||
{ | ||
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message); | ||
|
||
var replyTo = envelope.Headers.TryGetValue("ReplyTo", out var replyToValue) ? replyToValue.ToString() : null; | ||
|
||
NatsMsg<byte[]> m = new() | ||
{ | ||
Data = messagePayload, | ||
Subject = path, | ||
Headers = new NatsHeaders(), | ||
ReplyTo = replyTo | ||
}; | ||
|
||
foreach (var header in envelope.Headers) | ||
{ | ||
m.Headers.Add(new KeyValuePair<string, StringValues>(header.Key, header.Value.ToString())); | ||
} | ||
|
||
return (Envelope: envelope, Message: m); | ||
}); | ||
|
||
var dispatched = new List<T>(envelopes.Count); | ||
foreach (var item in messages) | ||
{ | ||
try | ||
{ | ||
await _connection.PublishAsync(item.Message, cancellationToken: cancellationToken); | ||
dispatched.Add(item.Envelope); | ||
} | ||
catch (Exception ex) | ||
{ | ||
return new ProduceToTransportBulkResult<T>(dispatched, ex); | ||
} | ||
} | ||
|
||
return new ProduceToTransportBulkResult<T>(dispatched, null); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
namespace SlimMessageBus.Host.Nats; | ||
|
||
public class NatsMessageBusSettings : HasProviderExtensions | ||
{ | ||
public string Endpoint { get; set; } | ||
public string ClientName { get; set; } | ||
public NatsAuthOpts AuthOpts { get; set; } | ||
} |
25 changes: 25 additions & 0 deletions
25
src/SlimMessageBus.Host.Nats/NatsMessageBusSettingsValidationService.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
namespace SlimMessageBus.Host.Nats; | ||
|
||
public class NatsMessageBusSettingsValidationService(MessageBusSettings settings, NatsMessageBusSettings providerSettings) | ||
: DefaultMessageBusSettingsValidationService<NatsMessageBusSettings>(settings, providerSettings) | ||
{ | ||
public override void AssertSettings() | ||
{ | ||
base.AssertSettings(); | ||
|
||
if (ProviderSettings.ClientName is null) | ||
{ | ||
throw new ConfigurationMessageBusException(Settings, $"The {nameof(NatsMessageBusSettings)}.{nameof(NatsMessageBusSettings.ClientName)} must be set"); | ||
} | ||
|
||
if (ProviderSettings.Endpoint is null) | ||
{ | ||
throw new ConfigurationMessageBusException(Settings, $"The {nameof(NatsMessageBusSettings)}.{nameof(NatsMessageBusSettings.Endpoint)} must be set"); | ||
} | ||
|
||
if (ProviderSettings.AuthOpts is null) | ||
{ | ||
throw new ConfigurationMessageBusException(Settings, $"The {nameof(NatsMessageBusSettings)}.{nameof(NatsMessageBusSettings.AuthOpts)} must be set"); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
namespace SlimMessageBus.Host.Nats; | ||
|
||
public class NatsSubjectConsumer<TType>(ILogger logger, string subject, INatsConnection connection, IMessageProcessor<NatsMsg<TType>> messageProcessor) : AbstractConsumer(logger) | ||
{ | ||
private CancellationTokenSource _cancellationTokenSource; | ||
private INatsSub<TType> _subscription; | ||
|
||
protected override async Task OnStart() | ||
{ | ||
_cancellationTokenSource = new CancellationTokenSource(); | ||
_subscription ??= await connection.SubscribeCoreAsync<TType>(subject, cancellationToken: _cancellationTokenSource.Token); | ||
|
||
_ = Task.Run(async () => | ||
{ | ||
while (await _subscription.Msgs.WaitToReadAsync(_cancellationTokenSource.Token)) | ||
{ | ||
while (_subscription.Msgs.TryRead(out var msg)) | ||
{ | ||
await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false); | ||
} | ||
} | ||
}, _cancellationTokenSource.Token); | ||
} | ||
|
||
protected override async Task OnStop() | ||
{ | ||
if (_cancellationTokenSource != null) | ||
{ | ||
_cancellationTokenSource.Cancel(); | ||
_cancellationTokenSource.Dispose(); | ||
} | ||
|
||
if (_subscription != null) | ||
{ | ||
await _subscription.UnsubscribeAsync().ConfigureAwait(false); | ||
await _subscription.DisposeAsync(); | ||
} | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
src/SlimMessageBus.Host.Nats/SlimMessageBus.Host.Nats.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<Import Project="../Host.Plugin.Properties.xml" /> | ||
|
||
<PropertyGroup> | ||
<Description>Nats transport provider for SlimMessageBus</Description> | ||
<PackageTags>Nats transport provider SlimMessageBus MessageBus bus facade messaging client</PackageTags> | ||
<PackageReleaseNotes /> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="NATS.Net" Version="2.3.2" /> | ||
</ItemGroup> | ||
|
||
</Project> |
Oops, something went wrong.