From 4097b7592832844d75051e3d5f961cd16efbf4b5 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 18 Jul 2023 12:09:25 -0700
Subject: [PATCH 1/3] Fall back to management link when settling non-session
message
---
.../src/Amqp/AmqpReceiver.cs | 60 ++++++++---
.../tests/Receiver/ReceiverLiveTests.cs | 100 ++++++++++++++++++
.../Receiver/SessionReceiverLiveTests.cs | 80 ++++++++++++++
3 files changed, 228 insertions(+), 12 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
index 8e53b2959084..003901e9c175 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
@@ -445,7 +445,7 @@ await receiver.CompleteInternalAsync(
///
///
/// The lockToken of the to complete.
- ///
+ /// The timeout for the operation.
private async Task CompleteInternalAsync(
Guid lockToken,
TimeSpan timeout)
@@ -460,7 +460,7 @@ await DisposeMessageRequestResponseAsync(
SessionId).ConfigureAwait(false);
return;
}
- await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
+ await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, DispositionStatus.Completed, timeout).ConfigureAwait(false);
}
///
@@ -468,12 +468,20 @@ await DisposeMessageRequestResponseAsync(
///
///
/// The lockToken of the to complete.
- ///
- ///
+ /// The outcome of the message - used when disposing over receive link.
+ /// The disposition of the message - used when disposing over the management link.
+ /// The timeout for the operation.
+ /// Properties to modify when deadlettering, deferring, or abandoning.
+ /// Dead letter reason. Only valid when deadlettering.
+ /// Dead letter description. Only valid when deadlettering.
private async Task DisposeMessageAsync(
Guid lockToken,
Outcome outcome,
- TimeSpan timeout)
+ DispositionStatus disposition,
+ TimeSpan timeout,
+ IDictionary propertiesToModify = null,
+ string deadLetterReason = null,
+ string deadLetterDescription = null)
{
byte[] bufferForLockToken = ArrayPool.Shared.Rent(SizeOfGuidInBytes);
GuidUtilities.WriteGuidToBuffer(lockToken, bufferForLockToken.AsSpan(0, SizeOfGuidInBytes));
@@ -508,7 +516,21 @@ private async Task DisposeMessageAsync(
{
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
- ThrowLockLostException();
+ if (_isSessionReceiver)
+ {
+ ThrowLockLostException();
+ }
+
+ // The message was no found on the link which can occur as a result of a reconnect.
+ // Attempt to settle the message over the management link.
+ await DisposeMessageRequestResponseAsync(
+ lockToken,
+ timeout,
+ disposition,
+ propertiesToModify: propertiesToModify,
+ deadLetterReason: deadLetterReason,
+ deadLetterDescription: deadLetterDescription).ConfigureAwait(false);
+ return;
}
throw error.ToMessagingContractException();
@@ -587,7 +609,7 @@ await receiver.DeferInternalAsync(
/// Indicates that the receiver wants to defer the processing for the message.
///
/// The lock token of the .
- ///
+ /// The timeout for the operation.
/// The properties of the message to modify while deferring the message.
///
private Task DeferInternalAsync(
@@ -605,7 +627,12 @@ private Task DeferInternalAsync(
SessionId,
propertiesToModify);
}
- return DisposeMessageAsync(lockToken, GetDeferOutcome(propertiesToModify), timeout);
+ return DisposeMessageAsync(
+ lockToken,
+ GetDeferOutcome(propertiesToModify),
+ DispositionStatus.Defered,
+ timeout,
+ propertiesToModify: propertiesToModify);
}
///
@@ -645,7 +672,7 @@ await receiver.AbandonInternalAsync(
///
///
/// The lock token of the corresponding message to abandon.
- ///
+ /// The timeout for the operation.
/// The properties of the message to modify while abandoning the message.
private Task AbandonInternalAsync(
Guid lockToken,
@@ -662,7 +689,12 @@ private Task AbandonInternalAsync(
SessionId,
propertiesToModify);
}
- return DisposeMessageAsync(lockToken, GetAbandonOutcome(propertiesToModify), timeout);
+ return DisposeMessageAsync(
+ lockToken,
+ GetAbandonOutcome(propertiesToModify),
+ DispositionStatus.Abandoned,
+ timeout,
+ propertiesToModify: propertiesToModify);
}
///
@@ -710,7 +742,7 @@ await receiver.DeadLetterInternalAsync(
///
///
/// The lock token of the corresponding message to dead-letter.
- ///
+ /// The timeout for the operation.
/// The properties of the message to modify while moving to subqueue.
/// The reason for dead-lettering the message.
/// The error description for dead-lettering the message.
@@ -740,7 +772,11 @@ internal virtual Task DeadLetterInternalAsync(
return DisposeMessageAsync(
lockToken,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
- timeout);
+ DispositionStatus.Suspended,
+ timeout,
+ propertiesToModify,
+ deadLetterReason,
+ deadLetterErrorDescription);
}
private static Rejected GetRejectedOutcome(
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
index ac0b7f27383c..936e3892a26d 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
@@ -176,6 +176,106 @@ public async Task PeekSingleMessage()
}
}
+ [Test]
+ public async Task CanRenewWithSeparateReceiver()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.CreateSender(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
+ var receiver1 = client.CreateReceiver(scope.QueueName);
+ var message1 = await receiver1.ReceiveMessageAsync();
+ await receiver1.RenewMessageLockAsync(message1);
+
+ var receiver2 = client.CreateReceiver(scope.QueueName);
+ await receiver2.RenewMessageLockAsync(message1);
+ await receiver2.CompleteMessageAsync(message1);
+ }
+ }
+
+ [Test]
+ public async Task CanCompleteAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ var receiver = client.CreateReceiver(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+
+ await receiver.CompleteMessageAsync(message);
+ }
+ }
+
+ [Test]
+ public async Task CanAbandonAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ var receiver = client.CreateReceiver(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+
+ await receiver.AbandonMessageAsync(message, new Dictionary{{ "test key", "test value" }});
+ message = await receiver.ReceiveMessageAsync();
+ Assert.AreEqual("test value", message.ApplicationProperties["test key"]);
+ }
+ }
+
+ [Test]
+ public async Task CanDeferAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ var receiver = client.CreateReceiver(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+
+ await receiver.DeferMessageAsync(message, new Dictionary{{ "test key", "test value" }});
+ message = await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber);
+ Assert.AreEqual("test value", message.ApplicationProperties["test key"]);
+ }
+ }
+
+ [Test]
+ public async Task CanDeadLetterAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ var receiver = client.CreateReceiver(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+
+ await receiver.DeadLetterMessageAsync(message, new Dictionary{{ "test key", "test value" }}, "test reason", "test description");
+
+ var dlqReceiver = client.CreateReceiver(scope.QueueName, new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
+ var dlqMessage = await dlqReceiver.ReceiveMessageAsync();
+ Assert.AreEqual("test reason", dlqMessage.DeadLetterReason);
+ Assert.AreEqual("test description", dlqMessage.DeadLetterErrorDescription);
+ Assert.AreEqual("test value", dlqMessage.ApplicationProperties["test key"]);
+ }
+ }
+
[Test]
public async Task PeekMessagesWithACustomIdentifier()
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs
index e833517bf49a..667a64ae676a 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs
@@ -1064,5 +1064,85 @@ public async Task OpenSessionIsNotClosedWhenAcceptNextSessionTimesOut(bool enabl
Assert.IsNotNull(message);
}
}
+
+ [Test]
+ public async Task CannotCompleteAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
+ var receiver = await client.AcceptNextSessionAsync(scope.QueueName);
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+ Assert.That(
+ async () => await receiver.CompleteMessageAsync(message),
+ Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason))
+ .EqualTo(ServiceBusFailureReason.SessionLockLost));
+ }
+ }
+
+ [Test]
+ public async Task CanAbandonAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
+ var receiver = await client.AcceptNextSessionAsync(scope.QueueName);
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+ Assert.That(
+ async () => await receiver.AbandonMessageAsync(message),
+ Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason))
+ .EqualTo(ServiceBusFailureReason.SessionLockLost));
+ }
+ }
+
+ [Test]
+ public async Task CannotDeferAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
+ var receiver = await client.AcceptNextSessionAsync(scope.QueueName);
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+ Assert.That(
+ async () => await receiver.DeferMessageAsync(message),
+ Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason))
+ .EqualTo(ServiceBusFailureReason.SessionLockLost));
+ }
+ }
+
+ [Test]
+ public async Task CannotDeadLetterAfterLinkReconnect()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ var sender = client.CreateSender(scope.QueueName);
+ await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
+ var receiver = await client.AcceptNextSessionAsync(scope.QueueName);
+
+ var message = await receiver.ReceiveMessageAsync();
+
+ SimulateNetworkFailure(client);
+ Assert.That(
+ async () => await receiver.DeadLetterMessageAsync(message),
+ Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason))
+ .EqualTo(ServiceBusFailureReason.SessionLockLost));
+ }
+ }
}
}
From f898770513030130a24172aa801468b398b1f5d3 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 18 Jul 2023 12:11:37 -0700
Subject: [PATCH 2/3] fix typo
---
.../Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
index 003901e9c175..e31ea3b52ed6 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
@@ -521,7 +521,7 @@ private async Task DisposeMessageAsync(
ThrowLockLostException();
}
- // The message was no found on the link which can occur as a result of a reconnect.
+ // The message was not found on the link which can occur as a result of a reconnect.
// Attempt to settle the message over the management link.
await DisposeMessageRequestResponseAsync(
lockToken,
From 1ca3ab25580631dfd8a458082e8a6ec9e9bad31b Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 18 Jul 2023 12:27:18 -0700
Subject: [PATCH 3/3] fix test
---
.../tests/Transactions/TransactionLiveTests.cs | 9 +--------
1 file changed, 1 insertion(+), 8 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs
index 5d47dec5c3c4..840822c5aad9 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs
@@ -297,14 +297,7 @@ public async Task TransactionThrowsWhenOperationsOfDifferentPartitionsAreInSameT
await receiver.CompleteMessageAsync(receivedMessage1);
- // the service seems to abandon the message that
- // triggered the InvalidOperationException
- // in the transaction
- Assert.That(
- async () =>
- await receiver.CompleteMessageAsync(receivedMessage2), Throws.InstanceOf()
- .And.Property(nameof(ServiceBusException.Reason))
- .EqualTo(ServiceBusFailureReason.MessageLockLost));
+ await receiver.CompleteMessageAsync(receivedMessage2);
}
}