Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add polly for handle resilience #111

Merged
merged 1 commit into from
Jan 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,15 @@ dotnet_naming_rule.stylecop_instance_fields_must_be_private_rule.severity

# Private fields must be camelCase
# https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1306.md
dotnet_naming_symbols.stylecop_private_fields_group.applicable_accessibilities = private
dotnet_naming_symbols.stylecop_private_fields_group.applicable_kinds = field
dotnet_naming_rule.stylecop_private_fields_must_be_camel_case_rule.symbols = stylecop_private_fields_group
dotnet_naming_rule.stylecop_private_fields_must_be_camel_case_rule.style = camel_case_style
dotnet_naming_rule.private_members_with_underscore.symbols = private_fields
dotnet_naming_rule.private_members_with_underscore.style = prefix_underscore
dotnet_naming_rule.private_members_with_underscore.severity = warning
dotnet_naming_symbols.private_fields.applicable_kinds = field
dotnet_naming_symbols.private_fields.applicable_accessibilities = private
dotnet_naming_style.prefix_underscore.capitalization = camel_case
dotnet_naming_style.prefix_underscore.required_prefix = _


# Local variables must be camelCase
# https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1312.md
Expand Down
76 changes: 47 additions & 29 deletions src/BuildingBlocks/EFCore/AppDbContextBase.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
using System.Collections.Immutable;
using BuildingBlocks.Core.Event;
using BuildingBlocks.Core.Model;
using BuildingBlocks.Web;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;

namespace BuildingBlocks.EFCore;

using System.Data;
using System.Net;
using System.Security.Claims;
using global::Polly;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public abstract class AppDbContextBase : DbContext, IDbContext
{
private readonly ICurrentUserProvider _currentUserProvider;

private readonly IHttpContextAccessor _httpContextAccessor;
private IDbContextTransaction _currentTransaction;

protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider currentUserProvider = null) :
protected AppDbContextBase(DbContextOptions options, IHttpContextAccessor httpContextAccessor = default) :
base(options)
{
_currentUserProvider = currentUserProvider;
_httpContextAccessor = httpContextAccessor;
}

protected override void OnModelCreating(ModelBuilder builder)
Expand Down Expand Up @@ -68,44 +72,49 @@ public async Task RollbackTransactionAsync(CancellationToken cancellationToken =
}
}

public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();
try
{
return base.SaveChangesAsync(cancellationToken);
return await base.SaveChangesAsync(cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/saving/concurrency?tabs=fluent-api#resolving-concurrency-conflicts
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
var logger = _httpContextAccessor?.HttpContext?.RequestServices
.GetRequiredService<ILogger<AppDbContextBase>>();

var entry = ex.Entries.SingleOrDefault();

if (entry == null)
{
var proposedValues = entry.CurrentValues;
var databaseValues = entry.GetDatabaseValues();
return 0;
}

if (databaseValues != null)
{
// update the original values with the database values
entry.OriginalValues.SetValues(databaseValues);
var currentValue = entry.CurrentValues;
var databaseValue = await entry.GetDatabaseValuesAsync(cancellationToken);

logger?.LogInformation("The entity being updated is already use by another Thread!" +
" database value is: {DatabaseValue} and current value is: {CurrentValue}",
databaseValue, currentValue);

// check for conflicts
if (!proposedValues.Equals(databaseValues))
var policy = Policy.Handle<DbUpdateConcurrencyException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(1),
onRetry: (exception, timeSpan, retryCount, context) =>
{
if (entry.Entity.GetType() == typeof(IAggregate))
{
// merge concurrency conflict for IAggregate
}
else
if (exception != null)
{
throw new NotSupportedException(
"Don't know how to handle concurrency conflicts for "
+ entry.Metadata.Name);
logger?.LogError(exception,
"Request failed with {StatusCode}. Waiting {TimeSpan} before next retry. Retry attempt {RetryCount}.",
HttpStatusCode.Conflict,
timeSpan,
retryCount);
}
}
}
}
});

return base.SaveChangesAsync(cancellationToken);
return await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
}
}

Expand Down Expand Up @@ -133,7 +142,7 @@ private void OnBeforeSaving()
foreach (var entry in ChangeTracker.Entries<IAggregate>())
{
var isAuditable = entry.Entity.GetType().IsAssignableTo(typeof(IAggregate));
var userId = _currentUserProvider?.GetCurrentUserId();
var userId = GetCurrentUserId();

if (isAuditable)
{
Expand Down Expand Up @@ -161,4 +170,13 @@ private void OnBeforeSaving()
}
}
}

private long? GetCurrentUserId()
{
var nameIdentifier = _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier);

long.TryParse(nameIdentifier, out var userId);

return userId;
}
}
2 changes: 1 addition & 1 deletion src/BuildingBlocks/Exception/GrpcExceptionInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
}
catch (System.Exception exception)
{
throw new RpcException(new Status(StatusCode.Cancelled, exception.Message));
throw new RpcException(new Status(StatusCode.Internal, exception.Message));
}
}
}
18 changes: 18 additions & 0 deletions src/BuildingBlocks/MassTransit/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

namespace BuildingBlocks.MassTransit;

using Exception;

