diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 8e53b2959084..e31ea3b52ed6 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -445,7 +445,7 @@ await receiver.CompleteInternalAsync( /// /// /// The lockToken of the to complete. - /// + /// The timeout for the operation. private async Task CompleteInternalAsync( Guid lockToken, TimeSpan timeout) @@ -460,7 +460,7 @@ await DisposeMessageRequestResponseAsync( SessionId).ConfigureAwait(false); return; } - await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); + await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, DispositionStatus.Completed, timeout).ConfigureAwait(false); } /// @@ -468,12 +468,20 @@ await DisposeMessageRequestResponseAsync( /// /// /// The lockToken of the to complete. - /// - /// + /// The outcome of the message - used when disposing over receive link. + /// The disposition of the message - used when disposing over the management link. + /// The timeout for the operation. + /// Properties to modify when deadlettering, deferring, or abandoning. + /// Dead letter reason. Only valid when deadlettering. + /// Dead letter description. Only valid when deadlettering. private async Task DisposeMessageAsync( Guid lockToken, Outcome outcome, - TimeSpan timeout) + DispositionStatus disposition, + TimeSpan timeout, + IDictionary propertiesToModify = null, + string deadLetterReason = null, + string deadLetterDescription = null) { byte[] bufferForLockToken = ArrayPool.Shared.Rent(SizeOfGuidInBytes); GuidUtilities.WriteGuidToBuffer(lockToken, bufferForLockToken.AsSpan(0, SizeOfGuidInBytes)); @@ -508,7 +516,21 @@ private async Task DisposeMessageAsync( { if (error.Condition.Equals(AmqpErrorCode.NotFound)) { - ThrowLockLostException(); + if (_isSessionReceiver) + { + ThrowLockLostException(); + } + + // The message was not found on the link which can occur as a result of a reconnect. + // Attempt to settle the message over the management link. + await DisposeMessageRequestResponseAsync( + lockToken, + timeout, + disposition, + propertiesToModify: propertiesToModify, + deadLetterReason: deadLetterReason, + deadLetterDescription: deadLetterDescription).ConfigureAwait(false); + return; } throw error.ToMessagingContractException(); @@ -587,7 +609,7 @@ await receiver.DeferInternalAsync( /// Indicates that the receiver wants to defer the processing for the message. /// /// The lock token of the . - /// + /// The timeout for the operation. /// The properties of the message to modify while deferring the message. /// private Task DeferInternalAsync( @@ -605,7 +627,12 @@ private Task DeferInternalAsync( SessionId, propertiesToModify); } - return DisposeMessageAsync(lockToken, GetDeferOutcome(propertiesToModify), timeout); + return DisposeMessageAsync( + lockToken, + GetDeferOutcome(propertiesToModify), + DispositionStatus.Defered, + timeout, + propertiesToModify: propertiesToModify); } /// @@ -645,7 +672,7 @@ await receiver.AbandonInternalAsync( /// /// /// The lock token of the corresponding message to abandon. - /// + /// The timeout for the operation. /// The properties of the message to modify while abandoning the message. private Task AbandonInternalAsync( Guid lockToken, @@ -662,7 +689,12 @@ private Task AbandonInternalAsync( SessionId, propertiesToModify); } - return DisposeMessageAsync(lockToken, GetAbandonOutcome(propertiesToModify), timeout); + return DisposeMessageAsync( + lockToken, + GetAbandonOutcome(propertiesToModify), + DispositionStatus.Abandoned, + timeout, + propertiesToModify: propertiesToModify); } /// @@ -710,7 +742,7 @@ await receiver.DeadLetterInternalAsync( /// /// /// The lock token of the corresponding message to dead-letter. - /// + /// The timeout for the operation. /// The properties of the message to modify while moving to subqueue. /// The reason for dead-lettering the message. /// The error description for dead-lettering the message. @@ -740,7 +772,11 @@ internal virtual Task DeadLetterInternalAsync( return DisposeMessageAsync( lockToken, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription), - timeout); + DispositionStatus.Suspended, + timeout, + propertiesToModify, + deadLetterReason, + deadLetterErrorDescription); } private static Rejected GetRejectedOutcome( diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs index ac0b7f27383c..936e3892a26d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs @@ -176,6 +176,106 @@ public async Task PeekSingleMessage() } } + [Test] + public async Task CanRenewWithSeparateReceiver() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusSender sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage()); + var receiver1 = client.CreateReceiver(scope.QueueName); + var message1 = await receiver1.ReceiveMessageAsync(); + await receiver1.RenewMessageLockAsync(message1); + + var receiver2 = client.CreateReceiver(scope.QueueName); + await receiver2.RenewMessageLockAsync(message1); + await receiver2.CompleteMessageAsync(message1); + } + } + + [Test] + public async Task CanCompleteAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + var receiver = client.CreateReceiver(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage()); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + + await receiver.CompleteMessageAsync(message); + } + } + + [Test] + public async Task CanAbandonAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + var receiver = client.CreateReceiver(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage()); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + + await receiver.AbandonMessageAsync(message, new Dictionary{{ "test key", "test value" }}); + message = await receiver.ReceiveMessageAsync(); + Assert.AreEqual("test value", message.ApplicationProperties["test key"]); + } + } + + [Test] + public async Task CanDeferAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + var receiver = client.CreateReceiver(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage()); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + + await receiver.DeferMessageAsync(message, new Dictionary{{ "test key", "test value" }}); + message = await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber); + Assert.AreEqual("test value", message.ApplicationProperties["test key"]); + } + } + + [Test] + public async Task CanDeadLetterAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + var receiver = client.CreateReceiver(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage()); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + + await receiver.DeadLetterMessageAsync(message, new Dictionary{{ "test key", "test value" }}, "test reason", "test description"); + + var dlqReceiver = client.CreateReceiver(scope.QueueName, new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter }); + var dlqMessage = await dlqReceiver.ReceiveMessageAsync(); + Assert.AreEqual("test reason", dlqMessage.DeadLetterReason); + Assert.AreEqual("test description", dlqMessage.DeadLetterErrorDescription); + Assert.AreEqual("test value", dlqMessage.ApplicationProperties["test key"]); + } + } + [Test] public async Task PeekMessagesWithACustomIdentifier() { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index e833517bf49a..667a64ae676a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -1064,5 +1064,85 @@ public async Task OpenSessionIsNotClosedWhenAcceptNextSessionTimesOut(bool enabl Assert.IsNotNull(message); } } + + [Test] + public async Task CannotCompleteAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session")); + var receiver = await client.AcceptNextSessionAsync(scope.QueueName); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + Assert.That( + async () => await receiver.CompleteMessageAsync(message), + Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)) + .EqualTo(ServiceBusFailureReason.SessionLockLost)); + } + } + + [Test] + public async Task CanAbandonAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session")); + var receiver = await client.AcceptNextSessionAsync(scope.QueueName); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + Assert.That( + async () => await receiver.AbandonMessageAsync(message), + Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)) + .EqualTo(ServiceBusFailureReason.SessionLockLost)); + } + } + + [Test] + public async Task CannotDeferAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session")); + var receiver = await client.AcceptNextSessionAsync(scope.QueueName); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + Assert.That( + async () => await receiver.DeferMessageAsync(message), + Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)) + .EqualTo(ServiceBusFailureReason.SessionLockLost)); + } + } + + [Test] + public async Task CannotDeadLetterAfterLinkReconnect() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session")); + var receiver = await client.AcceptNextSessionAsync(scope.QueueName); + + var message = await receiver.ReceiveMessageAsync(); + + SimulateNetworkFailure(client); + Assert.That( + async () => await receiver.DeadLetterMessageAsync(message), + Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)) + .EqualTo(ServiceBusFailureReason.SessionLockLost)); + } + } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs index 5d47dec5c3c4..840822c5aad9 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs @@ -297,14 +297,7 @@ public async Task TransactionThrowsWhenOperationsOfDifferentPartitionsAreInSameT await receiver.CompleteMessageAsync(receivedMessage1); - // the service seems to abandon the message that - // triggered the InvalidOperationException - // in the transaction - Assert.That( - async () => - await receiver.CompleteMessageAsync(receivedMessage2), Throws.InstanceOf() - .And.Property(nameof(ServiceBusException.Reason)) - .EqualTo(ServiceBusFailureReason.MessageLockLost)); + await receiver.CompleteMessageAsync(receivedMessage2); } }