diff --git a/.nuget/NuGet.Config b/.nuget/NuGet.Config deleted file mode 100644 index 67f8ea0..0000000 --- a/.nuget/NuGet.Config +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.nuget/NuGet.exe b/.nuget/NuGet.exe index 305300a..4543b4f 100644 Binary files a/.nuget/NuGet.exe and b/.nuget/NuGet.exe differ diff --git a/.nuget/NuGet.targets b/.nuget/NuGet.targets deleted file mode 100644 index 3f8c37b..0000000 --- a/.nuget/NuGet.targets +++ /dev/null @@ -1,144 +0,0 @@ - - - - $(MSBuildProjectDirectory)\..\ - - - false - - - false - - - true - - - false - - - - - - - - - - - $([System.IO.Path]::Combine($(SolutionDir), ".nuget")) - - - - - $(SolutionDir).nuget - - - - $(MSBuildProjectDirectory)\packages.$(MSBuildProjectName.Replace(' ', '_')).config - $(MSBuildProjectDirectory)\packages.$(MSBuildProjectName).config - - - - $(MSBuildProjectDirectory)\packages.config - $(PackagesProjectConfig) - - - - - $(NuGetToolsPath)\NuGet.exe - @(PackageSource) - - "$(NuGetExePath)" - mono --runtime=v4.0.30319 "$(NuGetExePath)" - - $(TargetDir.Trim('\\')) - - -RequireConsent - -NonInteractive - - "$(SolutionDir) " - "$(SolutionDir)" - - - $(NuGetCommand) install "$(PackagesConfig)" -source "$(PackageSources)" $(NonInteractiveSwitch) $(RequireConsentSwitch) -solutionDir $(PaddedSolutionDir) - $(NuGetCommand) pack "$(ProjectPath)" -Properties "Configuration=$(Configuration);Platform=$(Platform)" $(NonInteractiveSwitch) -OutputDirectory "$(PackageOutputDir)" -symbols - - - - RestorePackages; - $(BuildDependsOn); - - - - - $(BuildDependsOn); - BuildPackage; - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/.nuget/packages.config b/.nuget/packages.config index 9ae0893..149baa6 100644 --- a/.nuget/packages.config +++ b/.nuget/packages.config @@ -1,6 +1,6 @@  - - + + \ No newline at end of file diff --git a/.vs/HangFire.Azure.ServiceBusQueue/DesignTimeBuild/.dtbcache.v2 b/.vs/HangFire.Azure.ServiceBusQueue/DesignTimeBuild/.dtbcache.v2 new file mode 100644 index 0000000..f417f59 Binary files /dev/null and b/.vs/HangFire.Azure.ServiceBusQueue/DesignTimeBuild/.dtbcache.v2 differ diff --git a/HangFire.Azure.ServiceBusQueue.sln b/HangFire.Azure.ServiceBusQueue.sln index f6d7da0..b892904 100644 --- a/HangFire.Azure.ServiceBusQueue.sln +++ b/HangFire.Azure.ServiceBusQueue.sln @@ -1,25 +1,11 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.26228.9 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29123.88 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Hangfire.Azure.ServiceBusQueue", "src\HangFire.Azure.ServiceBusQueue\Hangfire.Azure.ServiceBusQueue.csproj", "{4CC51F69-0311-4485-B7DE-9ECAB3A1B5E5}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Hangfire.Azure.ServiceBusQueue", "src\HangFire.Azure.ServiceBusQueue\Hangfire.Azure.ServiceBusQueue.csproj", "{4CC51F69-0311-4485-B7DE-9ECAB3A1B5E5}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{756FFE84-4908-42BE-9918-4DA34E85AAFB}" - ProjectSection(SolutionItems) = preProject - .nuget\NuGet.Config = .nuget\NuGet.Config - .nuget\NuGet.exe = .nuget\NuGet.exe - .nuget\NuGet.targets = .nuget\NuGet.targets - EndProjectSection -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{95E4FDB9-247E-4BD8-896C-D28BCBA311F7}" - ProjectSection(SolutionItems) = preProject - appveyor.yml = appveyor.yml - nuspecs\Hangfire.Azure.ServiceBusQueue.nuspec = nuspecs\Hangfire.Azure.ServiceBusQueue.nuspec - psake-project.ps1 = psake-project.ps1 - EndProjectSection -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HangFire.Azure.ServiceBusQueue.Tests", "tests\HangFire.Azure.ServiceBusQueue.Tests\HangFire.Azure.ServiceBusQueue.Tests.csproj", "{C9D29F07-4445-4FCB-BC38-2221B2B38231}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HangFire.Azure.ServiceBusQueue.Tests", "tests\HangFire.Azure.ServiceBusQueue.Tests\HangFire.Azure.ServiceBusQueue.Tests.csproj", "{C9D29F07-4445-4FCB-BC38-2221B2B38231}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/appveyor.yml b/appveyor.yml index d6d4826..df1416a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,9 +7,9 @@ # - Section names should be unique on each level. # Don't edit manually! Use `build.bat version` command instead! -version: 4.1.0-build-0{build} +version: 5.0.0-build-0{build} -os: Visual Studio 2015 +os: Visual Studio 2019 #---------------------------------# # environment configuration # @@ -22,9 +22,6 @@ branches: - master - dev -cache: - - packages - #---------------------------------# # build configuration # #---------------------------------# @@ -51,14 +48,6 @@ artifacts: #---------------------------------# deploy: - - provider: GitHub - auth_token: - secure: Kx+tJBMfq/OK7sMpvQDdigFOIIfzYkVmIHxNQetanQdIA5Mb8zm/VhAkrId4x5zW - artifact: /.*\.zip/ - draft: true - on: - appveyor_repo_tag: true - - provider: NuGet api_key: secure: 8BQmcR1hP9MoxvrmXJKMtlRyX6LI36H9ZTU8fSEetvzMGCl1PUlXKZRUGq+GVMja diff --git a/nuspecs/Hangfire.Azure.ServiceBusQueue.nuspec b/nuspecs/Hangfire.Azure.ServiceBusQueue.nuspec index 8cee985..b512202 100644 --- a/nuspecs/Hangfire.Azure.ServiceBusQueue.nuspec +++ b/nuspecs/Hangfire.Azure.ServiceBusQueue.nuspec @@ -4,23 +4,36 @@ Hangfire.Azure.ServiceBusQueue 0.0.0 Hangfire Azure ServiceBus Queue - Sergey Odinokov, Adam Barclay + Sergey Odinokov, Adam Barclay, Giampaolo Gabba odinserj https://github.com/odinserj/HangFire.Azure.ServiceBusQueue false https://raw.github.com/odinserj/HangFire.Azure.ServiceBusQueue/master/license.txt Azure ServiceBus Queue support for SQL Server job storage implementation - Copyright © 2015 Sergey Odinokov, Adam Barclay + Copyright © 2015 Sergey Odinokov, Adam Barclay, Giampaolo Gabba Hangfire Azure ServiceBus SqlServer - - - - - + + + + + + + + + + + + - - - + + + + + + + + + \ No newline at end of file diff --git a/psake-project.ps1 b/psake-project.ps1 index 9cfbbb0..f779629 100644 --- a/psake-project.ps1 +++ b/psake-project.ps1 @@ -2,22 +2,21 @@ Properties { $solution = "HangFire.Azure.ServiceBusQueue.sln" } -Include "packages\Hangfire.Build.0.1.3\tools\psake-common.ps1" +Include "packages\Hangfire.Build.0.2.6\tools\psake-common.ps1" Task Default -Depends Collect -Task Collect -Depends Compile -Description "Copy all artifacts to the build folder." { - Collect-Assembly "Hangfire.Azure.ServiceBusQueue" "Net45" +Task CompileCore -Depends Clean { + Exec { dotnet build -c Release } } -Task Pack -Depends Collect -Description "Create NuGet packages and archive files." { - $version = Get-BuildVersion +Task Collect -Depends CompileCore -Description "Copy all artifacts to the build folder." { + Collect-Assembly "Hangfire.Azure.ServiceBusQueue" "net461" + Collect-Assembly "Hangfire.Azure.ServiceBusQueue" "netstandard2.0" +} - $tag = $env:APPVEYOR_REPO_TAG_NAME - if ($tag -And $tag.StartsWith("v$version-")) { - "Using tag-based version for packages." - $version = $tag.Substring(1) - } +Task Pack -Depends Collect -Description "Create NuGet packages and archive files." { + $version = Get-PackageVersion Create-Archive "Hangfire.Azure.ServiceBusQueue-$version" Create-Package "Hangfire.Azure.ServiceBusQueue" $version diff --git a/readme.md b/readme.md index 15b19e0..b34f249 100644 --- a/readme.md +++ b/readme.md @@ -27,7 +27,74 @@ Hangfire v1.7+ introduced breaking changes to the SQL Server integration points Usage ------ -To use the queue it needs to be added to your existing SQL Server storage configuration, using one of the `UseServiceBusQueues` overloads: +To use the queue it needs to be added to your existing SQL Server storage configuration. + + +For .NETCore and beyond, you can use the `IGlobalConfiguration` extension `.UseServiceBusQueues`: + +```csharp + +//You can use .UseServiceBusQueues only after .UseSqlStorage() + +// Uses default options (no prefix or configuration) with the "default" queue only +services.AddHangfire(configuration => configuration + .UseSqlServerStorage("") + .UseServiceBusQueues("") + +// Uses default options (no prefix or configuration) with the "critical" and "default" queues +services.AddHangfire(configuration => configuration + .UseSqlServerStorage("") + .UseServiceBusQueues("", "critical", "default") + +// Configures queues on creation and uses the "crtical" and "default" queues +services.AddHangfire(configuration => configuration + .UseSqlServerStorage("") + .UseServiceBusQueues("", + queueOptions => { + queueOptions.MaxSizeInMegabytes = 5120; + queueOptions.DefaultMessageTimeToLive = new TimeSpan(0, 1, 0); + } "critical", "default") + +// Specifies all options +services.AddHangfire(configuration => configuration + .UseSqlServerStorage("") + .UseServiceBusQueues(new ServiceBusQueueOptions + { + ConnectionString = connectionString, + + Configure = configureAction, + + // The actual queues used in Azure will have this prefix if specified + // (e.g. the "default" queue will be created as "my-prefix-default") + // + // This can be useful in development environments particularly where the machine + // name could be used to separate individual developers machines automatically + // (i.e. "my-prefix-{machine-name}".Replace("{machine-name}", Environment.MachineName)) + QueuePrefix = "my-prefix-", + + // The queues to monitor. This *must* be specified, even to set just + // the default queue as done here + Queues = new [] { EnqueuedState.DefaultQueue }, + + // By default queues will be checked and created on startup. This option + // can be disabled if the application will only be sending / listening to + // the queue and you want to remove the 'Manage' permission from the shared + // access policy. + // + // Note that the dashboard *must* have the 'Manage' permission otherwise the + // queue length cannot be read + CheckAndCreateQueues = false, + + // Typically a lower value is desired to keep the throughput of message processing high. A lower timeout means more calls to + // Azure Service Bus which can increase costs, especially on an under-utilised server with few jobs. + // Use a Higher value for lower costs in non production or non critical jobs + LoopReceiveTimeout = TimeSpan.FromMilliseconds(500) + + // Delay between queue polling requests + QueuePollInterval = TimeSpan.Zero + })); +``` +You can also use `UseServiceBusQueues` overloads: ```csharp var sqlStorage = new SqlServerStorage(""); @@ -84,6 +151,9 @@ sqlStorage.UseServiceBusQueues(new ServiceBusQueueOptions // Azure Service Bus which can increase costs, especially on an under-utilised server with few jobs. // Use a Higher value for lower costs in non production or non critical jobs LoopReceiveTimeout = TimeSpan.FromMilliseconds(500) + + // Delay between queue polling requests + QueuePollInterval = TimeSpan.Zero }); GlobalConfiguration.Configuration diff --git a/src/HangFire.Azure.ServiceBusQueue/AsyncHelper.cs b/src/HangFire.Azure.ServiceBusQueue/AsyncHelper.cs new file mode 100644 index 0000000..86de7d6 --- /dev/null +++ b/src/HangFire.Azure.ServiceBusQueue/AsyncHelper.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Hangfire.Azure.ServiceBusQueue +{ + //https://github.com/aspnet/AspNetIdentity/blob/main/src/Microsoft.AspNet.Identity.Core/AsyncHelper.cs + //Without the culture thing wich we dont need + internal static class AsyncHelper + { + private static readonly TaskFactory MyTaskFactory = new + TaskFactory(CancellationToken.None, + TaskCreationOptions.None, + TaskContinuationOptions.None, + TaskScheduler.Default); + + public static TResult RunSync(Func> func) + { + return MyTaskFactory + .StartNew(func) + .Unwrap() + .GetAwaiter() + .GetResult(); + } + + public static void RunSync(Func func) + { + MyTaskFactory + .StartNew(func) + .Unwrap() + .GetAwaiter() + .GetResult(); + } + } +} diff --git a/src/HangFire.Azure.ServiceBusQueue/HangFire.Azure.ServiceBusQueue.csproj b/src/HangFire.Azure.ServiceBusQueue/HangFire.Azure.ServiceBusQueue.csproj index 8860cfd..5cb3b22 100644 --- a/src/HangFire.Azure.ServiceBusQueue/HangFire.Azure.ServiceBusQueue.csproj +++ b/src/HangFire.Azure.ServiceBusQueue/HangFire.Azure.ServiceBusQueue.csproj @@ -1,104 +1,24 @@ - - - - - Debug - AnyCPU - {4CC51F69-0311-4485-B7DE-9ECAB3A1B5E5} - Library - Properties - Hangfire.Azure.ServiceBusQueue - Hangfire.Azure.ServiceBusQueue - v4.5 - 512 - ..\ - true - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - bin\Release\Hangfire.Azure.ServiceBusQueue.XML - - - - ..\..\packages\Hangfire.Core.1.7.0\lib\net45\Hangfire.Core.dll - True - - - ..\..\packages\Hangfire.SqlServer.1.7.0\lib\net45\Hangfire.SqlServer.dll - True - - - ..\..\packages\WindowsAzure.ServiceBus.2.7.5\lib\net40-full\Microsoft.ServiceBus.dll - True - - - ..\..\packages\Microsoft.WindowsAzure.ConfigurationManager.3.1.0\lib\net40\Microsoft.WindowsAzure.Configuration.dll - True - - - ..\..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll - True - - - ..\..\packages\Owin.1.0\lib\net40\Owin.dll - True - - - - - - - - - - - - - - - Properties\SharedAssemblyInfo.cs - - - - - - - - - - - - - - - - - - + - This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + net461;netstandard2.0 + portable + false + true + 1591 + Hangfire.Azure.ServiceBusQueue - - - - \ No newline at end of file + + full + + + + + + + + + + + + + diff --git a/src/HangFire.Azure.ServiceBusQueue/IRetryPolicy.cs b/src/HangFire.Azure.ServiceBusQueue/IRetryPolicy.cs index 817e6b9..b505639 100644 --- a/src/HangFire.Azure.ServiceBusQueue/IRetryPolicy.cs +++ b/src/HangFire.Azure.ServiceBusQueue/IRetryPolicy.cs @@ -1,9 +1,10 @@ using System; +using System.Threading.Tasks; namespace Hangfire.Azure.ServiceBusQueue { public interface IRetryPolicy { - void Execute(Action action); + Task Execute(Func action); } } \ No newline at end of file diff --git a/src/HangFire.Azure.ServiceBusQueue/LinearRetryPolicy.cs b/src/HangFire.Azure.ServiceBusQueue/LinearRetryPolicy.cs index f0e45fe..283d3d9 100644 --- a/src/HangFire.Azure.ServiceBusQueue/LinearRetryPolicy.cs +++ b/src/HangFire.Azure.ServiceBusQueue/LinearRetryPolicy.cs @@ -1,40 +1,63 @@ using System; -using System.Threading; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; namespace Hangfire.Azure.ServiceBusQueue { public class LinearRetryPolicy : IRetryPolicy { + private readonly List _retryDelays = new List(); + public LinearRetryPolicy(int retryCount, TimeSpan retryDelay) { - this.RetryDelay = retryDelay; - this.RetryCount = retryCount; + if (retryCount <= 0) + throw new ArgumentOutOfRangeException(nameof(retryCount)); + + if (retryDelay <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(retryCount)); + + for (var i = 0; i < retryCount; i++) + { + _retryDelays.Add(retryDelay); + } } - public TimeSpan RetryDelay { get; private set; } + public LinearRetryPolicy(IEnumerable retryDelay) + { + if (retryDelay == null) + throw new ArgumentNullException(nameof(retryDelay)); - public int RetryCount { get; private set; } + var timeSpans = retryDelay.ToList(); + if (!timeSpans.Any() || timeSpans.Any(x=>x <= TimeSpan.Zero)) + throw new ArgumentOutOfRangeException(nameof(retryDelay)); - public void Execute(Action action) + _retryDelays = timeSpans.ToList(); + } + + public async Task Execute(Func action) { - for (var i = 0; i < this.RetryCount; i++) + var exceptions = new List(); + + var attempt = 0; + foreach (var retryDelay in _retryDelays) { + attempt++; try { - action(); + if (attempt > 1) + await Task.Delay(retryDelay).ConfigureAwait(false); + + await action().ConfigureAwait(false); return; } - catch (TimeoutException) + catch (Exception ex) { - if (i == this.RetryCount - 1) - { - throw; - } - - Thread.Sleep(this.RetryDelay); + exceptions.Add(ex); } } + throw new AggregateException(exceptions); } } -} \ No newline at end of file +} diff --git a/src/HangFire.Azure.ServiceBusQueue/Properties/AssemblyInfo.cs b/src/HangFire.Azure.ServiceBusQueue/Properties/AssemblyInfo.cs index 9c38ae8..f044166 100644 --- a/src/HangFire.Azure.ServiceBusQueue/Properties/AssemblyInfo.cs +++ b/src/HangFire.Azure.ServiceBusQueue/Properties/AssemblyInfo.cs @@ -1,9 +1,5 @@ -using System.Reflection; -using System.Runtime.CompilerServices; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -[assembly: AssemblyTitle("Hangfire.Azure.ServiceBus")] -[assembly: AssemblyDescription("ServiceBus Queue support for SQL Server job storage implementation")] [assembly: Guid("90fafb33-186a-47f9-84d8-fd516496a697")] - -[assembly: InternalsVisibleTo("HangFire.Azure.ServiceBusQueue.Tests")] \ No newline at end of file +[assembly: InternalsVisibleTo("HangFire.Azure.ServiceBusQueue.Tests")] diff --git a/src/HangFire.Azure.ServiceBusQueue/QueueDescription.cs b/src/HangFire.Azure.ServiceBusQueue/QueueDescription.cs new file mode 100644 index 0000000..c2dc6e8 --- /dev/null +++ b/src/HangFire.Azure.ServiceBusQueue/QueueDescription.cs @@ -0,0 +1,35 @@ +using System; +using Azure.Messaging.ServiceBus.Administration; + +namespace Hangfire.Azure.ServiceBusQueue +{ + /// + /// + /// + public class QueueDescription : CreateQueueOptions + { + public QueueDescription(string name) : base(name) + { + } + + public QueueDescription(QueueProperties queue) : base(queue) + { + } + + public string Path + { + get => Name; + set => Name = value; + } + + public bool EnableDeadLetteringOnMessageExpiration + { + get => DeadLetteringOnMessageExpiration; + set => DeadLetteringOnMessageExpiration = value; + } + + [Obsolete("This property is no longer in use in the latest azure SDK and has no effect on the queue creation")] + public bool EnableExpress { get; set; } + + } +} diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusGlobalConfigurationExtensions.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusGlobalConfigurationExtensions.cs new file mode 100644 index 0000000..a42f434 --- /dev/null +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusGlobalConfigurationExtensions.cs @@ -0,0 +1,57 @@ +using System; +using Hangfire.Annotations; +using Hangfire.SqlServer; +using Hangfire.States; + +namespace Hangfire.Azure.ServiceBusQueue +{ + public static class ServiceBusGlobalConfigurationExtensions + { + public static IGlobalConfiguration UseServiceBusQueues( + [NotNull] this IGlobalConfiguration configuration, + [NotNull] string connectionString) + { + return UseServiceBusQueues(configuration, new ServiceBusQueueOptions + { + ConnectionString = connectionString, + Queues = new[] { EnqueuedState.DefaultQueue } + }); + } + + public static IGlobalConfiguration UseServiceBusQueues( + [NotNull] this IGlobalConfiguration configuration, + [NotNull] string connectionString, + params string[] queues) + { + return UseServiceBusQueues(configuration, new ServiceBusQueueOptions + { + ConnectionString = connectionString, + Queues = queues + }); + } + + public static IGlobalConfiguration UseServiceBusQueues( + [NotNull] this IGlobalConfiguration configuration, + [NotNull] string connectionString, + Action configureAction, + params string[] queues) + { + return UseServiceBusQueues(configuration, new ServiceBusQueueOptions + { + ConnectionString = connectionString, + Configure = configureAction, + Queues = queues + }); + } + + public static IGlobalConfiguration UseServiceBusQueues( + [NotNull] this IGlobalConfiguration configuration, + [NotNull] ServiceBusQueueOptions options) + { + var sqlServerStorage = configuration.Entry; + var provider = new ServiceBusQueueJobQueueProvider(options); + sqlServerStorage.QueueProviders.Add(provider, options.Queues); + return configuration; + } + } +} diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusManager.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusManager.cs index bc67f7b..87a4c30 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusManager.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusManager.cs @@ -1,111 +1,129 @@ using System; using System.Collections.Generic; -using Microsoft.ServiceBus; -using Microsoft.ServiceBus.Messaging; using Hangfire.Logging; +using System.Threading.Tasks; +using Azure; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; namespace Hangfire.Azure.ServiceBusQueue { internal class ServiceBusManager { - private static readonly ILog Logger = LogProvider.GetCurrentClassLogger(); + private readonly ILog _logger = LogProvider.GetCurrentClassLogger(); // Stores the pre-created QueueClients (note the key is the unprefixed queue name) - private readonly Dictionary _clients; + private readonly Dictionary _senders; + private readonly Dictionary _receivers; - private readonly ServiceBusQueueOptions _options; - private readonly NamespaceManager _namespaceManager; - private readonly MessagingFactory _messagingFactory; + private readonly ServiceBusAdministrationClient _managementAdminClient; + private readonly ServiceBusClient _managementClient; public ServiceBusManager(ServiceBusQueueOptions options) { - if (options == null) throw new ArgumentNullException("options"); + Options = options ?? throw new ArgumentNullException(nameof(options)); - _options = options; + _senders = new Dictionary(options.Queues.Length); + _receivers = new Dictionary(options.Queues.Length); + _managementClient = new ServiceBusClient(options.ConnectionString); + _managementAdminClient = new ServiceBusAdministrationClient(options.ConnectionString); - _clients = new Dictionary(options.Queues.Length); - _namespaceManager = NamespaceManager.CreateFromConnectionString(options.ConnectionString); - _messagingFactory = MessagingFactory.CreateFromConnectionString(options.ConnectionString); - - // If we have this option set to true then we will create all clients up-front, otherwise - // the creation will be delayed until the first client is retrieved if (options.CheckAndCreateQueues) { - CreateQueueClients(); + AsyncHelper.RunSync(CreateQueueClients); } } - public ServiceBusQueueOptions Options { get { return _options; } } + ~ServiceBusManager() + { + foreach (var sender in _senders) + { + AsyncHelper.RunSync(() => sender.Value.CloseAsync()); + } + foreach (var receiver in _receivers) + { + AsyncHelper.RunSync(() => receiver.Value.CloseAsync()); + } + _managementClient.DisposeAsync(); + } + + public ServiceBusQueueOptions Options { get; } + + public async Task GetSenderAsync(string queue) + { + if (_senders.Count != Options.Queues.Length) + { + await CreateQueueClients().ConfigureAwait(false); + } + return _senders[queue]; + } - public QueueClient GetClient(string queue) + public async Task GetReceiverAsync(string queue) { - if (_clients.Count != _options.Queues.Length) + if (_receivers.Count != Options.Queues.Length) { - CreateQueueClients(); + await CreateQueueClients().ConfigureAwait(false); } - return _clients[queue]; + return _receivers[queue]; } - public QueueDescription GetDescription(string queue) + public async Task> GetQueueRuntimeInfoAsync(string queue) { - return _namespaceManager.GetQueue(_options.GetQueueName(queue)); + return await _managementAdminClient.GetQueueRuntimePropertiesAsync(Options.GetQueueName(queue)); } - private void CreateQueueClients() + private async Task CreateQueueClients() { - foreach (var queue in _options.Queues) + foreach (var queue in Options.Queues) { - var prefixedQueue = _options.GetQueueName(queue); + var prefixedQueue = Options.GetQueueName(queue); + + await CreateQueueIfNotExistsAsync(prefixedQueue).ConfigureAwait(false); - CreateQueueIfNotExists(prefixedQueue, _namespaceManager, _options); + _logger.TraceFormat("Creating new Senders and Receivers for queue {0}", prefixedQueue); - Logger.TraceFormat("Creating new QueueClient for queue {0}", prefixedQueue); + if (!_senders.ContainsKey(queue)) + _senders.Add(queue, _managementClient.CreateSender(prefixedQueue)); - // Do not store as prefixed queue to avoid having to re-create name in GetClient method - _clients[queue] = this._messagingFactory.CreateQueueClient(prefixedQueue, ReceiveMode.PeekLock); + if (!_receivers.ContainsKey(queue)) + _receivers.Add(queue, _managementClient.CreateReceiver(prefixedQueue)); } } - private static void CreateQueueIfNotExists(string prefixedQueue, NamespaceManager namespaceManager, ServiceBusQueueOptions options) + private async Task CreateQueueIfNotExistsAsync(string prefixedQueue) { - if (options.CheckAndCreateQueues == false) + if (Options.CheckAndCreateQueues == false) { - Logger.InfoFormat("Not checking for the existence of the queue {0}", prefixedQueue); - + _logger.InfoFormat("Not checking for the existence of the queue {0}", prefixedQueue); return; } - try { - Logger.InfoFormat("Checking if queue {0} exists", prefixedQueue); + _logger.InfoFormat("Checking if queue {0} exists", prefixedQueue); - if (namespaceManager.QueueExists(prefixedQueue)) + if (await _managementAdminClient.QueueExistsAsync(prefixedQueue).ConfigureAwait(false)) { return; } - Logger.InfoFormat("Creating new queue {0}", prefixedQueue); + _logger.InfoFormat("Creating new queue {0}", prefixedQueue); - var description = new QueueDescription(prefixedQueue); - if (options.RequiresDuplicateDetection != null) + var queueOptions = new QueueDescription(prefixedQueue); + if (Options.RequiresDuplicateDetection != null) { - description.RequiresDuplicateDetection = options.RequiresDuplicateDetection.Value; + queueOptions.RequiresDuplicateDetection = Options.RequiresDuplicateDetection.Value; } - if (options.Configure != null) - { - options.Configure(description); - } + Options.Configure?.Invoke(queueOptions); - namespaceManager.CreateQueue(description); + await _managementAdminClient.CreateQueueAsync(queueOptions).ConfigureAwait(false); } catch (UnauthorizedAccessException ex) { - var errorMessage = string.Format( - "Queue '{0}' could not be checked / created, likely due to missing the 'Manage' permission. " + - "You must either grant the 'Manage' permission, or set ServiceBusQueueOptions.CheckAndCreateQueues to false", - prefixedQueue); + var errorMessage = + $"Queue '{prefixedQueue}' could not be checked / created, likely due to missing the 'Manage' permission. " + + "You must either grant the 'Manage' permission, or set ServiceBusQueueOptions.CheckAndCreateQueues to false"; throw new UnauthorizedAccessException(errorMessage, ex); } diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueFetchedJob.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueFetchedJob.cs index b666cf7..85f286a 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueFetchedJob.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueFetchedJob.cs @@ -1,66 +1,91 @@ using System; -using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; using Hangfire.Logging; using Hangfire.Storage; -using Microsoft.ServiceBus.Messaging; namespace Hangfire.Azure.ServiceBusQueue { internal class ServiceBusQueueFetchedJob : IFetchedJob { - private readonly ILog _logger = LogProvider.GetLogger(typeof(ServiceBusQueueFetchedJob)); - private readonly BrokeredMessage _message; + private readonly ILog _logger = LogProvider.GetCurrentClassLogger(); + private readonly ServiceBusReceiver _client; private readonly TimeSpan? _lockRenewalDelay; private readonly CancellationTokenSource _cancellationTokenSource; + private readonly ServiceBusReceivedMessage _message; private bool _completed; private bool _disposed; - public ServiceBusQueueFetchedJob(BrokeredMessage message, TimeSpan? lockRenewalDelay) + public ServiceBusQueueFetchedJob(ServiceBusReceiver client, ServiceBusReceivedMessage message, TimeSpan? lockRenewalDelay) { - if (message == null) throw new ArgumentNullException("message"); - - _message = message; + _message = message ?? throw new ArgumentNullException(nameof(message)); + _client = client; _lockRenewalDelay = lockRenewalDelay; _cancellationTokenSource = new CancellationTokenSource(); - JobId = _message.GetBody(); + JobId = message.Body.ToString(); KeepAlive(); } - public string JobId { get; private set; } - - public BrokeredMessage Message { get { return this._message; } } + public string JobId { get; } + public ServiceBusReceivedMessage Message => _message; public void Requeue() { _cancellationTokenSource.Cancel(); - _message.Abandon(); - _completed = true; + try + { + AsyncHelper.RunSync(() => _client.AbandonMessageAsync(_message)); + _completed = true; + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound) + { + _logger.Warn($"Message with token '{_message.LockToken}' not found in service bus."); + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageLockLost) + { + } } public void RemoveFromQueue() { _cancellationTokenSource.Cancel(); - _message.Complete(); - _completed = true; + try + { + AsyncHelper.RunSync(() => _client.CompleteMessageAsync(_message)); + _completed = true; + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound) + { + _logger.Warn($"Message with token '{_message.LockToken}' not found in service bus."); + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageLockLost) + { + } } public void Dispose() { _cancellationTokenSource.Cancel(); + _cancellationTokenSource.Dispose(); - if (!_completed && !_disposed) + try + { + if (!_completed && !_disposed) + { + AsyncHelper.RunSync(() => _client.AbandonMessageAsync(_message)); + } + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound + || ex.Reason == ServiceBusFailureReason.MessageLockLost) { - _message.Abandon(); } - _message.Dispose(); _disposed = true; } @@ -76,26 +101,28 @@ private void KeepAlive() // However since clocks may be non-synchronized well, for long-running // background jobs it's better to have more renewal attempts than a // lock that's expired too early. - var toWait = _lockRenewalDelay.HasValue - ? _lockRenewalDelay.Value - : _message.LockedUntilUtc - DateTime.UtcNow - TimeSpan.FromSeconds(1); - - await Task.Delay(toWait, _cancellationTokenSource.Token); + var toWait = _lockRenewalDelay ?? + _message.LockedUntil - DateTime.UtcNow - TimeSpan.FromSeconds(1); + + await Task.Delay(toWait, _cancellationTokenSource.Token).ConfigureAwait(false); // Double check we have not been cancelled to avoid renewing a lock // unnecessarily - if (!_cancellationTokenSource.Token.IsCancellationRequested) + if (_cancellationTokenSource.Token.IsCancellationRequested) continue; + + try + { + await _client.RenewMessageLockAsync(_message).ConfigureAwait(false); + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound + || ex.Reason == ServiceBusFailureReason.MessageLockLost) + { + break; + } + catch (Exception ex) { - try - { - _message.RenewLock(); - } - catch (Exception ex) - { - _logger.DebugException( - String.Format("An exception was thrown while trying to renew a lock for job '{0}'.", JobId), - ex); - } + _logger.DebugException( + $"An exception was thrown while trying to renew a lock for job '{JobId}'.", ex); } } }, _cancellationTokenSource.Token); diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueue.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueue.cs index c3ad49b..1d49596 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueue.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueue.cs @@ -4,82 +4,102 @@ using System.Transactions; using Hangfire.SqlServer; using Hangfire.Storage; -using Microsoft.ServiceBus.Messaging; -using System.Data; +using System.Threading.Tasks; +using System.Data.Common; +using Azure.Messaging.ServiceBus; +using Hangfire.Logging; namespace Hangfire.Azure.ServiceBusQueue { internal class ServiceBusQueueJobQueue : IPersistentJobQueue { - private static readonly TimeSpan MinSyncReceiveTimeout = TimeSpan.FromTicks(1); - private readonly ServiceBusManager _manager; private readonly ServiceBusQueueOptions _options; + private readonly ILog _logger = LogProvider.GetCurrentClassLogger(); public ServiceBusQueueJobQueue(ServiceBusManager manager, ServiceBusQueueOptions options) { - if (manager == null) throw new ArgumentNullException("manager"); - if (options == null) throw new ArgumentNullException("options"); - - _manager = manager; - _options = options; + _manager = manager ?? throw new ArgumentNullException(nameof(manager)); + _options = options ?? throw new ArgumentNullException(nameof(options)); } public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken) { - BrokeredMessage message = null; - var queueIndex = 0; + if (queues == null) + throw new ArgumentNullException(nameof(queues)); - var clients = queues - .Select(queue => _manager.GetClient(queue)) - .ToArray(); + if (queues.Length == 0) + throw new ArgumentException("Queue array must not be empty.", nameof(queues)); - do + return Task.Run(async () => { - cancellationToken.ThrowIfCancellationRequested(); + var queueIndex = 0; + var receivers = await Task.WhenAll(queues.Select(queue => _manager.GetReceiverAsync(queue))).ConfigureAwait(false); - try + do { - var client = clients[queueIndex]; - var isLastQueue = queueIndex == queues.Length - 1; + cancellationToken.ThrowIfCancellationRequested(); + var busReceiver = receivers[queueIndex]; - message = isLastQueue - ? client.Receive(_manager.Options.LoopReceiveTimeout) // Last queue - : client.Receive(MinSyncReceiveTimeout); - } - catch (TimeoutException) - { - } - catch (MessagingEntityNotFoundException ex) - { - var errorMessage = string.Format( - "Queue {0} could not be found. Either create the queue manually, " + - "or grant the Manage permission and set ServiceBusQueueOptions.CheckAndCreateQueues to true", - clients[queueIndex].Path); + try + { + var message = await busReceiver.ReceiveMessageAsync(_manager.Options.LoopReceiveTimeout, cancellationToken) + .ConfigureAwait(false); + + if (message != null) + { + _logger.Info( + $"{DateTime.Now} - Dequeue one message from queue {busReceiver.EntityPath} with body {message.Body}"); + return new ServiceBusQueueFetchedJob(busReceiver, message, _options.LockRenewalDelay); + } + } + catch (TaskCanceledException) + { + _logger.Warn($"a cancellation has been requested for the queue {busReceiver.EntityPath}"); + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout) + { + _logger.Warn("Servicebus timeout dequeuing one message from queue {busReceiver.EntityPath}"); + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound) + { + var errorMessage = + $"Queue {busReceiver.EntityPath} could not be found. Either create the queue manually, " + + "or grant the Manage permission and set ServiceBusQueueOptions.CheckAndCreateQueues to true"; + + throw new UnauthorizedAccessException(errorMessage, ex); + } - throw new UnauthorizedAccessException(errorMessage, ex); - } + queueIndex = (queueIndex + 1) % queues.Length; - queueIndex = (queueIndex + 1) % queues.Length; - } while (message == null); + await Task.Delay(_options.QueuePollInterval).ConfigureAwait(false); - return new ServiceBusQueueFetchedJob(message, _options.LockRenewalDelay); + } while (true); + }).GetAwaiter().GetResult(); } - public void Enqueue(IDbConnection connection, string queue, string jobId) +#if NETSTANDARD2_0 + public void Enqueue(DbConnection connection, DbTransaction transaction, string queue, string jobId) +#else + public void Enqueue(System.Data.IDbConnection connection, string queue, string jobId) +#endif { // Because we are within a TransactionScope at this point the below // call would not work (Local transactions are not supported with other resource managers/DTC // exception is thrown) without suppression using (new TransactionScope(TransactionScopeOption.Suppress)) { - var client = _manager.GetClient(queue); - - using (var message = new BrokeredMessage(jobId) { MessageId = jobId }) - { - _manager.Options.RetryPolicy.Execute(() => client.Send(message)); - } + AsyncHelper.RunSync(() => DoEnqueueAsync(queue, jobId)); } } + + private async Task DoEnqueueAsync(string queue, string jobId) + { + var sender = await _manager.GetSenderAsync(queue).ConfigureAwait(false); + + var message = new ServiceBusMessage(jobId) { MessageId = jobId }; + await _manager.Options.RetryPolicy.Execute(() => sender.SendMessageAsync(message)).ConfigureAwait(false); + _logger.Info($"{DateTime.Now} - Enqueue one message to queue {sender.EntityPath} with body {jobId}"); + } } -} \ No newline at end of file +} diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueueProvider.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueueProvider.cs index 6e370b4..393c5a4 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueueProvider.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueJobQueueProvider.cs @@ -14,7 +14,7 @@ internal class ServiceBusQueueJobQueueProvider : IPersistentJobQueueProvider public ServiceBusQueueJobQueueProvider(ServiceBusQueueOptions options) { - if (options == null) throw new ArgumentNullException("options"); + if (options == null) throw new ArgumentNullException(nameof(options)); options.Validate(); @@ -25,7 +25,7 @@ public ServiceBusQueueJobQueueProvider(ServiceBusQueueOptions options) var manager = new ServiceBusManager(options); - _jobQueue = new ServiceBusQueueJobQueue(manager, options); + _jobQueue = new ServiceBusQueueJobQueue(manager, options); _monitoringApi = new ServiceBusQueueMonitoringApi(manager, options.Queues); } diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueMonitoringApi.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueMonitoringApi.cs index b6e9aa4..6377328 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueMonitoringApi.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueMonitoringApi.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Hangfire.SqlServer; namespace Hangfire.Azure.ServiceBusQueue @@ -12,11 +13,8 @@ internal class ServiceBusQueueMonitoringApi : IPersistentJobQueueMonitoringApi public ServiceBusQueueMonitoringApi(ServiceBusManager manager, string[] queues) { - if (manager == null) throw new ArgumentNullException("manager"); - if (queues == null) throw new ArgumentNullException("queues"); - - _manager = manager; - _queues = queues; + _manager = manager ?? throw new ArgumentNullException(nameof(manager)); + _queues = queues ?? throw new ArgumentNullException(nameof(queues)); } public IEnumerable GetQueues() @@ -24,47 +22,49 @@ public IEnumerable GetQueues() return _queues; } - public IEnumerable GetEnqueuedJobIds(string queue, int @from, int perPage) + public IEnumerable GetEnqueuedJobIds(string queue, int from, int perPage) + { + return AsyncHelper.RunSync(() => GetEnqueuedJobIdsAsync(queue, from, perPage)); + } + + private async Task> GetEnqueuedJobIdsAsync(string queue, int from, int perPage) { - var client = _manager.GetClient(queue); + var receiver = await _manager.GetReceiverAsync(queue).ConfigureAwait(false); + var jobIds = new List(); - // We have to overfetch to retrieve enough messages for paging. - // e.g. @from = 10 and page size = 20 we need 30 messages from the start - var messages = client.PeekBatch(0, @from + perPage).ToArray(); - - // We could use LINQ here but to avoid creating lots of garbage lists - // through .Skip / .ToList etc. use a simple loop. - for (var i = 0; i < messages.Length; i++) - { - var msg = messages[i]; + // Hangfire api require a 0 based index for @from, but PeekMessageAsync is 1 based + var messages = await receiver.PeekMessagesAsync(perPage, from + 1).ConfigureAwait(false); - // Only include the job id once we have skipped past the @from - // number - if (i >= @from) + foreach (var msg in messages) + { + if (long.TryParse(msg.Body.ToString(), out var longJobId)) { - jobIds.Add(long.Parse(msg.GetBody())); + jobIds.Add(longJobId); } - - msg.Dispose(); } return jobIds; } - public IEnumerable GetFetchedJobIds(string queue, int @from, int perPage) + public IEnumerable GetFetchedJobIds(string queue, int from, int perPage) { return Enumerable.Empty(); } public EnqueuedAndFetchedCountDto GetEnqueuedAndFetchedCount(string queue) { - var queueDescriptor = _manager.GetDescription(queue); + return AsyncHelper.RunSync(() => GetEnqueuedAndFetchedCountAsync(queue)); + } + + private async Task GetEnqueuedAndFetchedCountAsync(string queue) + { + var queueRuntimeInfo = await _manager.GetQueueRuntimeInfoAsync(queue).ConfigureAwait(false); return new EnqueuedAndFetchedCountDto { - EnqueuedCount = (int) queueDescriptor.MessageCountDetails.ActiveMessageCount, - FetchedCount = null + EnqueuedCount = (int)queueRuntimeInfo.Value.ActiveMessageCount, + FetchedCount = null }; } } diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueOptions.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueOptions.cs index 439f8dc..d9eeee6 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueOptions.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueOptions.cs @@ -1,5 +1,4 @@ using System; -using Microsoft.ServiceBus.Messaging; namespace Hangfire.Azure.ServiceBusQueue { @@ -7,9 +6,22 @@ public class ServiceBusQueueOptions { public ServiceBusQueueOptions() { - this.CheckAndCreateQueues = true; - this.LoopReceiveTimeout = TimeSpan.FromMilliseconds(500); - this.RetryPolicy = new LinearRetryPolicy(3, TimeSpan.FromSeconds(1)); + QueuePollInterval = TimeSpan.Zero; + CheckAndCreateQueues = true; + LoopReceiveTimeout = TimeSpan.FromMilliseconds(500); + RetryPolicy = new LinearRetryPolicy(3, TimeSpan.FromSeconds(1)); + } + + private TimeSpan _queuePollInterval; + + public TimeSpan QueuePollInterval + { + get => _queuePollInterval; + set + { + var message = $"The QueuePollInterval property value should be positive. Given: {value}."; + _queuePollInterval = !(value != value.Duration()) ? value : throw new ArgumentException(message, nameof(value)); + } } /// @@ -35,7 +47,7 @@ public ServiceBusQueueOptions() /// requested. /// public bool CheckAndCreateQueues { get; set; } - + /// /// Gets or sets a delay between calls to the method /// to disallow workers to pick up the same background job several time while it's still @@ -97,7 +109,8 @@ internal void Validate() throw new InvalidOperationException("Must supply Queues to ServiceBusQueueOptions"); if (Queues.Length == 0) - throw new InvalidOperationException("Must supply at least one queue in Queues property of ServiceBusQueueOptions"); + throw new InvalidOperationException( + "Must supply at least one queue in Queues property of ServiceBusQueueOptions"); } } -} \ No newline at end of file +} diff --git a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueSqlServerStorageExtensions.cs b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueSqlServerStorageExtensions.cs index 73b2439..a55a4ea 100644 --- a/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueSqlServerStorageExtensions.cs +++ b/src/HangFire.Azure.ServiceBusQueue/ServiceBusQueueSqlServerStorageExtensions.cs @@ -1,7 +1,7 @@ using System; +using Azure.Messaging.ServiceBus.Administration; using Hangfire.SqlServer; using Hangfire.States; -using Microsoft.ServiceBus.Messaging; namespace Hangfire.Azure.ServiceBusQueue { @@ -14,7 +14,7 @@ public static SqlServerStorage UseServiceBusQueues( return UseServiceBusQueues(storage, new ServiceBusQueueOptions { ConnectionString = connectionString, - Queues = new[] { EnqueuedState.DefaultQueue } + Queues = new[] { EnqueuedState.DefaultQueue } }); } @@ -26,21 +26,21 @@ public static SqlServerStorage UseServiceBusQueues( return UseServiceBusQueues(storage, new ServiceBusQueueOptions { ConnectionString = connectionString, - Queues = queues + Queues = queues }); } public static SqlServerStorage UseServiceBusQueues( this SqlServerStorage storage, string connectionString, - Action configureAction, + Action configureAction, params string[] queues) { return UseServiceBusQueues(storage, new ServiceBusQueueOptions { ConnectionString = connectionString, - Configure = configureAction, - Queues = queues + Configure = configureAction, + Queues = queues }); } @@ -48,8 +48,8 @@ public static SqlServerStorage UseServiceBusQueues( this SqlServerStorage storage, ServiceBusQueueOptions options) { - if (storage == null) throw new ArgumentNullException("storage"); - if (options == null) throw new ArgumentNullException("options"); + if (storage == null) throw new ArgumentNullException(nameof(storage)); + if (options == null) throw new ArgumentNullException(nameof(options)); var provider = new ServiceBusQueueJobQueueProvider(options); diff --git a/src/HangFire.Azure.ServiceBusQueue/packages.config b/src/HangFire.Azure.ServiceBusQueue/packages.config deleted file mode 100644 index bbbaf0d..0000000 --- a/src/HangFire.Azure.ServiceBusQueue/packages.config +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/src/SharedAssemblyInfo.cs b/src/SharedAssemblyInfo.cs index 9eba81a..9e32d73 100644 --- a/src/SharedAssemblyInfo.cs +++ b/src/SharedAssemblyInfo.cs @@ -10,4 +10,4 @@ [assembly: CLSCompliant(false)] // Don't edit manually! Use `build.bat version` command instead! -[assembly: AssemblyVersion("4.1.0")] +[assembly: AssemblyVersion("5.0.0")] diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/HangFire.Azure.ServiceBusQueue.Tests.csproj b/tests/HangFire.Azure.ServiceBusQueue.Tests/HangFire.Azure.ServiceBusQueue.Tests.csproj index ed7eb69..0f7740d 100644 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/HangFire.Azure.ServiceBusQueue.Tests.csproj +++ b/tests/HangFire.Azure.ServiceBusQueue.Tests/HangFire.Azure.ServiceBusQueue.Tests.csproj @@ -1,83 +1,25 @@ - - - + - Debug - AnyCPU - {C9D29F07-4445-4FCB-BC38-2221B2B38231} - Library - Properties - HangFire.Azure.ServiceBusQueue.Tests - HangFire.Azure.ServiceBusQueue.Tests - v4.5.2 - 512 + net5.0 + Hangfire.Azure.ServiceBusQueue.Tests + Hangfire.Azure.ServiceBusQueue.Tests + Copyright © Hangfire.Azure.ServiceBusQueue authors 2017 - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - - - - ..\..\packages\Hangfire.Core.1.7.0\lib\net45\Hangfire.Core.dll - - - ..\..\packages\Hangfire.SqlServer.1.7.0\lib\net45\Hangfire.SqlServer.dll - - - ..\..\packages\WindowsAzure.ServiceBus.2.7.5\lib\net40-full\Microsoft.ServiceBus.dll - - - ..\..\packages\Microsoft.WindowsAzure.ConfigurationManager.3.1.0\lib\net40\Microsoft.WindowsAzure.Configuration.dll - - - ..\..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll - - - ..\..\packages\NUnit.3.6.1\lib\net45\nunit.framework.dll - - - ..\..\packages\Owin.1.0\lib\net40\Owin.dll - - - - - - - - - - - - - - - - - - + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + - - {4cc51f69-0311-4485-b7de-9ecab3a1b5e5} - Hangfire.Azure.ServiceBusQueue - + - + + Always + - \ No newline at end of file diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/Properties/AssemblyInfo.cs b/tests/HangFire.Azure.ServiceBusQueue.Tests/Properties/AssemblyInfo.cs deleted file mode 100644 index 137e49f..0000000 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -[assembly: AssemblyTitle("HangFire.Azure.ServiceBusQueue.Tests")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("HangFire.Azure.ServiceBusQueue.Tests")] -[assembly: AssemblyCopyright("Copyright © 2017")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to false makes the types in this assembly not visible -// to COM components. If you need to access a type in this assembly from -// COM, set the ComVisible attribute to true on that type. -[assembly: ComVisible(false)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM -[assembly: Guid("c9d29f07-4445-4fcb-bc38-2221b2b38231")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the values or you can default the Build and Revision Numbers -// by using the '*' as shown below: -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueJobQueueFacts.cs b/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueJobQueueFacts.cs index 78fe79a..886c794 100644 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueJobQueueFacts.cs +++ b/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueJobQueueFacts.cs @@ -1,9 +1,11 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; using Hangfire.Azure.ServiceBusQueue; using Hangfire.SqlServer; -using Microsoft.ServiceBus; +using Hangfire.Storage; using NUnit.Framework; namespace HangFire.Azure.ServiceBusQueue.Tests @@ -23,13 +25,13 @@ public void SetUpQueue() } [TearDown] - public void DeleteQueues() + public async Task DeleteQueues() { - var manager = NamespaceManager.CreateFromConnectionString(options.ConnectionString); + var manager = new ServiceBusAdministrationClient(options.ConnectionString); foreach (var queueName in options.Queues) { - manager.DeleteQueue(options.QueuePrefix + queueName); + await manager.DeleteQueueAsync(options.QueuePrefix + queueName); } } @@ -60,7 +62,7 @@ public void Dequeue_ShouldWaitIndefinitely_WhenThereAreNoJobs() public void Dequeue_ShouldFetchJobs_OnlyFromSpecifiedQueues() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "1"); // Act / Assert Assert.Throws( @@ -73,27 +75,54 @@ public void Dequeue_ShouldFetchJobs_OnlyFromSpecifiedQueues() public void Dequeue_ShouldFetchJobs_FromMultipleQueues() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); - queue.Enqueue(null, options.Queues[1], "2"); + queue.Enqueue(null, null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[1], "2"); // Act / Assert var job1 = queue.Dequeue( - options.Queues, - CreateTimingOutCancellationToken()); + options.Queues, + CreateTimingOutCancellationToken()); var job2 = queue.Dequeue( - options.Queues, - CreateTimingOutCancellationToken()); + options.Queues, + CreateTimingOutCancellationToken()); Assert.That(job1.JobId, Is.EqualTo("1")); Assert.That(job2.JobId, Is.EqualTo("2")); } + [Test] + public void Dequeue_ShouldFetchJobs_In_Multithread() + { + // Arrange + queue.Enqueue(null, null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "2"); + queue.Enqueue(null, null, options.Queues[0], "3"); + queue.Enqueue(null, null, options.Queues[0], "4"); + + var task1 = Task.Run(() => queue.Dequeue(options.Queues, CreateTimingOutCancellationToken())); + var task2 = Task.Run(() => queue.Dequeue(options.Queues, CreateTimingOutCancellationToken())); + var task3 = Task.Run(() => queue.Dequeue(options.Queues, CreateTimingOutCancellationToken())); + var task4 = Task.Run(() => queue.Dequeue(options.Queues, CreateTimingOutCancellationToken())); + + IFetchedJob[] job = Task.WhenAll(task1, task2, task3, task4).GetAwaiter().GetResult(); + + Assert.That(job[0].JobId, Is.AnyOf("1", "2", "3", "4")); + Assert.That(job[1].JobId, Is.AnyOf("1", "2", "3", "4")); + Assert.That(job[2].JobId, Is.AnyOf("1", "2", "3", "4")); + Assert.That(job[3].JobId, Is.AnyOf("1", "2", "3", "4")); + + Assert.That(job[0].JobId, !Is.AnyOf(job[1].JobId, job[2].JobId, job[3].JobId)); + Assert.That(job[1].JobId, !Is.AnyOf(job[0].JobId, job[2].JobId, job[3].JobId)); + Assert.That(job[2].JobId, !Is.AnyOf(job[0].JobId, job[1].JobId, job[3].JobId)); + Assert.That(job[3].JobId, !Is.AnyOf(job[0].JobId, job[1].JobId, job[2].JobId)); + } + [Test] public void Dequeue_ShouldFetchAJob_FromTheSpecifiedQueue() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "1"); // Act / Assert var job = queue.Dequeue(options.Queues, CreateTimingOutCancellationToken()); @@ -105,7 +134,7 @@ public void Dequeue_ShouldFetchAJob_FromTheSpecifiedQueue() public void Dequeue_Complete_Should_RemoveFromQueue() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "1"); // Act var job = queue.Dequeue(options.Queues, CreateTimingOutCancellationToken()); @@ -121,7 +150,7 @@ public void Dequeue_Complete_Should_RemoveFromQueue() public void Dequeue_Requeue_ShouldAddBackToQueue() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "1"); // Act var job = queue.Dequeue(options.Queues, CreateTimingOutCancellationToken()); @@ -145,7 +174,7 @@ public void Dequeue_Complete_AfterLockTime_ShouldRemoveFromQueue(int lockTimeMs, // Arrange SetUpQueue(TimeSpan.FromMilliseconds(lockTimeMs)); - queue.Enqueue(null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "1"); // Act var job = queue.Dequeue(options.Queues, CreateTimingOutCancellationToken()); @@ -174,8 +203,8 @@ private void SetUpQueue(TimeSpan lockDuration) provider = new ServiceBusQueueJobQueueProvider(options); - queue = provider.GetJobQueue(); + queue = provider.GetJobQueue(); monitor = provider.GetJobQueueMonitoringApi(); } } -} \ No newline at end of file +} diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueMonitoringApiFacts.cs b/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueMonitoringApiFacts.cs index bd26f14..635ab2e 100644 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueMonitoringApiFacts.cs +++ b/tests/HangFire.Azure.ServiceBusQueue.Tests/ServiceBusQueueMonitoringApiFacts.cs @@ -1,7 +1,10 @@ -using System.Threading; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; using Hangfire.Azure.ServiceBusQueue; using Hangfire.SqlServer; -using Microsoft.ServiceBus; using NUnit.Framework; namespace HangFire.Azure.ServiceBusQueue.Tests @@ -17,21 +20,20 @@ public class ServiceBusQueueMonitoringApiFacts [SetUp] public void SetUpQueue() { - options = new TestServiceBusQueueOptions(); + options = new TestServiceBusQueueOptions(); provider = new ServiceBusQueueJobQueueProvider(options); - - queue = provider.GetJobQueue(); - monitor = provider.GetJobQueueMonitoringApi(); + queue = provider.GetJobQueue(); + monitor = provider.GetJobQueueMonitoringApi(); } [TearDown] - public void DeleteQueues() + public async Task DeleteQueues() { - var manager = NamespaceManager.CreateFromConnectionString(options.ConnectionString); + var manager = new ServiceBusAdministrationClient(options.ConnectionString); - foreach(var queue in options.Queues) + foreach (var currentQueue in options.Queues) { - manager.DeleteQueue(options.QueuePrefix + queue); + await manager.DeleteQueueAsync(options.QueuePrefix + currentQueue); } } @@ -51,7 +53,7 @@ public void GetEnqueuedAndFetchedCount_WhenNoJobs_ShouldCountFromQueue() public void GetEnqueuedAndFetchedCount_WhenJobs_ShouldCountFromQueue() { // Arrange - queue.Enqueue(null, options.Queues[0], "1234"); + queue.Enqueue(null, null, options.Queues[0], "1234"); // Act var counts = monitor.GetEnqueuedAndFetchedCount(options.Queues[0]); @@ -62,13 +64,17 @@ public void GetEnqueuedAndFetchedCount_WhenJobs_ShouldCountFromQueue() } [Test] - public void GetEnqueuedAndFetchedCount_WhenDeadlettered_ShouldIgnore() + public async Task GetEnqueuedAndFetchedCount_WhenDeadlettered_ShouldIgnore() { // Arrange - queue.Enqueue(null, options.Queues[0], "1234"); - + queue.Enqueue(null, null, options.Queues[0], "1234"); var job = (ServiceBusQueueFetchedJob)queue.Dequeue(options.Queues, default(CancellationToken)); - job.Message.DeadLetter(); + + + var fieldInfo = job.GetType().GetTypeInfo().GetField("_client", BindingFlags.NonPublic | BindingFlags.Instance); + var client = fieldInfo?.GetValue(job) as ServiceBusReceiver; + + await client.DeadLetterMessageAsync(job.Message); // Act var counts = monitor.GetEnqueuedAndFetchedCount(options.Queues[0]); @@ -82,10 +88,10 @@ public void GetEnqueuedAndFetchedCount_WhenDeadlettered_ShouldIgnore() public void GetEnqueuedJobIds_WhenJobs_ShouldWorkPage1() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); - queue.Enqueue(null, options.Queues[0], "2"); - queue.Enqueue(null, options.Queues[0], "3"); - queue.Enqueue(null, options.Queues[0], "4"); + queue.Enqueue(null, null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "2"); + queue.Enqueue(null, null, options.Queues[0], "3"); + queue.Enqueue(null, null, options.Queues[0], "4"); // Act var counts = monitor.GetEnqueuedJobIds(options.Queues[0], 0, 5); @@ -98,10 +104,10 @@ public void GetEnqueuedJobIds_WhenJobs_ShouldWorkPage1() public void GetEnqueuedJobIds_WhenJobs_ShouldWorkPage2() { // Arrange - queue.Enqueue(null, options.Queues[0], "1"); - queue.Enqueue(null, options.Queues[0], "2"); - queue.Enqueue(null, options.Queues[0], "3"); - queue.Enqueue(null, options.Queues[0], "4"); + queue.Enqueue(null, null, options.Queues[0], "1"); + queue.Enqueue(null, null, options.Queues[0], "2"); + queue.Enqueue(null, null, options.Queues[0], "3"); + queue.Enqueue(null, null, options.Queues[0], "4"); // Act var counts1 = monitor.GetEnqueuedJobIds(options.Queues[0], 0, 2); @@ -116,9 +122,9 @@ public void GetEnqueuedJobIds_WhenJobs_ShouldWorkPage2() public void GetEnqueuedJobIds_WhenJobs_ShouldWorkWithLargeValues() { // Arrange - for(var i = 0; i < 100; i++) + for (var i = 0; i < 100; i++) { - queue.Enqueue(null, options.Queues[0], i.ToString()); + queue.Enqueue(null, null, options.Queues[0], i.ToString()); } // Act @@ -128,4 +134,4 @@ public void GetEnqueuedJobIds_WhenJobs_ShouldWorkWithLargeValues() Assert.That(counts, Is.EquivalentTo(new long[] { 58, 59 })); } } -} \ No newline at end of file +} diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/TestServiceBusQueueOptions.cs b/tests/HangFire.Azure.ServiceBusQueue.Tests/TestServiceBusQueueOptions.cs index 230c5d3..9f32026 100644 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/TestServiceBusQueueOptions.cs +++ b/tests/HangFire.Azure.ServiceBusQueue.Tests/TestServiceBusQueueOptions.cs @@ -1,5 +1,7 @@ -using System.Configuration; +using System; +using System.IO; using Hangfire.Azure.ServiceBusQueue; +using Microsoft.Extensions.Configuration; namespace HangFire.Azure.ServiceBusQueue.Tests { @@ -7,10 +9,15 @@ public class TestServiceBusQueueOptions : ServiceBusQueueOptions { public TestServiceBusQueueOptions() { + var configBuilder = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json").Build(); + CheckAndCreateQueues = true; - ConnectionString = ConfigurationManager.AppSettings["BusConnectionString"]; - QueuePrefix = "hf-sb-tests-"; - Queues = new[] {"test1", "test2", "test3"}; + ConnectionString = configBuilder["BusConnectionString"]; + QueuePrefix = "hf-sb-tests-"; + Queues = new[] { "test1", "test2", "test3" }; + QueuePollInterval = TimeSpan.Zero; } } -} \ No newline at end of file +} diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/app.config b/tests/HangFire.Azure.ServiceBusQueue.Tests/app.config deleted file mode 100644 index 41e13a0..0000000 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/app.config +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - - - \ No newline at end of file diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/appsettings.json b/tests/HangFire.Azure.ServiceBusQueue.Tests/appsettings.json new file mode 100644 index 0000000..d1b53b0 --- /dev/null +++ b/tests/HangFire.Azure.ServiceBusQueue.Tests/appsettings.json @@ -0,0 +1,3 @@ +{ + "BusConnectionString": "" +} diff --git a/tests/HangFire.Azure.ServiceBusQueue.Tests/packages.config b/tests/HangFire.Azure.ServiceBusQueue.Tests/packages.config deleted file mode 100644 index f9f54cb..0000000 --- a/tests/HangFire.Azure.ServiceBusQueue.Tests/packages.config +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - \ No newline at end of file