diff --git a/src/NServiceBus.Core.Tests/Persistence/InMemory/ClientIdStorageTests.cs b/src/NServiceBus.Core.Tests/Persistence/InMemory/ClientIdStorageTests.cs new file mode 100644 index 00000000000..33fa26ca4f4 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Persistence/InMemory/ClientIdStorageTests.cs @@ -0,0 +1,48 @@ +namespace NServiceBus.Persistence.InMemory.Tests +{ + using NUnit.Framework; + + [TestFixture] + class ClientIdStorageTests + { + [Test] + public void Should_detect_duplicates() + { + var clientIdStorage = new ClientIdStorage(10); + + clientIdStorage.RegisterClientId("A"); + clientIdStorage.RegisterClientId("B"); + + Assert.True(clientIdStorage.IsDuplicate("A")); + Assert.False(clientIdStorage.IsDuplicate("C")); + } + + [Test] + public void Should_evict_oldest_entry_when_LRU_reaches_limit() + { + var clientIdStorage = new ClientIdStorage(2); + + clientIdStorage.RegisterClientId("A"); + clientIdStorage.RegisterClientId("B"); + clientIdStorage.RegisterClientId("C"); + + Assert.False(clientIdStorage.IsDuplicate("A")); + } + + [Test] + public void Should_reset_time_added_for_existing_IDs_when_checked() + { + var clientIdStorage = new ClientIdStorage(2); + + clientIdStorage.RegisterClientId("A"); + clientIdStorage.RegisterClientId("B"); + + Assert.True(clientIdStorage.IsDuplicate("A")); + + clientIdStorage.RegisterClientId("C"); + + Assert.False(clientIdStorage.IsDuplicate("B")); + Assert.True(clientIdStorage.IsDuplicate("A")); + } + } +} diff --git a/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemoryGatewayDeduplicationTests.cs b/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemoryGatewayDeduplicationTests.cs index 13a4a9835c3..6860433d930 100644 --- a/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemoryGatewayDeduplicationTests.cs +++ b/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemoryGatewayDeduplicationTests.cs @@ -2,6 +2,7 @@ { using System; using System.Threading.Tasks; + using System.Transactions; using Extensibility; using NUnit.Framework; @@ -9,32 +10,51 @@ class InMemoryGatewayDeduplicationTests { [Test] - public async Task Should_return_true_on_first_unique_test() + public async Task Should_add_on_transaction_scope_commit() { - var storage = new InMemoryGatewayDeduplication(2); + var storage = CreateInMemoryGatewayDeduplication(); - await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()); - Assert.True(await storage.DeduplicateMessage("B", DateTime.UtcNow, new ContextBag())); - } + using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled)) + { + await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()); + + scope.Complete(); + } + + Assert.False(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag())); + } [Test] - public async Task Should_return_false_on_second_test() + public async Task Should_not_add_on_transaction_scope_abort() { - var storage = new InMemoryGatewayDeduplication(2); + var storage = CreateInMemoryGatewayDeduplication(); + + using (new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled)) + { + await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()); + + // no commit + } + + Assert.True(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag())); + } - await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()); - Assert.False(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag())); - } - [Test] - public async Task Should_return_true_if_LRU_reaches_limit() + // With the gateway persistence seam v1 design it's only safe to deduplicate when there is a tx scope present since the check happens before + // the messages have been pushed to the transport. If we add entries earlier they would be considered duplicate when retrying after something went wrong. + // Note: The gateway will always wrap the v1 seam invocation in a transaction scope + public async Task Should_only_deduplicate_when_scope_is_present() { - var storage = new InMemoryGatewayDeduplication(2); + var storage = CreateInMemoryGatewayDeduplication(); await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()); - await storage.DeduplicateMessage("B", DateTime.UtcNow, new ContextBag()); - await storage.DeduplicateMessage("C", DateTime.UtcNow, new ContextBag()); + Assert.True(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag())); } + + InMemoryGatewayDeduplication CreateInMemoryGatewayDeduplication() + { + return new InMemoryGatewayDeduplication(new ClientIdStorage(10)); + } } -} \ No newline at end of file +} diff --git a/src/NServiceBus.Core/Persistence/InMemory/Gateway/ClientIdStorage.cs b/src/NServiceBus.Core/Persistence/InMemory/Gateway/ClientIdStorage.cs new file mode 100644 index 00000000000..ec041b8ec32 --- /dev/null +++ b/src/NServiceBus.Core/Persistence/InMemory/Gateway/ClientIdStorage.cs @@ -0,0 +1,58 @@ +namespace NServiceBus +{ + using System.Collections.Generic; + + class ClientIdStorage + { + public ClientIdStorage(int maxSize) + { + this.maxSize = maxSize; + } + + public bool IsDuplicate(string clientId) + { + lock (clientIdSet) + { + // This implementation is not 100% consistent since there is a race condition here where another thread might consider the same message as a non-duplicate. + // We consider this good enough since this is the inmemory persister which will not be consistent when the endpoint is scaled out anyway. + if (clientIdSet.TryGetValue(clientId, out var existingNode)) // O(1) + { + clientIdList.Remove(existingNode); // O(1) operation, because we got the node reference + clientIdList.AddLast(existingNode); // O(1) operation + + return true; + } + } + + return false; + } + + public void RegisterClientId(string clientId) + { + lock (clientIdSet) + { + //another thread might already have added the ID since we checked the last time + if (clientIdSet.ContainsKey(clientId)) + { + // another thread has proceed this ID already and there is a potential duplicate message but there is nothing we can do about it at this stage so just return. + // Throwing would just cause unessessary retries for the client. + return; + } + + if (clientIdSet.Count == maxSize) + { + var id = clientIdList.First.Value; + clientIdSet.Remove(id); // O(1) + clientIdList.RemoveFirst(); // O(1) + } + + var node = clientIdList.AddLast(clientId); // O(1) + clientIdSet.Add(clientId, node); // O(1) + } + } + + readonly int maxSize; + readonly LinkedList clientIdList = new LinkedList(); + readonly Dictionary> clientIdSet = new Dictionary>(); + } +} diff --git a/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayDeduplication.cs b/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayDeduplication.cs index aa6ea40059b..f1e3e722511 100644 --- a/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayDeduplication.cs +++ b/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayDeduplication.cs @@ -1,48 +1,76 @@ namespace NServiceBus { using System; - using System.Collections.Generic; using System.Threading.Tasks; + using System.Transactions; using Extensibility; using Gateway.Deduplication; class InMemoryGatewayDeduplication : IDeduplicateMessages { - public InMemoryGatewayDeduplication(int maxSize) + public InMemoryGatewayDeduplication(ClientIdStorage clientIdStorage) { - this.maxSize = maxSize; + this.clientIdStorage = clientIdStorage; } public Task DeduplicateMessage(string clientId, DateTime timeReceived, ContextBag context) { - lock (clientIdSet) + if (clientIdStorage.IsDuplicate(clientId)) { - // Return FALSE if item EXISTS, TRUE if ADDED - if (clientIdSet.TryGetValue(clientId, out var existingNode)) // O(1) + return TaskEx.FalseTask; + } + + // The design of the v1 gateway seam will only allow deduplicating scope commits. + // This is fine since the gateway will always wrap the call in a transaction scope so this should always be true + if (Transaction.Current != null) + { + Transaction.Current.EnlistVolatile(new EnlistmentNotification(clientIdStorage, clientId), EnlistmentOptions.None); + } + + return TaskEx.TrueTask; + } + + readonly ClientIdStorage clientIdStorage; + + class EnlistmentNotification : IEnlistmentNotification + { + public EnlistmentNotification(ClientIdStorage clientIdStorage, string clientId) + { + this.clientIdStorage = clientIdStorage; + this.clientId = clientId; + } + + public void Prepare(PreparingEnlistment preparingEnlistment) + { + try { - clientIdList.Remove(existingNode); // O(1) operation, because we got the node reference - clientIdList.AddLast(existingNode); // O(1) operation - return TaskEx.FalseTask; + preparingEnlistment.Prepared(); } - else + catch (Exception ex) { - if (clientIdSet.Count == maxSize) - { - var id = clientIdList.First.Value; - clientIdSet.Remove(id); // O(1) - clientIdList.RemoveFirst(); // O(1) - } + preparingEnlistment.ForceRollback(ex); + } + } - var node = clientIdList.AddLast(clientId); // O(1) - clientIdSet.Add(clientId, node); // O(1) + public void Commit(Enlistment enlistment) + { + clientIdStorage.RegisterClientId(clientId); - return TaskEx.TrueTask; - } + enlistment.Done(); + } + + public void Rollback(Enlistment enlistment) + { + enlistment.Done(); + } + + public void InDoubt(Enlistment enlistment) + { + enlistment.Done(); } - } - readonly int maxSize; - readonly LinkedList clientIdList = new LinkedList(); - readonly Dictionary> clientIdSet = new Dictionary>(); + readonly ClientIdStorage clientIdStorage; + readonly string clientId; + } } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayPersistence.cs b/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayPersistence.cs index bdaa35c0550..c8d0b85a770 100644 --- a/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayPersistence.cs +++ b/src/NServiceBus.Core/Persistence/InMemory/Gateway/InMemoryGatewayPersistence.cs @@ -22,10 +22,10 @@ internal InMemoryGatewayPersistence() protected internal override void Setup(FeatureConfigurationContext context) { var maxSize = context.Settings.Get(MaxSizeKey); - context.Container.RegisterSingleton(new InMemoryGatewayDeduplication(maxSize)); + context.Container.RegisterSingleton(new InMemoryGatewayDeduplication(new ClientIdStorage(maxSize))); } - const string MaxSizeKey = "InMemoryGatewayDeduplication.MaxSize"; + internal const string MaxSizeKey = "InMemoryGatewayDeduplication.MaxSize"; const int MaxSizeDefault = 10000; } } \ No newline at end of file