Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add retry for distributed transaction in postgres #265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/BuildingBlocks/EFCore/AppDbContextBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace BuildingBlocks.EFCore;
using Core.Event;
using Core.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Web;
using Exception = System.Exception;
Expand All @@ -13,6 +14,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext
{
private readonly ICurrentUserProvider? _currentUserProvider;
private readonly ILogger<AppDbContextBase>? _logger;
private IDbContextTransaction _currentTransaction;

protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider? currentUserProvider = null, ILogger<AppDbContextBase>? logger = null) :
base(options)
Expand All @@ -26,10 +28,52 @@ protected override void OnModelCreating(ModelBuilder builder)
{
}

public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy();

public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{
if (_currentTransaction != null) return;

_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}

public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}

public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}


//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
var strategy = CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =
Expand Down
24 changes: 19 additions & 5 deletions src/BuildingBlocks/EFCore/EfTxBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace BuildingBlocks.EFCore;

using System.Transactions;
using PersistMessageProcessor;
using Polly;

public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull, IRequest<TResponse>
Expand Down Expand Up @@ -48,10 +49,10 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe
nameof(EfTxBehavior<TRequest, TResponse>),
typeof(TRequest).FullName);

// ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
using var scope = new TransactionScope(TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
TransactionScopeAsyncFlowOption.Enabled);
//ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
using var scope = new TransactionScope(TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
TransactionScopeAsyncFlowOption.Enabled);

var response = await next();

Expand All @@ -71,10 +72,23 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe

await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);

await _dbContextBase.SaveChangesAsync(cancellationToken);
// Save data to database with some retry policy in distributed transaction
await _dbContextBase.RetryOnFailure(async () =>
{
await _dbContextBase.SaveChangesAsync(cancellationToken);
});

// Save data to database with some retry policy in distributed transaction
await _dbContextBase.RetryOnFailure(async () =>
{
await _dbContextBase.SaveChangesAsync(cancellationToken);
});

await _persistMessageDbContext.SaveChangesAsync(cancellationToken);

scope.Complete();

return response;
}
}
}
2 changes: 0 additions & 2 deletions src/BuildingBlocks/EFCore/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public static IServiceCollection AddCustomDbContext<TContext>(
dbOptions =>
{
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
// dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null);
})
// https://github.com/efcore/EFCore.NamingConventions
.UseSnakeCaseNamingConvention();
Expand Down
8 changes: 7 additions & 1 deletion src/BuildingBlocks/EFCore/IDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@

namespace BuildingBlocks.EFCore;

using Microsoft.EntityFrameworkCore.Storage;

public interface IDbContext
{
DbSet<TEntity> Set<TEntity>() where TEntity : class;
IReadOnlyList<IDomainEvent> GetDomainEvents();
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
Task BeginTransactionAsync(CancellationToken cancellationToken = default);
Task CommitTransactionAsync(CancellationToken cancellationToken = default);
Task RollbackTransactionAsync(CancellationToken cancellationToken = default);
IExecutionStrategy CreateExecutionStrategy();
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace BuildingBlocks.PersistMessageProcessor.Data;
using Core.Model;
using Microsoft.Extensions.Logging;
using Exception = System.Exception;
using IsolationLevel = System.Data.IsolationLevel;

public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{
Expand All @@ -28,6 +29,27 @@ protected override void OnModelCreating(ModelBuilder builder)
builder.ToSnakeCaseTables();
}

//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =
await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
await SaveChangesAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
});
}

public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ public interface IPersistMessageDbContext
{
DbSet<PersistMessage> PersistMessages { get; }
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
namespace BuildingBlocks.PersistMessageProcessor;

using Microsoft.EntityFrameworkCore;
using Polly;

public class PersistMessageProcessor : IPersistMessageProcessor
{
Expand Down Expand Up @@ -199,7 +200,10 @@ await _persistMessageDbContext.PersistMessages.AddAsync(
deliveryType),
cancellationToken);

await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
await _persistMessageDbContext.RetryOnFailure(async () =>
{
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
});

_logger.LogInformation(
"Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.",
Expand All @@ -215,6 +219,9 @@ private async Task ChangeMessageStatusAsync(PersistMessage message, Cancellation

_persistMessageDbContext.PersistMessages.Update(message);

await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
await _persistMessageDbContext.RetryOnFailure(async () =>
{
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
});
}
}
20 changes: 20 additions & 0 deletions src/BuildingBlocks/Polly/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace BuildingBlocks.Polly;

using global::Polly;
using Exception = System.Exception;

public static class Extensions
{
public static T RetryOnFailure<T>(this object retrySource, Func<T> action, int retryCount = 3)
{
var retryPolicy = Policy
.Handle<Exception>()
.Retry(retryCount, (exception, retryAttempt) =>
{
Console.WriteLine($"Retry attempt: {retryAttempt}");
Console.WriteLine($"Exception: {exception.Message}");
});

return retryPolicy.Execute(action);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ public class FakeCreateAircraftCommand : AutoFaker<CreateAircraft>
public FakeCreateAircraftCommand()
{
RuleFor(r => r.Id, _ => NewId.NextGuid());
RuleFor(r => r.ManufacturingYear, _ => 2000);
}
}
44 changes: 43 additions & 1 deletion src/Services/Identity/src/Identity/Data/IdentityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
{
private readonly ILogger<IdentityContext>? _logger;
private IDbContextTransaction _currentTransaction;

public IdentityContext(DbContextOptions<IdentityContext> options, ILogger<IdentityContext>? logger = null) : base(options)
{
Expand All @@ -36,10 +37,51 @@ protected override void OnModelCreating(ModelBuilder builder)
builder.ToSnakeCaseTables();
}

public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy();

public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{
if (_currentTransaction != null) return;

_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}

public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}

public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}

//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
var strategy = CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =
Expand Down