Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InMemory gateway deduplication can cause message loss #5556

Merged
merged 2 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace NServiceBus.Persistence.InMemory.Tests
{
using NUnit.Framework;

[TestFixture]
class ClientIdStorageTests
{
    // A registered ID is reported as a duplicate; an ID that was never registered is not.
    [Test]
    public void Should_detect_duplicates()
    {
        var storage = CreateStorage(10, "A", "B");

        Assert.True(storage.IsDuplicate("A"));
        Assert.False(storage.IsDuplicate("C"));
    }

    // Registering a third ID in a storage limited to two entries pushes out the first one.
    [Test]
    public void Should_evict_oldest_entry_when_LRU_reaches_limit()
    {
        var storage = CreateStorage(2, "A", "B", "C");

        Assert.False(storage.IsDuplicate("A"));
    }

    // Checking "A" counts as a use, so "B" becomes the eviction candidate once "C" is registered.
    [Test]
    public void Should_reset_time_added_for_existing_IDs_when_checked()
    {
        var storage = CreateStorage(2, "A", "B");

        Assert.True(storage.IsDuplicate("A"));

        storage.RegisterClientId("C");

        Assert.False(storage.IsDuplicate("B"));
        Assert.True(storage.IsDuplicate("A"));
    }

    // Builds a storage with the given capacity and registers the IDs in the order supplied.
    static ClientIdStorage CreateStorage(int maxSize, params string[] idsInOrder)
    {
        var storage = new ClientIdStorage(maxSize);

        foreach (var id in idsInOrder)
        {
            storage.RegisterClientId(id);
        }

        return storage;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,59 @@
{
using System;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
using NUnit.Framework;

[TestFixture]
class InMemoryGatewayDeduplicationTests
{
[Test]
public async Task Should_return_true_on_first_unique_test()
public async Task Should_add_on_transaction_scope_commit()
{
var storage = new InMemoryGatewayDeduplication(2);
var storage = CreateInMemoryGatewayDeduplication();

await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag());
Assert.True(await storage.DeduplicateMessage("B", DateTime.UtcNow, new ContextBag()));
}
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
{
await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag());

scope.Complete();
}

Assert.False(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()));
}

[Test]
public async Task Should_return_false_on_second_test()
public async Task Should_not_add_on_transaction_scope_abort()
{
var storage = new InMemoryGatewayDeduplication(2);
var storage = CreateInMemoryGatewayDeduplication();

using (new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
{
await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag());

// no commit
}

Assert.True(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()));
}

await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag());
Assert.False(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()));
}

[Test]
public async Task Should_return_true_if_LRU_reaches_limit()
// With the gateway persistence seam v1 design, it is only safe to deduplicate when a transaction scope is present, since the check happens before
// the messages have been pushed to the transport. If we added entries earlier, they would be considered duplicates when retrying after something went wrong.
// Note: The gateway will always wrap the v1 seam invocation in a transaction scope.
public async Task Should_only_deduplicate_when_scope_is_present()
{
var storage = new InMemoryGatewayDeduplication(2);
var storage = CreateInMemoryGatewayDeduplication();

await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag());
await storage.DeduplicateMessage("B", DateTime.UtcNow, new ContextBag());
await storage.DeduplicateMessage("C", DateTime.UtcNow, new ContextBag());

Assert.True(await storage.DeduplicateMessage("A", DateTime.UtcNow, new ContextBag()));
}

