Skip to content

Commit

Permalink
Rename
Browse files Browse the repository at this point in the history
  • Loading branch information
MagnusSandgren committed Oct 14, 2024
1 parent ccec6d1 commit fc1872c
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal static void AddInfrastructure_Internal(InfrastructureBuilderContext bui
.AddTransient<ISubjectResourceRepository, SubjectResourceRepository>()

// Singleton
.AddSingleton<IIdempotentNotificationContext, IdempotentNotificationContext>()
.AddSingleton<INotificationProcessingContextFactory, NotificationProcessingContextFactory>()

// HttpClient
.AddHttpClients(configuration.GetSection(InfrastructureSettings.ConfigurationSectionName))
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

namespace Digdir.Domain.Dialogporten.Infrastructure.Persistence.IdempotentNotifications;

public interface IIdempotentNotificationTransaction : IAsyncDisposable
public interface INotificationProcessingContext : IAsyncDisposable
{
Task Ack(CancellationToken cancellationToken = default);
Task Nack(CancellationToken cancellationToken = default);
Task AckHandler(string handlerName, CancellationToken cancellationToken = default);
Task<bool> HandlerIsAcked(string handlerName, CancellationToken cancellationToken = default);
}

internal sealed class IdempotentNotificationTransaction : IIdempotentNotificationTransaction
internal sealed class NotificationProcessingContext : INotificationProcessingContext
{
private readonly SemaphoreSlim _initializeLock = new(1, 1);
private readonly IServiceScopeFactory _serviceScopeFactory;
Expand All @@ -23,7 +23,7 @@ internal sealed class IdempotentNotificationTransaction : IIdempotentNotificatio
private IServiceScope? _serviceScope;
private bool _acknowledged;

public IdempotentNotificationTransaction(IServiceScopeFactory serviceScopeFactory, Guid eventId, Action<Guid> onDispose)
public NotificationProcessingContext(IServiceScopeFactory serviceScopeFactory, Guid eventId, Action<Guid> onDispose)
{
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
_onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Collections.Concurrent;
using Digdir.Domain.Dialogporten.Domain.Common.EventPublisher;
using Microsoft.Extensions.DependencyInjection;

namespace Digdir.Domain.Dialogporten.Infrastructure.Persistence.IdempotentNotifications;

public interface INotificationProcessingContextFactory
{
Task<INotificationProcessingContext> CreateContext(IDomainEvent domainEvent, bool isFirstAttempt = false, CancellationToken cancellationToken = default);
INotificationProcessingContext GetExistingContext(Guid eventId);
}

internal sealed class NotificationProcessingContextFactory : INotificationProcessingContextFactory
{
private readonly ConcurrentDictionary<Guid, NotificationProcessingContext> _contextByEventId = new();
private readonly IServiceScopeFactory _serviceScopeFactory;

public NotificationProcessingContextFactory(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
}

public async Task<INotificationProcessingContext> CreateContext(
IDomainEvent domainEvent,
bool isFirstAttempt = false,
CancellationToken cancellationToken = default)
{
var transaction = _contextByEventId.GetOrAdd(
key: domainEvent.EventId,
valueFactory: eventId => new(_serviceScopeFactory, eventId, onDispose: RemoveTransaction));
await transaction.Initialize(isFirstAttempt, cancellationToken);
return transaction;
}

public INotificationProcessingContext GetExistingContext(Guid eventId)
{
return _contextByEventId.TryGetValue(eventId, out var transaction)
? transaction
: throw new InvalidOperationException("Notification context not found.");
}

private void RemoveTransaction(Guid eventId) => _contextByEventId.TryRemove(eventId, out _);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ internal sealed class IdempotentNotificationHandler<TNotification> :
where TNotification : IDomainEvent
{
private readonly INotificationHandler<TNotification> _decorated;
private readonly IIdempotentNotificationContext _context;
private readonly INotificationProcessingContextFactory _processingContextFactory;

public IdempotentNotificationHandler(INotificationHandler<TNotification> decorated, IIdempotentNotificationContext context)
public IdempotentNotificationHandler(INotificationHandler<TNotification> decorated, INotificationProcessingContextFactory processingContextFactory)
{
_decorated = decorated ?? throw new ArgumentNullException(nameof(decorated));
_context = context ?? throw new ArgumentNullException(nameof(context));
_processingContextFactory = processingContextFactory ?? throw new ArgumentNullException(nameof(processingContextFactory));
}

public async Task Handle(TNotification notification, CancellationToken cancellationToken)
{
var handlerName = _decorated.GetType().FullName!;
var transaction = _context.GetExistingTransaction(notification.EventId);
var transaction = _processingContextFactory.GetExistingContext(notification.EventId);
if (await transaction.HandlerIsAcked(handlerName, cancellationToken))
{
// I've handled this event before, so I'm not going to do it again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ public sealed class DomainEventConsumer<T> : IConsumer<T>
where T : class, IDomainEvent
{
private readonly IPublisher _publisher;
private readonly IIdempotentNotificationContext _notificationContext;
private readonly INotificationProcessingContextFactory _notificationProcessingContextFactory;

public DomainEventConsumer(IPublisher publisher, IIdempotentNotificationContext notificationContext)
public DomainEventConsumer(IPublisher publisher, INotificationProcessingContextFactory notificationProcessingContextFactory)
{
_publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
_notificationContext = notificationContext ?? throw new ArgumentNullException(nameof(notificationContext));
_notificationProcessingContextFactory = notificationProcessingContextFactory ?? throw new ArgumentNullException(nameof(notificationProcessingContextFactory));
}

public async Task Consume(ConsumeContext<T> context)
{
var isFirstAttempt = IsFirstAttempt(context);
await using var transaction = await _notificationContext
.BeginTransaction(context.Message, isFirstAttempt, context.CancellationToken);
await using var notificationContext = await _notificationProcessingContextFactory
.CreateContext(context.Message, isFirstAttempt, context.CancellationToken);
await _publisher.Publish(context.Message, context.CancellationToken);
await transaction.Ack(context.CancellationToken);
await notificationContext.Ack(context.CancellationToken);
}

private static bool IsFirstAttempt(ConsumeContext<T> context)
Expand Down

0 comments on commit fc1872c

Please sign in to comment.