diff --git a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs index cbdde234..cebfc4bf 100644 --- a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs +++ b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs @@ -174,7 +174,18 @@ internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer( .WithCleanupPollingConfiguration( configure => configure .Enabled(false) + .WithCronExpression("0 0/1 * 1/1 * ? *") + ) + .WithRetryDurableActiveQueuesCountPollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0 0/1 * 1/1 * ? *") + .Do((numberOfActiveQueues) => + { + Console.Write($"Number of active queues {numberOfActiveQueues}"); + }) ) + )) .AddTypedHandlers( handlers => handlers diff --git a/samples/KafkaFlow.Retry.Sample/Program.cs b/samples/KafkaFlow.Retry.Sample/Program.cs index d97cfec8..f3563145 100644 --- a/samples/KafkaFlow.Retry.Sample/Program.cs +++ b/samples/KafkaFlow.Retry.Sample/Program.cs @@ -21,8 +21,16 @@ private static async Task Main() var mongoDbDatabaseName = "kafka_flow_retry_durable_sample"; var mongoDbRetryQueueCollectionName = "RetryQueues"; var mongoDbRetryQueueItemCollectionName = "RetryQueueItems"; - var sqlServerConnectionString = - "Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample"; + var sqlServerConnectionString = string.Join( + string.Empty, + "Server=localhost;", + "Trusted_Connection=True;", + "Pooling=true;", + "Min Pool Size=1;", + "Max Pool Size=100;", + "MultipleActiveResultSets=true;", + "Application Name=KafkaFlow Retry Tests;" + ); var sqlServerDatabaseName = "kafka_flow_retry_durable_sample"; var topics = new[] { diff --git a/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs index f5400fb7..2e30edb5 100644 --- a/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs @@ -105,6 +105,11 @@ public async Task CheckQueuePendingItemsAsync(QueuePend return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems); } + public Task CountQueuesAsync(CountQueuesInput input) + { + throw new NotImplementedException(); + } + public async Task DeleteQueuesAsync(DeleteQueuesInput input) { Guard.Argument(input, nameof(input)).NotNull(); diff --git a/src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs index 6e926622..babf9555 100644 --- a/src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs @@ -107,6 +107,11 @@ public async Task CheckQueuePendingItemsAsync(QueuePend } } + public Task CountQueuesAsync(CountQueuesInput input) + { + throw new NotImplementedException(); + } + public async Task DeleteQueuesAsync(DeleteQueuesInput input) { Guard.Argument(input, nameof(input)).NotNull(); diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs index 8ceddfde..a2bc6e2c 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs @@ -17,6 +17,8 @@ internal interface IRetryQueueRepository Task GetQueueAsync(IDbConnection dbConnection, string queueGroupKey); + Task CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus); + Task> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top); Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution); diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs index 8d55aebf..c934afe4 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs @@ -34,6 +34,24 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retry } } + public async Task CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus) + { + using (var command = dbConnection.CreateCommand()) + { + command.CommandType = CommandType.Text; + command.CommandText = + $@"SELECT COUNT(1) + FROM [{dbConnection.Schema}].[RetryQueues] + WHERE SearchGroupKey = @SearchGroupKey + AND IdStatus = @IdStatus"; + + command.Parameters.AddWithValue("SearchGroupKey", searchGroupKey); + command.Parameters.AddWithValue("IdStatus", (byte)retryQueueStatus); + + return (Int32) await command.ExecuteScalarAsync().ConfigureAwait(false); + } + } + public async Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete) { @@ -83,9 +101,9 @@ public async Task GetQueueAsync(IDbConnection dbConnection, strin command.CommandType = CommandType.Text; command.CommandText = $@"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution - FROM [{dbConnection.Schema}].[RetryQueues] - WHERE QueueGroupKey = @QueueGroupKey - ORDER BY Id"; + FROM [{dbConnection.Schema}].[RetryQueues] + WHERE QueueGroupKey = @QueueGroupKey + ORDER BY Id"; command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey); diff --git a/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs index b048ec82..360b65f9 100644 --- a/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs @@ -107,6 +107,20 @@ public async Task CheckQueuePendingItemsAsync(QueuePend } } + public async Task CountQueuesAsync(CountQueuesInput input) + { + Guard.Argument(input, nameof(input)).NotNull(); + + using (var dbConnection = _connectionProvider.Create(_sqlServerDbSettings)) + { + return await _retryQueueRepository.CountQueueAsync( + dbConnection, + input.SearchGroupKey, + input.Status) + .ConfigureAwait(false); + } + } + public async Task DeleteQueuesAsync(DeleteQueuesInput input) { Guard.Argument(input, nameof(input)).NotNull(); diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs index 571aee62..dea5af03 100644 --- a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using Dawn; +using KafkaFlow.Retry.Durable.Definitions.Builders.Polling; using KafkaFlow.Retry.Durable.Definitions.Polling; namespace KafkaFlow.Retry; @@ -11,12 +12,14 @@ public class PollingDefinitionsAggregatorBuilder private readonly CleanupPollingDefinitionBuilder _cleanupPollingDefinitionBuilder; private readonly List _pollingDefinitions; private readonly RetryDurablePollingDefinitionBuilder _retryDurablePollingDefinitionBuilder; + private readonly RetryDurableActiveQueuesCountPollingDefinitionBuilder _retryDurableActiveQueuesCountPollingDefinitionBuilder; private string _schedulerId; public PollingDefinitionsAggregatorBuilder() { _cleanupPollingDefinitionBuilder = new CleanupPollingDefinitionBuilder(); _retryDurablePollingDefinitionBuilder = new RetryDurablePollingDefinitionBuilder(); + _retryDurableActiveQueuesCountPollingDefinitionBuilder = new RetryDurableActiveQueuesCountPollingDefinitionBuilder(); _pollingDefinitions = new List(); } @@ -47,6 +50,19 @@ public PollingDefinitionsAggregatorBuilder WithRetryDurablePollingConfiguration( return this; } + public PollingDefinitionsAggregatorBuilder WithRetryDurableActiveQueuesCountPollingConfiguration( + Action configure) + { + Guard.Argument(configure, nameof(configure)).NotNull(); + + configure(_retryDurableActiveQueuesCountPollingDefinitionBuilder); + var etryDurableActiveQueuesCountPollingDefinition = _retryDurableActiveQueuesCountPollingDefinitionBuilder.Build(); + + _pollingDefinitions.Add(etryDurableActiveQueuesCountPollingDefinition); + + return this; + } + public PollingDefinitionsAggregatorBuilder WithSchedulerId(string schedulerId) { _schedulerId = schedulerId; @@ -65,6 +81,11 @@ internal PollingDefinitionsAggregator Build() ValidateRequiredPollingDefinition(PollingJobType.Cleanup); } + if (_retryDurableActiveQueuesCountPollingDefinitionBuilder.Required) + { + ValidateRequiredPollingDefinition(PollingJobType.RetryDurableActiveQueuesCount); + } + return new PollingDefinitionsAggregator(_schedulerId, _pollingDefinitions); } diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurableActiveQueuesCountPollingDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurableActiveQueuesCountPollingDefinitionBuilder.cs new file mode 100644 index 00000000..132b42b2 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurableActiveQueuesCountPollingDefinitionBuilder.cs @@ -0,0 +1,25 @@ +using System; +using KafkaFlow.Retry.Durable.Definitions.Polling; + +namespace KafkaFlow.Retry.Durable.Definitions.Builders.Polling; +public class RetryDurableActiveQueuesCountPollingDefinitionBuilder : PollingDefinitionBuilder +{ + protected Action ActionToPerform = null; + + internal override bool Required => false; + + public RetryDurableActiveQueuesCountPollingDefinitionBuilder Do(Action actionToPerform) + { + ActionToPerform = actionToPerform; + return this; + } + + internal RetryDurableActiveQueuesCountPollingDefinition Build() + { + return new RetryDurableActiveQueuesCountPollingDefinition( + IsEnabled, + CronExpression, + ActionToPerform + ); + } +} diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs index 1c26da96..62180a3f 100644 --- a/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs @@ -5,5 +5,6 @@ internal enum PollingJobType Unknown = 0, RetryDurable = 1, - Cleanup = 2 + Cleanup = 2, + RetryDurableActiveQueuesCount = 3 } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurableActiveQueuesCountPollingDefinition.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurableActiveQueuesCountPollingDefinition.cs new file mode 100644 index 00000000..2a4b7081 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurableActiveQueuesCountPollingDefinition.cs @@ -0,0 +1,20 @@ +using System; +using Dawn; + +namespace KafkaFlow.Retry.Durable.Definitions.Polling; +internal class RetryDurableActiveQueuesCountPollingDefinition : PollingDefinition +{ + public RetryDurableActiveQueuesCountPollingDefinition( + bool enabled, + string cronExpression, + Action activeQueues) + : base(enabled, cronExpression) + { + Guard.Argument(activeQueues, nameof(activeQueues)).NotNull(); + ActiveQueues = activeQueues; + } + + public override PollingJobType PollingJobType => PollingJobType.RetryDurableActiveQueuesCount; + + public Action ActiveQueues { get; } +} diff --git a/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs b/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs index 7e858267..4fdeafec 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs @@ -39,7 +39,7 @@ public JobDataProvidersFactory( public IEnumerable Create(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler) { - var jobDataProviders = new List(2); + var jobDataProviders = new List(3); if (TryGetPollingDefinition(PollingJobType.RetryDurable, out var retryDurablePollingDefinition)) @@ -71,6 +71,19 @@ public IEnumerable Create(IMessageProducer retryDurableMessage ); } + if (TryGetPollingDefinition(PollingJobType.RetryDurableActiveQueuesCount, out var retryDurableActiveQueuesCountPollingDefinition)) + { + jobDataProviders.Add( + new RetryDurableActiveQueuesCountJobDataProvider( + retryDurableActiveQueuesCountPollingDefinition, + GetTrigger(retryDurableActiveQueuesCountPollingDefinition), + _pollingDefinitionsAggregator.SchedulerId, + _retryDurableQueueRepository, + logHandler + ) + ); + } + return jobDataProviders; } diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs index 62597dbc..84fb2d72 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs @@ -9,7 +9,7 @@ namespace KafkaFlow.Retry.Durable.Polling.Jobs; -[DisallowConcurrentExecutionAttribute] +[DisallowConcurrentExecution] internal class CleanupPollingJob : IJob { public async Task Execute(IJobExecutionContext context) diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs index 2f67cde0..a56f048e 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs @@ -10,4 +10,5 @@ internal static class PollingJobConstants public const string RetryDurableQueueRepository = "RetryDurableQueueRepository"; public const string SchedulerId = "SchedulerId"; public const string Utf8Encoder = "Utf8Encoder"; + public const string RetryDurableActiveQueuesCountPollingDefinition = "RetryDurableActiveQueuesCountPollingDefinition"; } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableActiveQueuesCountJob.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableActiveQueuesCountJob.cs new file mode 100644 index 00000000..df72c400 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableActiveQueuesCountJob.cs @@ -0,0 +1,63 @@ +using System; +using System.Threading.Tasks; +using KafkaFlow.Retry.Durable.Definitions.Polling; +using KafkaFlow.Retry.Durable.Polling.Extensions; +using KafkaFlow.Retry.Durable.Repository; +using KafkaFlow.Retry.Durable.Repository.Actions.Delete; +using KafkaFlow.Retry.Durable.Repository.Actions.Read; +using KafkaFlow.Retry.Durable.Repository.Model; +using Quartz; + +namespace KafkaFlow.Retry.Durable.Polling.Jobs; + +[DisallowConcurrentExecution] +internal class RetryDurableActiveQueuesCountJob : IJob +{ + public async Task Execute(IJobExecutionContext context) + { + var jobDataMap = context.JobDetail.JobDataMap; + + var retryDurableActiveQueuesCountPollingDefinition = + jobDataMap.GetValidValue(PollingJobConstants.RetryDurableActiveQueuesCountPollingDefinition, + nameof(RetryDurableActiveQueuesCountJob)); + var schedulerId = jobDataMap.GetValidStringValue(PollingJobConstants.SchedulerId, nameof(RetryDurableActiveQueuesCountJob)); + var retryDurableQueueRepository = + jobDataMap.GetValidValue(PollingJobConstants.RetryDurableQueueRepository, + nameof(RetryDurableActiveQueuesCountJob)); + var logHandler = + jobDataMap.GetValidValue(PollingJobConstants.LogHandler, nameof(RetryDurableActiveQueuesCountJob)); + + try + { + logHandler.Info( + $"{nameof(RetryDurableActiveQueuesCountJob)} starts execution", + new + { + context.Trigger.Key.Name + } + ); + + var countQueuesInput = new CountQueuesInput(RetryQueueStatus.Active) + { + SearchGroupKey = schedulerId + }; + + var countQueuesResult = + await retryDurableQueueRepository.CountRetryQueuesAsync(countQueuesInput).ConfigureAwait(false); + + retryDurableActiveQueuesCountPollingDefinition.ActiveQueues(countQueuesResult); + + logHandler.Info( + $"{nameof(RetryDurableActiveQueuesCountJob)} executed successfully.", + new + { + SearchGroupKey = schedulerId, + NumberOfActiveQueues = countQueuesResult + }); + } + catch (Exception ex) + { + logHandler.Error($"Exception on {nameof(RetryDurableActiveQueuesCountJob)} execution", ex, null); + } + } +} diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableActiveQueuesCountJobDataProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableActiveQueuesCountJobDataProvider.cs new file mode 100644 index 00000000..e4ab9f8c --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableActiveQueuesCountJobDataProvider.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Dawn; +using KafkaFlow.Retry.Durable.Definitions.Polling; +using KafkaFlow.Retry.Durable.Repository; +using Quartz; + +namespace KafkaFlow.Retry.Durable.Polling.Jobs; + +internal class RetryDurableActiveQueuesCountJobDataProvider : IJobDataProvider +{ + private readonly IJobDetail _jobDetail; + private readonly RetryDurableActiveQueuesCountPollingDefinition _retryDurableActiveQueuesCountPollingDefinition; + private readonly ITrigger _trigger; + + public RetryDurableActiveQueuesCountJobDataProvider( + RetryDurableActiveQueuesCountPollingDefinition retryDurableActiveQueuesCountPollingDefinition, + ITrigger trigger, + string schedulerId, + IRetryDurableQueueRepository retryDurableQueueRepository, + ILogHandler logHandler) + { + Guard.Argument(retryDurableActiveQueuesCountPollingDefinition, nameof(retryDurableActiveQueuesCountPollingDefinition)).NotNull(); + Guard.Argument(trigger, nameof(trigger)).NotNull(); + Guard.Argument(schedulerId, nameof(schedulerId)).NotNull().NotEmpty(); + Guard.Argument(retryDurableQueueRepository, nameof(retryDurableQueueRepository)).NotNull(); + Guard.Argument(logHandler, nameof(logHandler)).NotNull(); + + _retryDurableActiveQueuesCountPollingDefinition = retryDurableActiveQueuesCountPollingDefinition; + _trigger = trigger; + _jobDetail = JobBuilder + .Create() + .WithIdentity($"pollingJob_{schedulerId}_{retryDurableActiveQueuesCountPollingDefinition.PollingJobType}", "queueTrackerGroup") + .SetJobData( + new JobDataMap + { + { PollingJobConstants.RetryDurableActiveQueuesCountPollingDefinition, retryDurableActiveQueuesCountPollingDefinition }, + { PollingJobConstants.SchedulerId, schedulerId }, + { PollingJobConstants.RetryDurableQueueRepository, retryDurableQueueRepository }, + { PollingJobConstants.LogHandler, logHandler } + }) + .Build(); + } + + public IJobDetail JobDetail => _jobDetail; + + public PollingDefinition PollingDefinition => _retryDurableActiveQueuesCountPollingDefinition; + + public ITrigger Trigger => _trigger; +} diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs index d5258efd..3df6fe50 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs @@ -14,7 +14,7 @@ namespace KafkaFlow.Retry.Durable.Polling.Jobs; -[DisallowConcurrentExecutionAttribute] +[DisallowConcurrentExecution] internal class RetryDurablePollingJob : IJob { private TimeSpan _expirationInterval = TimeSpan.Zero; diff --git a/src/KafkaFlow.Retry/Durable/Repository/Actions/Read/CountQueuesInput.cs b/src/KafkaFlow.Retry/Durable/Repository/Actions/Read/CountQueuesInput.cs new file mode 100644 index 00000000..8d1a7e1a --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Repository/Actions/Read/CountQueuesInput.cs @@ -0,0 +1,15 @@ +using Dawn; +using KafkaFlow.Retry.Durable.Repository.Model; + +namespace KafkaFlow.Retry.Durable.Repository.Actions.Read; +public class CountQueuesInput +{ + public CountQueuesInput(RetryQueueStatus status) + { + Guard.Argument(status, nameof(status)).NotDefault(); + + Status = status; + } + public RetryQueueStatus Status { get; } + public string SearchGroupKey { get; set; } +} diff --git a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs index 6867e3db..3824910b 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs @@ -20,6 +20,8 @@ internal interface IRetryDurableQueueRepository Task> GetRetryQueuesAsync(GetQueuesInput getQueuesInput); + Task CountRetryQueuesAsync(CountQueuesInput countQueuesInput); + Task SaveToQueueAsync(IMessageContext context, string description); Task UpdateItemAsync(UpdateItemInput updateItemInput); diff --git a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs index 47ca820a..8ceb7188 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs @@ -18,6 +18,8 @@ public interface IRetryDurableQueueRepositoryProvider Task GetQueuesAsync(GetQueuesInput input); + Task CountQueuesAsync(CountQueuesInput input); + Task SaveToQueueAsync(SaveToQueueInput input); Task UpdateItemExecutionInfoAsync(UpdateItemExecutionInfoInput input); diff --git a/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs index 640e0867..de8c16f1 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs @@ -24,6 +24,9 @@ public Task CheckQueueNewestItemsAsync(QueueNewestItemsI public Task CheckQueuePendingItemsAsync(QueuePendingItemsInput queuePendingItemsInput) => Task.FromResult(new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems)); + public Task CountRetryQueuesAsync(CountQueuesInput countQueuesInput) + => Task.FromResult(default(long)); + public Task DeleteQueuesAsync(DeleteQueuesInput deleteQueuesInput) => Task.FromResult(new DeleteQueuesResult(0)); diff --git a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs index 07d1eaaf..5d51c9bc 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs @@ -135,6 +135,22 @@ public async Task CheckQueuePendingItemsAsync( } } + public async Task CountRetryQueuesAsync(CountQueuesInput countQueuesInput) + { + try + { + return await _retryDurableRepositoryProvider.CountQueuesAsync(countQueuesInput).ConfigureAwait(false); + } + catch (Exception ex) + { + var kafkaException = new RetryDurableException( + new RetryError(RetryErrorCode.DataProviderCountRetryQueues), + "An error ocurred count the retry queues", ex); + + throw kafkaException; + } + } + public async Task DeleteQueuesAsync(DeleteQueuesInput deleteQueuesInput) { return await _retryDurableRepositoryProvider.DeleteQueuesAsync(deleteQueuesInput).ConfigureAwait(false); diff --git a/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs b/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs index a18bf15f..d32e155f 100644 --- a/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs +++ b/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs @@ -31,4 +31,5 @@ public enum RetryErrorCode DataProviderCheckQueuePendingItems = 0204, DataProviderGetRetryQueues = 0205, DataProviderUpdateItem = 0206, + DataProviderCountRetryQueues = 0207, } \ No newline at end of file