Skip to content

Commit

Permalink
chore: code review taken into account
Browse files Browse the repository at this point in the history
Signed-off-by: Damien Blanchet <[email protected]>
  • Loading branch information
dblanchetatlectra authored and zarusz committed Aug 4, 2024
1 parent bb5df0b commit 1a03b27
Show file tree
Hide file tree
Showing 19 changed files with 241 additions and 36 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ jobs:
_redis_connectionstring: ${{ secrets.redis_connectionstring }}

sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }}

# Connects to the local Test Containers

# need to postpone until feature https://github.com/zarusz/SlimMessageBus/issues/110 is implemented
Expand All @@ -142,13 +142,13 @@ jobs:
mqtt_username: "(empty)"
mqtt_password: "(empty)"
mqtt_secure: false

rabbitmq_connectionstring: amqp://localhost

redis_connectionstring: localhost:6379

#sqlserver_connectionstring: "Server=localhost;Initial Catalog=SlimMessageBus_Outbox;User ID=sa;Password=SuperSecretP@55word;TrustServerCertificate=true;MultipleActiveResultSets=true;"

nats_endpoint: "nats://localhost:4222"

- name: SonarCloud - SonarScanner End
Expand Down
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
- [RabbitMq](provider_rabbitmq.md)
- [Redis](provider_redis.md)
- [SQL](provider_sql.md)
- [NATS](provider_nats.md)
- [Serialization Plugins](serialization.md)
22 changes: 11 additions & 11 deletions docs/provider_nats.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ This transport provider uses [Nats.Nets](https://www.nuget.org/packages/NATS.Net
The configuration is arranged via the `.WithProviderNats(cfg => {})` method on the message bus builder.

```cs
services.AddSlimMessageBus(mbb =>
builder.Services.AddSlimMessageBus(messageBusBuilder =>
{
mbb.WithProviderNats(cfg =>
messageBusBuilder.WithProviderNats(cfg =>
{
cfg.Endpoint = configuration["Nats:Endpoint"];
cfg.ClientName = configuration["Nats:ClientName"];
cfg.AuthOpts = new NatsAuthOpts()
{
Username = configuration["Nats:Username"],
Password = configuration["Nats:Password"]
};
cfg.Endpoint = endpoint;
cfg.ClientName = $"MyService_{Environment.MachineName}";
cfg.AuthOpts = NatsAuthOpts.Default;
});

mbb.AddServicesFromAssemblyContaining<PingConsumer>();
mbb.AddJsonSerializer();
messageBusBuilder
.Produce<PingMessage>(x => x.DefaultTopic(topic))
.Consume<PingMessage>(x => x.Topic(topic).Instances(1));

messageBusBuilder.AddServicesFromAssemblyContaining<PingConsumer>();
messageBusBuilder.AddJsonSerializer();
});
```

Expand Down
24 changes: 24 additions & 0 deletions docs/provider_nats.t.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Nats transport provider for SlimMessageBus <!-- omit in toc -->

Please read the [Introduction](intro.md) before reading this provider documentation.

- [Underlying Nats client](#underlying-nats-client)
- [Configuration](#configuration)
- [Message Serialization](#message-serialization)

## Underlying Nats client

This transport provider uses [Nats.Nets](https://www.nuget.org/packages/NATS.Net) client to connect to the Nats broker.

## Configuration

The configuration is arranged via the `.WithProviderNats(cfg => {})` method on the message bus builder.

@[:cs](../src/Samples/Sample.Nats.WebApi/Program.cs,ExampleConfiguringMessageBus)

The `NatsMessageBusSettings` property is used to configure the underlying [Nats.Net library client](https://github.com/nats-io/nats.net).
Please consult the Nats.Net library docs for more configuration options.

## Message Serialization

Nats offers native serialization functionality. This functionality conflicts with the serialization functionality provided by SlimMessageBus. We have chosen to leave the responsibility for serialization to SlimMessageBus and leave the default configuration of Nats serialization, which is raw serialization. This means that the message body is serialized as a byte array.
7 changes: 7 additions & 0 deletions src/Samples/Infrastructure/Nats-SingleNode/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: '2'
services:
nats:
container_name: slim.nats
image: nats:2.10
ports:
- 4222:4222
16 changes: 16 additions & 0 deletions src/Samples/Sample.Nats.WebApi/PingConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Sample.Nats.WebApi;

using SlimMessageBus;

public class PingConsumer(ILogger<PingConsumer> logger) : IConsumer<PingMessage>, IConsumerWithContext
{
private readonly ILogger _logger = logger;

public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
_logger.LogInformation("Got message {Counter} on topic {Path}", message.Counter, Context.Path);
return Task.CompletedTask;
}
}
4 changes: 4 additions & 0 deletions src/Samples/Sample.Nats.WebApi/PingMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Sample.Nats.WebApi;

public record PingMessage(int Counter, Guid Value);

57 changes: 57 additions & 0 deletions src/Samples/Sample.Nats.WebApi/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using NATS.Client.Core;

using Sample.Nats.WebApi;

using SecretStore;

using SlimMessageBus;
using SlimMessageBus.Host;
using SlimMessageBus.Host.Nats.Config;
using SlimMessageBus.Host.Serialization.Json;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

Secrets.Load(@"..\..\..\..\..\secrets.txt");

var endpoint = Secrets.Service.PopulateSecrets(builder.Configuration["Nats:Endpoint"]);
var topic = Secrets.Service.PopulateSecrets(builder.Configuration["Nats:Topic"]);

// doc:fragment:ExampleConfiguringMessageBus
builder.Services.AddSlimMessageBus(messageBusBuilder =>
{
messageBusBuilder.WithProviderNats(cfg =>
{
cfg.Endpoint = endpoint;
cfg.ClientName = $"MyService_{Environment.MachineName}";
cfg.AuthOpts = NatsAuthOpts.Default;
});

messageBusBuilder
.Produce<PingMessage>(x => x.DefaultTopic(topic))
.Consume<PingMessage>(x => x.Topic(topic).Instances(1));

messageBusBuilder.AddServicesFromAssemblyContaining<PingConsumer>();
messageBusBuilder.AddJsonSerializer();
});
// doc:fragment:ExampleConfiguringMessageBus

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.MapGet("/publish-message", (IMessageBus messageBus, CancellationToken cancellationToken) =>
{
PingMessage pingMessage = new(0, Guid.NewGuid());
messageBus.Publish(pingMessage, cancellationToken: cancellationToken);
});

app.Run();
14 changes: 14 additions & 0 deletions src/Samples/Sample.Nats.WebApi/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"profiles": {
"Samples.Nats.WebApi": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "https://localhost:7276;http://localhost:5044",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
20 changes: 20 additions & 0 deletions src/Samples/Sample.Nats.WebApi/Sample.Nats.WebApi.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.2"/>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\SlimMessageBus.Host.Nats\SlimMessageBus.Host.Nats.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.Json\SlimMessageBus.Host.Serialization.Json.csproj" />
<ProjectReference Include="..\..\Tools\SecretStore\SecretStore.csproj" />
</ItemGroup>

</Project>
12 changes: 12 additions & 0 deletions src/Samples/Sample.Nats.WebApi/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"Nats": {
"Endpoint": "nats://localhost:4222",
"Topic": "test"
}
}
13 changes: 13 additions & 0 deletions src/Samples/Sample.Nats.WebApi/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"Nats": {
"Endpoint": "{{nats_endpoint}}",
"Topic": "{{nats_subject}}"
}
}
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Nats/NatsHeadersExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SlimMessageBus.Host.Nats;

public static class NatsHeadersExtensions
static internal class NatsHeadersExtensions
{
public static IReadOnlyDictionary<string, object> ToReadOnlyDictionary(this NatsHeaders headers) =>
headers == null ? new Dictionary<string, object>() : headers.ToDictionary(kvp => kvp.Key, kvp => (object) kvp.Value.ToString());
Expand Down
23 changes: 18 additions & 5 deletions src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public NatsMessageBus(MessageBusSettings settings, NatsMessageBusSettings provid
protected override void Build()
{
base.Build();
AddInit(CreateConnection());
AddInit(CreateConnectionAsync());
}

private Task CreateConnection()
private Task CreateConnectionAsync()
{
try
{
Expand All @@ -44,8 +44,15 @@ private Task CreateConnection()
return Task.CompletedTask;
}

protected override Task CreateConsumers()
protected override async Task CreateConsumers()
{
if (_connection == null)
{
throw new ConsumerMessageBusException("The connection is not available at this time");
}

await base.CreateConsumers();

foreach (var (subject, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList()))
{
var processor = new MessageProcessor<NatsMsg<byte[]>>(consumerSettings, this, (type, message) => Serializer.Deserialize(type, message.Data), subject, this);
Expand All @@ -57,8 +64,6 @@ protected override Task CreateConsumers()
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, this, message => message.Data);
AddSubjectConsumer(Settings.RequestResponse.Path, processor);
}

return Task.CompletedTask;
}

private void AddSubjectConsumer(string subject, IMessageProcessor<NatsMsg<byte[]>> processor)
Expand All @@ -82,6 +87,14 @@ protected override async ValueTask DisposeAsyncCore()
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus,
CancellationToken cancellationToken)
{

await EnsureInitFinished();

if (_connection == null)
{
throw new ProducerMessageBusException("The connection is not available at this time");
}

var messages = envelopes.Select(envelope =>
{
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
Expand Down
30 changes: 18 additions & 12 deletions src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
#nullable enable
namespace SlimMessageBus.Host.Nats;

public class NatsSubjectConsumer<TType>(ILogger logger, string subject, INatsConnection connection, IMessageProcessor<NatsMsg<TType>> messageProcessor) : AbstractConsumer(logger)
{
private CancellationTokenSource _cancellationTokenSource;
private INatsSub<TType> _subscription;
private INatsSub<TType>? _subscription;
private Task? _messageConsumerTask;

protected override async Task OnStart()
{
_cancellationTokenSource = new CancellationTokenSource();
_subscription ??= await connection.SubscribeCoreAsync<TType>(subject, cancellationToken: _cancellationTokenSource.Token);
_subscription ??= await connection.SubscribeCoreAsync<TType>(subject, cancellationToken: CancellationToken);

_ = Task.Run(async () =>
_messageConsumerTask = Task.Factory.StartNew(async () =>
{
while (await _subscription.Msgs.WaitToReadAsync(_cancellationTokenSource.Token))
try
{
while (_subscription.Msgs.TryRead(out var msg))
while (await _subscription.Msgs.WaitToReadAsync(CancellationToken))
{
await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
while (_subscription.Msgs.TryRead(out var msg))
{
await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: CancellationToken).ConfigureAwait(false);
}
}
}
}, _cancellationTokenSource.Token);
catch (OperationCanceledException ex)
{
Logger.LogInformation(ex, "Consumer task was cancelled");
}
}, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
}

protected override async Task OnStop()
{
if (_cancellationTokenSource != null)
if (_messageConsumerTask != null)
{
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
await _messageConsumerTask.ConfigureAwait(false);
}

if (_subscription != null)
Expand Down
17 changes: 17 additions & 0 deletions src/SlimMessageBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Nats",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Nats.Test", "Tests\SlimMessageBus.Host.Nats.Test\SlimMessageBus.Host.Nats.Test.csproj", "{9C464F95-B620-4BDF-B9AC-D95C465D9793}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.Nats.WebApi", "Samples\Sample.Nats.WebApi\Sample.Nats.WebApi.csproj", "{46C40625-D1AC-4EA1-9562-4F1837D417CE}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Nats-SingleNode", "Nats-SingleNode", "{5250E48D-36C7-4214-8D7E-5924A9E337C6}"
ProjectSection(SolutionItems) = preProject
Samples\Infrastructure\Nats-SingleNode\docker-compose.yml = Samples\Infrastructure\Nats-SingleNode\docker-compose.yml
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -812,6 +819,14 @@ Global
{9C464F95-B620-4BDF-B9AC-D95C465D9793}.Release|Any CPU.Build.0 = Release|Any CPU
{9C464F95-B620-4BDF-B9AC-D95C465D9793}.Release|x86.ActiveCfg = Release|Any CPU
{9C464F95-B620-4BDF-B9AC-D95C465D9793}.Release|x86.Build.0 = Release|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Debug|x86.ActiveCfg = Debug|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Debug|x86.Build.0 = Debug|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Release|Any CPU.Build.0 = Release|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Release|x86.ActiveCfg = Release|Any CPU
{46C40625-D1AC-4EA1-9562-4F1837D417CE}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -891,6 +906,8 @@ Global
{969AAB37-AEFC-40F9-9F89-B4B5E45E13C9} = {D3D6FD9A-968A-45BB-86C7-4527C72A057E}
{57290E47-603D-46D0-BF13-AC1D6481380A} = {9291D340-B4FA-44A3-8060-C14743FB1712}
{9C464F95-B620-4BDF-B9AC-D95C465D9793} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
{46C40625-D1AC-4EA1-9562-4F1837D417CE} = {A5B15524-93B8-4CCE-AC6D-A22984498BA0}
{5250E48D-36C7-4214-8D7E-5924A9E337C6} = {59F88FB5-6D19-4520-87E8-227B3539BBB3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80}
Expand Down
Loading

0 comments on commit 1a03b27

Please sign in to comment.