InMemoryGatewayDeduplication CreateInMemoryGatewayDeduplication()
{
return new InMemoryGatewayDeduplication(new ClientIdStorage(10));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace NServiceBus
{
using System.Collections.Generic;

/// <summary>
/// Bounded store of client IDs with least-recently-used eviction, used to detect duplicate messages.
/// </summary>
/// <remarks>
/// The dictionary maps each ID to its linked-list node for O(1) lookup. The linked list keeps the IDs ordered
/// from least recently used (first) to most recently used (last), so the oldest entry can be evicted in O(1).
/// All access to both collections happens under a single lock.
/// </remarks>
class ClientIdStorage
{
    /// <summary>
    /// Creates a store that retains at most <paramref name="maxSize"/> client IDs.
    /// </summary>
    /// <param name="maxSize">Maximum number of IDs to retain. Must be at least 1.</param>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// Thrown when <paramref name="maxSize"/> is less than 1.
    /// </exception>
    public ClientIdStorage(int maxSize)
    {
        // A zero size would make the first eviction dereference the head of an empty list, and a negative size
        // would never trigger eviction at all. Fail fast with a descriptive error instead.
        if (maxSize < 1)
        {
            throw new System.ArgumentOutOfRangeException(nameof(maxSize), maxSize, "The maximum number of client IDs must be at least 1.");
        }

        this.maxSize = maxSize;
    }

    /// <summary>
    /// Checks whether <paramref name="clientId"/> has been registered and, if so, marks it as most recently used.
    /// </summary>
    /// <param name="clientId">The ID to look up.</param>
    /// <returns><c>true</c> if the ID is currently stored; otherwise <c>false</c>.</returns>
    public bool IsDuplicate(string clientId)
    {
        lock (clientIdSet)
        {
            // This implementation is not 100% consistent since there is a race condition here where another thread might consider the same message as a non-duplicate.
            // We consider this good enough since this is the in-memory persister which will not be consistent when the endpoint is scaled out anyway.
            if (!clientIdSet.TryGetValue(clientId, out var existingNode)) // O(1)
            {
                return false;
            }

            // Move the node to the tail so it becomes the last candidate for eviction.
            clientIdList.Remove(existingNode); // O(1) operation, because we got the node reference
            clientIdList.AddLast(existingNode); // O(1) operation

            return true;
        }
    }

    /// <summary>
    /// Stores <paramref name="clientId"/> as the most recently used entry, evicting the least recently used
    /// entry first when the store is full. Does nothing if the ID is already stored.
    /// </summary>
    /// <param name="clientId">The ID to remember.</param>
    public void RegisterClientId(string clientId)
    {
        lock (clientIdSet)
        {
            // Another thread might already have added the ID since we checked the last time.
            if (clientIdSet.ContainsKey(clientId))
            {
                // Another thread has processed this ID already and there is a potential duplicate message, but there is nothing we can do about it at this stage so just return.
                // Throwing would just cause unnecessary retries for the client.
                return;
            }

            // The constructor guarantees maxSize >= 1, so the list is non-empty whenever this condition holds.
            if (clientIdSet.Count >= maxSize)
            {
                var oldestId = clientIdList.First.Value;
                clientIdSet.Remove(oldestId); // O(1)
                clientIdList.RemoveFirst(); // O(1)
            }

            var node = clientIdList.AddLast(clientId); // O(1)
            clientIdSet.Add(clientId, node); // O(1)
        }
    }

    readonly int maxSize;
    readonly LinkedList<string> clientIdList = new LinkedList<string>();
    readonly Dictionary<string, LinkedListNode<string>> clientIdSet = new Dictionary<string, LinkedListNode<string>>();
}
}
Original file line number Diff line number Diff line change
@@ -1,48 +1,76 @@
namespace NServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
using Gateway.Deduplication;

class InMemoryGatewayDeduplication : IDeduplicateMessages
{
public InMemoryGatewayDeduplication(int maxSize)
public InMemoryGatewayDeduplication(ClientIdStorage clientIdStorage)
{
this.maxSize = maxSize;
this.clientIdStorage = clientIdStorage;
}

public Task<bool> DeduplicateMessage(string clientId, DateTime timeReceived, ContextBag context)
{
lock (clientIdSet)
if (clientIdStorage.IsDuplicate(clientId))
{
// Return FALSE if item EXISTS, TRUE if ADDED
if (clientIdSet.TryGetValue(clientId, out var existingNode)) // O(1)
return TaskEx.FalseTask;
}

// The design of the v1 gateway seam will only allow deduplicating scope commits.
// This is fine since the gateway will always wrap the call in a transaction scope so this should always be true
if (Transaction.Current != null)
{
Transaction.Current.EnlistVolatile(new EnlistmentNotification(clientIdStorage, clientId), EnlistmentOptions.None);
}

return TaskEx.TrueTask;
}

readonly ClientIdStorage clientIdStorage;

class EnlistmentNotification : IEnlistmentNotification
{
public EnlistmentNotification(ClientIdStorage clientIdStorage, string clientId)
{
this.clientIdStorage = clientIdStorage;
this.clientId = clientId;
}

public void Prepare(PreparingEnlistment preparingEnlistment)
{
try
{
clientIdList.Remove(existingNode); // O(1) operation, because we got the node reference
clientIdList.AddLast(existingNode); // O(1) operation
return TaskEx.FalseTask;
preparingEnlistment.Prepared();
}
else
catch (Exception ex)
{
if (clientIdSet.Count == maxSize)
{
var id = clientIdList.First.Value;
clientIdSet.Remove(id); // O(1)
clientIdList.RemoveFirst(); // O(1)
}
preparingEnlistment.ForceRollback(ex);
}
}

var node = clientIdList.AddLast(clientId); // O(1)
clientIdSet.Add(clientId, node); // O(1)
public void Commit(Enlistment enlistment)
{
clientIdStorage.RegisterClientId(clientId);

return TaskEx.TrueTask;
}
enlistment.Done();
}

public void Rollback(Enlistment enlistment)
{
enlistment.Done();
}

public void InDoubt(Enlistment enlistment)
{
enlistment.Done();
}
}

readonly int maxSize;
readonly LinkedList<string> clientIdList = new LinkedList<string>();
readonly Dictionary<string, LinkedListNode<string>> clientIdSet = new Dictionary<string, LinkedListNode<string>>();
readonly ClientIdStorage clientIdStorage;
readonly string clientId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ internal InMemoryGatewayPersistence()
protected internal override void Setup(FeatureConfigurationContext context)
{
var maxSize = context.Settings.Get<int>(MaxSizeKey);
context.Container.RegisterSingleton<IDeduplicateMessages>(new InMemoryGatewayDeduplication(maxSize));
context.Container.RegisterSingleton<IDeduplicateMessages>(new InMemoryGatewayDeduplication(new ClientIdStorage(maxSize)));
}

const string MaxSizeKey = "InMemoryGatewayDeduplication.MaxSize";
internal const string MaxSizeKey = "InMemoryGatewayDeduplication.MaxSize";
const int MaxSizeDefault = 10000;
}
}