public static class Extensions
{
private static bool? _isRunningInContainer;
Expand Down Expand Up @@ -80,6 +82,10 @@ private static void SetupMasstransitConfigurations(IServiceCollection services,

foreach (var consumer in consumers)
{
//ref: https://masstransit-project.com/usage/exceptions.html#retry
//ref: https://markgossa.com/2022/06/masstransit-exponential-back-off.html
configurator.UseMessageRetry(r => AddRetryConfiguration(r));

configurator.ConfigureEndpoints(context, x => x.Exclude(consumer));
var methodInfo = typeof(DependencyInjectionReceiveEndpointExtensions)
.GetMethods()
Expand All @@ -95,4 +101,16 @@ private static void SetupMasstransitConfigurations(IServiceCollection services,
}
});
}

private static IRetryConfigurator AddRetryConfiguration(IRetryConfigurator retryConfigurator)
{
retryConfigurator.Exponential(
3,
TimeSpan.FromMilliseconds(200),
TimeSpan.FromMinutes(120),
TimeSpan.FromMilliseconds(200))
.Ignore<ValidationException>(); // don't retry if we have invalid data and message goes to _error queue masstransit

return retryConfigurator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace BuildingBlocks.PersistMessageProcessor.Data;

using Microsoft.AspNetCore.Http;

public class PersistMessageDbContext : AppDbContextBase, IPersistMessageDbContext
{
public PersistMessageDbContext(DbContextOptions<PersistMessageDbContext> options)
: base(options)
public PersistMessageDbContext(DbContextOptions<PersistMessageDbContext> options, IHttpContextAccessor httpContextAccessor = default)
: base(options, httpContextAccessor)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,17 @@ private async Task ProcessAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
await using (var scope = _serviceProvider.CreateAsyncScope())
{
await using (var scope = _serviceProvider.CreateAsyncScope())
{
var service = scope.ServiceProvider.GetRequiredService<IPersistMessageProcessor>();
await service.ProcessAllAsync(stoppingToken);
}
var service = scope.ServiceProvider.GetRequiredService<IPersistMessageProcessor>();
await service.ProcessAllAsync(stoppingToken);
}

var delay = _options.Interval is { }
? TimeSpan.FromSeconds((int)_options.Interval)
: TimeSpan.FromSeconds(30);
var delay = _options.Interval is { }
? TimeSpan.FromSeconds((int)_options.Interval)
: TimeSpan.FromSeconds(30);

await Task.Delay(delay, stoppingToken);
}
catch (System.Exception e)
{
Console.WriteLine(e);
throw;
}
await Task.Delay(delay, stoppingToken);
}
}
}
7 changes: 7 additions & 0 deletions src/BuildingBlocks/Polly/CircuitBreakerOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BuildingBlocks.Polly;

public class CircuitBreakerOptions
{
public int RetryCount { get; set; }
public int BreakDuration { get; set; }
}
83 changes: 83 additions & 0 deletions src/BuildingBlocks/Polly/GrpcCircuitBreaker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
namespace BuildingBlocks.Polly;

using System.Net;
using Ardalis.GuardClauses;
using BuildingBlocks.Web;
using global::Polly;
using Grpc.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public static class GrpcCircuitBreaker
{
//ref: https://anthonygiretti.com/2020/03/31/grpc-asp-net-core-3-1-resiliency-with-polly/
public static IHttpClientBuilder AddGrpcCircuitBreakerPolicyHandler(this IHttpClientBuilder httpClientBuilder)
{
return httpClientBuilder.AddPolicyHandler((sp, _) =>
{
var options = sp.GetRequiredService<IConfiguration>().GetOptions<PolicyOptions>(nameof(PolicyOptions));

Guard.Against.Null(options, nameof(options));

var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger("PollyGrpcCircuitBreakerPoliciesLogger");

// gRPC status
var gRpcErrors = new StatusCode[]
{
StatusCode.DeadlineExceeded, StatusCode.Internal, StatusCode.NotFound, StatusCode.Cancelled,
StatusCode.ResourceExhausted, StatusCode.Unavailable, StatusCode.Unknown
};

// Http errors
var serverErrors = new HttpStatusCode[]
{
HttpStatusCode.BadGateway, HttpStatusCode.GatewayTimeout, HttpStatusCode.ServiceUnavailable,
HttpStatusCode.InternalServerError, HttpStatusCode.TooManyRequests, HttpStatusCode.RequestTimeout
};

return Policy.HandleResult<HttpResponseMessage>(r =>
{
var grpcStatus = StatusManager.GetStatusCode(r);
var httpStatusCode = r.StatusCode;

return (grpcStatus == null && serverErrors.Contains(httpStatusCode)) || // if the server send an error before gRPC pipeline
(httpStatusCode == HttpStatusCode.OK && gRpcErrors.Contains(grpcStatus.Value)); // if gRPC pipeline handled the request (gRPC always answers OK)
})
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: options.CircuitBreaker.RetryCount,
durationOfBreak: TimeSpan.FromSeconds(options.CircuitBreaker.BreakDuration),
onBreak: (response, breakDuration) =>
{
if (response?.Exception != null)
{
logger.LogError(response.Exception,
"Service shutdown during {BreakDuration} after {RetryCount} failed retries",
breakDuration,
options.CircuitBreaker.RetryCount);
}
},
onReset: () =>
{
logger.LogInformation("Service restarted");
});
});
}

private static class StatusManager
{
public static StatusCode? GetStatusCode(HttpResponseMessage response)
{
var headers = response.Headers;

if (!headers.Contains("grpc-status") && response.StatusCode == HttpStatusCode.OK)
return StatusCode.OK;

if (headers.Contains("grpc-status"))
return (StatusCode)int.Parse(headers.GetValues("grpc-status").First());

return null;
}
}
}
Loading