From dcae49cc7d0f3872aa30e44cd9ad05a8aef17d8c Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Tue, 22 Jun 2021 14:14:33 +0200 Subject: [PATCH 01/21] add behavior to send retry confirmation control message --- .../ManualRetryNotificationBehavior.cs | 60 +++++++++++++++++++ .../Retries/PlatformRetryNotifications.cs | 20 +++++++ 2 files changed, 80 insertions(+) create mode 100644 src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs create mode 100644 src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs new file mode 100644 index 00000000000..9f6f369edad --- /dev/null +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -0,0 +1,60 @@ +namespace NServiceBus +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using Pipeline; + using Routing; + using Transport; + + class ManualRetryNotificationBehavior : IForkConnector + { + const string RetryUniqueMessageIdHeader = "ServiceControl.Retry.UniqueMessageId"; + + readonly string errorQueue; + + public ManualRetryNotificationBehavior(string errorQueue) + { + this.errorQueue = errorQueue; + } + + public async Task Invoke(IIncomingLogicalMessageContext context, Func next) + { + await next(context).ConfigureAwait(false); + + if (IsRetriedMessage(out var id)) + { + await ConfirmSuccessfulRetry().ConfigureAwait(false); + } + + async Task ConfirmSuccessfulRetry() + { + var messageToDispatch = new OutgoingMessage( + CombGuid.Generate().ToString(), + new Dictionary + { + { "ServiceControl.Retry.Successful", DateTimeOffset.UtcNow.ToString("O") }, + { RetryUniqueMessageIdHeader, id } + }, + new byte[0]); + var routingContext = new RoutingContext(messageToDispatch, new UnicastRoutingStrategy(errorQueue), context); + await this.Fork(routingContext).ConfigureAwait(false); + } + + bool IsRetriedMessage(out string retryUniqueMessageId) + { + // check if the message is coming from a manual retry attempt + if (context.Headers.TryGetValue(RetryUniqueMessageIdHeader, out var uniqueMessageId) && + // The SC version that supports the confirmation message also started to add the SC version header + context.Headers.ContainsKey("ServiceControl.Version")) + { + retryUniqueMessageId = uniqueMessageId; + return true; + } + + retryUniqueMessageId = null; + return false; + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs new file mode 100644 index 00000000000..86b9fd067be --- /dev/null +++ b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs @@ -0,0 +1,20 @@ +namespace NServiceBus +{ + using Features; + + /// + /// Provides notifications to ServiceControl about successfully retried messages + /// + public class PlatformRetryNotifications : Feature + { + PlatformRetryNotifications() => EnableByDefault(); + + /// + protected internal override void Setup(FeatureConfigurationContext context) + { + var errorQueueAddress = context.Settings.ErrorQueueAddress(); + var forkBehavior = new ManualRetryNotificationBehavior(errorQueueAddress); + context.Pipeline.Register(forkBehavior, "Provides retry notifications to ServiceControl"); + } + } +} \ No newline at end of file From 303d6d8932687b47fc7fb01f29bec262e4ad41bf Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 13:44:37 +0200 Subject: [PATCH 02/21] add acceptance test --- .../When_retrying_message_from_error_queue.cs | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs new file mode 100644 index 00000000000..8f0a11cc513 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs @@ -0,0 +1,99 @@ +namespace NServiceBus.AcceptanceTests.Recoverability +{ + using System; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using EndpointTemplates; + using NServiceBus.Pipeline; + using NUnit.Framework; + + public class When_retrying_message_from_error_queue : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_confirm_successful_processing_to_error_queue() + { + var retryId = Guid.NewGuid().ToString("D"); + + var context = await Scenario.Define() + .WithEndpoint(e => e + .When(s => + { + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + // set SC retry header information + sendOptions.SetHeader("ServiceControl.Version", "42"); + sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", retryId); + return s.Send(new FailedMessage(), sendOptions); + })) + .WithEndpoint() + .Done(c => c.ConfirmedRetryId != null) + .Run(); + + Assert.IsTrue(context.MessageProcessed); + Assert.AreEqual(retryId, context.ConfirmedRetryId); + var processingTime = DateTimeOffset.Parse(context.RetryProcessingTimestamp); + Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1))); + } + + class Context : ScenarioContext + { + public string ConfirmedRetryId { get; set; } + public string RetryProcessingTimestamp { get; set; } + public bool MessageProcessed { get; set; } + } + + class ProcessingEndpoint : EndpointConfigurationBuilder + { + public ProcessingEndpoint() => EndpointSetup(c => c + .SendFailedMessagesTo()); + + class FailedMessageHandler : IHandleMessages + { + Context testContext; + + public FailedMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(FailedMessage message, IMessageHandlerContext context) + { + testContext.MessageProcessed = true; + return Task.CompletedTask; + } + } + } + + class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() => EndpointSetup((e, r) => e.Pipeline.Register( + new ControlMessageBehavior(r.ScenarioContext as Context), + "Checks for confirmation control message")); + + class ControlMessageBehavior : Behavior + { + Context testContext; + + public ControlMessageBehavior(Context testContext) + { + this.testContext = testContext; + } + + public override async Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + await next(); + + testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"]; + testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"]; + } + } + } + + class FailedMessage : IMessage + { + } + } + + +} \ No newline at end of file From a8d10539cf0d0b18d9db3cb974063bcb1d4c6332 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 14:42:16 +0200 Subject: [PATCH 03/21] add unit tests for behavior --- .../ManualRetryNotificationBehaviorTests.cs | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs diff --git a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs new file mode 100644 index 00000000000..d0bb69ac5d7 --- /dev/null +++ b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs @@ -0,0 +1,123 @@ +namespace NServiceBus.Core.Tests.ServicePlatform.Retries +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.Extensions.DependencyInjection; + using NServiceBus.Pipeline; + using NServiceBus.Routing; + using NUnit.Framework; + using Testing; + + [TestFixture] + public class ManualRetryNotificationBehaviorTests + { + [Test] + public async Task Should_confirm_successful_retries_to_error_queue() + { + const string errorQueue = "configuredErrorQueue"; + var routingPipeline = new RoutingPipeline(); + var behavior = new ManualRetryNotificationBehavior(errorQueue); + + TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); + // Set necessary SC headers + context.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); + context.Headers["ServiceControl.Version"] = "42"; + + await behavior.Invoke(context, _ => Task.CompletedTask); + + var outgoingMessage = routingPipeline.ForkInvocations.Single(); + Assert.AreEqual( + context.Headers["ServiceControl.Retry.UniqueMessageId"], + outgoingMessage.Message.Headers["ServiceControl.Retry.UniqueMessageId"]); + + Assert.IsTrue(outgoingMessage.Message.Headers.ContainsKey("ServiceControl.Retry.Successful")); + + Assert.AreEqual(0, outgoingMessage.Message.Body.Length); + + Assert.AreEqual(bool.TrueString, outgoingMessage.Message.Headers[Headers.ControlMessageHeader]); + + var addressTag = outgoingMessage.RoutingStrategies.Single().Apply(new Dictionary()) as UnicastAddressTag; + Assert.AreEqual(addressTag.Destination, errorQueue); + } + + [Test] + public void Should_not_confirm_when_processing_fails() + { + const string errorQueue = "configuredErrorQueue"; + var routingPipeline = new RoutingPipeline(); + var behavior = new ManualRetryNotificationBehavior(errorQueue); + + TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); + // Set necessary SC headers + context.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); + context.Headers["ServiceControl.Version"] = "42"; + + var exception = new Exception("some pipeline failure"); + var thrownException = Assert.ThrowsAsync(async () => await behavior.Invoke(context, _ => Task.FromException(exception))); + + Assert.AreSame(thrownException, exception); + Assert.AreEqual(0, routingPipeline.ForkInvocations.Count); + } + + [Test] + // A missing SC version header indicates an older version of SC that cannot handle the confirmation message yet + public async Task Should_not_confirm_when_message_does_not_contain_SC_version_header() + { + const string errorQueue = "configuredErrorQueue"; + var routingPipeline = new RoutingPipeline(); + var behavior = new ManualRetryNotificationBehavior(errorQueue); + + TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); + context.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); + + await behavior.Invoke(context, _ => Task.CompletedTask); + + Assert.AreEqual(0, routingPipeline.ForkInvocations.Count); + } + + [Test] + public async Task Should_not_confirm_when_message_does_not_retry_header() + { + const string errorQueue = "configuredErrorQueue"; + var routingPipeline = new RoutingPipeline(); + var behavior = new ManualRetryNotificationBehavior(errorQueue); + + TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); + context.Headers["ServiceControl.Version"] = "42"; + + await behavior.Invoke(context, _ => Task.CompletedTask); + + Assert.AreEqual(0, routingPipeline.ForkInvocations.Count); + } + + static TestableIncomingLogicalMessageContext SetupTestableContext(RoutingPipeline routingPipeline) + { + var context = new TestableIncomingLogicalMessageContext(); + + //setup fork pipeline + var serviceCollection = new ServiceCollection(); + var pipelineModifications = new PipelineModifications(); + pipelineModifications.Additions.Add( + RegisterStep.Create("routingFork", typeof(RoutingPipeline), "for testing", _ => routingPipeline)); + var pipelineCache = new PipelineCache(serviceCollection.BuildServiceProvider(), pipelineModifications); + context.Extensions.Set(pipelineCache); + + return context; + } + + class RoutingPipeline : Behavior + { + public List ForkInvocations { get; } = new List(); + + public override Task Invoke(IRoutingContext context, Func next) + { + ForkInvocations.Add(context); + return Task.CompletedTask; + } + } + + //TODO also for control messages + } +} \ No newline at end of file From 800f59c4bfa753342b765cb5606a07a2c5b729e0 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 14:42:26 +0200 Subject: [PATCH 04/21] add control message header --- .../ServicePlatform/Retries/ManualRetryNotificationBehavior.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs index 9f6f369edad..67dab10ddb9 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -34,7 +34,8 @@ async Task ConfirmSuccessfulRetry() new Dictionary { { "ServiceControl.Retry.Successful", DateTimeOffset.UtcNow.ToString("O") }, - { RetryUniqueMessageIdHeader, id } + { RetryUniqueMessageIdHeader, id }, + { Headers.ControlMessageHeader, bool.TrueString } }, new byte[0]); var routingContext = new RoutingContext(messageToDispatch, new UnicastRoutingStrategy(errorQueue), context); From 1f17e73eb4a0f4cc3cb667e05992d80e4c332b1f Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:01:45 +0200 Subject: [PATCH 05/21] add test for control message --- ...trying_control_message_from_error_queue.cs | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs new file mode 100644 index 00000000000..c47606a5f1c --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs @@ -0,0 +1,111 @@ +namespace NServiceBus.AcceptanceTests.Recoverability +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using EndpointTemplates; + using Features; + using Microsoft.Extensions.DependencyInjection; + using NServiceBus.Pipeline; + using NServiceBus.Routing; + using NUnit.Framework; + using Transport; + using Unicast.Transport; + + public class When_retrying_control_message_from_error_queue + { + static readonly string RetryId = Guid.NewGuid().ToString("D"); + + [Test] + public async Task Should_confirm_successful_processing_to_error_queue() + { + Requires.MessageDrivenPubSub(); //required for subscription control message support + + var context = await Scenario.Define() + .WithEndpoint() + .WithEndpoint() + .Done(c => c.ConfirmedRetryId != null) + .Run(); + + Assert.AreEqual(RetryId, context.ConfirmedRetryId); + var processingTime = DateTimeOffset.Parse(context.RetryProcessingTimestamp); + Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1))); + } + + class Context : ScenarioContext + { + public string ConfirmedRetryId { get; set; } + public string RetryProcessingTimestamp { get; set; } + } + + class ProcessingEndpoint : EndpointConfigurationBuilder + { + public ProcessingEndpoint() => EndpointSetup(c => + { + c.EnableFeature(); + c.SendFailedMessagesTo(); + }); + + class ControlMessageFeature : Feature + { + protected override void Setup(FeatureConfigurationContext context) + { + context.RegisterStartupTask(s => + new ControlMessageSender(s.GetRequiredService())); + } + } + + class ControlMessageSender : FeatureStartupTask + { + IMessageDispatcher dispatcher; + + public ControlMessageSender(IMessageDispatcher dispatcher) + { + this.dispatcher = dispatcher; + } + + protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default) + { + var controlMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe); + // set necessary subscription control message headers + controlMessage.Headers.Add(Headers.SubscriptionMessageType, typeof(object).AssemblyQualifiedName); + controlMessage.Headers.Add(Headers.ReplyToAddress, "TestSubscriberAddress"); + // set SC headers + controlMessage.Headers.Add("ServiceControl.Retry.UniqueMessageId", RetryId); + controlMessage.Headers.Add("ServiceControl.Version", Math.PI.ToString("N")); + var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)))); + await dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken); + } + + protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask; + } + } + + class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() => EndpointSetup((e, r) => e.Pipeline.Register( + new ControlMessageBehavior(r.ScenarioContext as Context), + "Checks for confirmation control message")); + + class ControlMessageBehavior : Behavior + { + Context testContext; + + public ControlMessageBehavior(Context testContext) + { + this.testContext = testContext; + } + + public override async Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + await next(); + + testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"]; + testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"]; + } + } + } + } +} \ No newline at end of file From bf4fa4a7099835cd83dcefd044af464618a4376a Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:01:59 +0200 Subject: [PATCH 06/21] move to ITransportReceiveContext stage --- .../ManualRetryNotificationBehaviorTests.cs | 26 +++++++++---------- .../ManualRetryNotificationBehavior.cs | 8 +++--- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs index d0bb69ac5d7..7d00ec1b2e0 100644 --- a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs +++ b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs @@ -20,16 +20,16 @@ public async Task Should_confirm_successful_retries_to_error_queue() var routingPipeline = new RoutingPipeline(); var behavior = new ManualRetryNotificationBehavior(errorQueue); - TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); + var context = SetupTestableContext(routingPipeline); // Set necessary SC headers - context.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); - context.Headers["ServiceControl.Version"] = "42"; + context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); + context.Message.Headers["ServiceControl.Version"] = "42"; await behavior.Invoke(context, _ => Task.CompletedTask); var outgoingMessage = routingPipeline.ForkInvocations.Single(); Assert.AreEqual( - context.Headers["ServiceControl.Retry.UniqueMessageId"], + context.Message.Headers["ServiceControl.Retry.UniqueMessageId"], outgoingMessage.Message.Headers["ServiceControl.Retry.UniqueMessageId"]); Assert.IsTrue(outgoingMessage.Message.Headers.ContainsKey("ServiceControl.Retry.Successful")); @@ -49,10 +49,10 @@ public void Should_not_confirm_when_processing_fails() var routingPipeline = new RoutingPipeline(); var behavior = new ManualRetryNotificationBehavior(errorQueue); - TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); + var context = SetupTestableContext(routingPipeline); // Set necessary SC headers - context.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); - context.Headers["ServiceControl.Version"] = "42"; + context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); + context.Message.Headers["ServiceControl.Version"] = "42"; var exception = new Exception("some pipeline failure"); var thrownException = Assert.ThrowsAsync(async () => await behavior.Invoke(context, _ => Task.FromException(exception))); @@ -69,8 +69,8 @@ public async Task Should_not_confirm_when_message_does_not_contain_SC_version_he var routingPipeline = new RoutingPipeline(); var behavior = new ManualRetryNotificationBehavior(errorQueue); - TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); - context.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); + var context = SetupTestableContext(routingPipeline); + context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); await behavior.Invoke(context, _ => Task.CompletedTask); @@ -84,17 +84,17 @@ public async Task Should_not_confirm_when_message_does_not_retry_header() var routingPipeline = new RoutingPipeline(); var behavior = new ManualRetryNotificationBehavior(errorQueue); - TestableIncomingLogicalMessageContext context = SetupTestableContext(routingPipeline); - context.Headers["ServiceControl.Version"] = "42"; + var context = SetupTestableContext(routingPipeline); + context.Message.Headers["ServiceControl.Version"] = "42"; await behavior.Invoke(context, _ => Task.CompletedTask); Assert.AreEqual(0, routingPipeline.ForkInvocations.Count); } - static TestableIncomingLogicalMessageContext SetupTestableContext(RoutingPipeline routingPipeline) + static TestableTransportReceiveContext SetupTestableContext(RoutingPipeline routingPipeline) { - var context = new TestableIncomingLogicalMessageContext(); + var context = new TestableTransportReceiveContext(); //setup fork pipeline var serviceCollection = new ServiceCollection(); diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs index 67dab10ddb9..a50281056b7 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -7,7 +7,7 @@ using Routing; using Transport; - class ManualRetryNotificationBehavior : IForkConnector + class ManualRetryNotificationBehavior : IForkConnector { const string RetryUniqueMessageIdHeader = "ServiceControl.Retry.UniqueMessageId"; @@ -18,7 +18,7 @@ public ManualRetryNotificationBehavior(string errorQueue) this.errorQueue = errorQueue; } - public async Task Invoke(IIncomingLogicalMessageContext context, Func next) + public async Task Invoke(ITransportReceiveContext context, Func next) { await next(context).ConfigureAwait(false); @@ -45,9 +45,9 @@ async Task ConfirmSuccessfulRetry() bool IsRetriedMessage(out string retryUniqueMessageId) { // check if the message is coming from a manual retry attempt - if (context.Headers.TryGetValue(RetryUniqueMessageIdHeader, out var uniqueMessageId) && + if (context.Message.Headers.TryGetValue(RetryUniqueMessageIdHeader, out var uniqueMessageId) && // The SC version that supports the confirmation message also started to add the SC version header - context.Headers.ContainsKey("ServiceControl.Version")) + context.Message.Headers.ContainsKey("ServiceControl.Version")) { retryUniqueMessageId = uniqueMessageId; return true; From 4990b715222a6e6e9b6ed00e3c426706459a3f69 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:08:14 +0200 Subject: [PATCH 07/21] use required namespace --- .../ServicePlatform/Retries/PlatformRetryNotifications.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs index 86b9fd067be..3b5929f8005 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs @@ -1,9 +1,7 @@ -namespace NServiceBus +namespace NServiceBus.Features { - using Features; - /// - /// Provides notifications to ServiceControl about successfully retried messages + /// Provides notifications to ServiceControl about successfully retried messages. /// public class PlatformRetryNotifications : Feature { From b1b71b8b78081a1c4c08ca79db591fe0330a8da8 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:08:21 +0200 Subject: [PATCH 08/21] approve API --- .../APIApprovals.ApproveNServiceBus.approved.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt index b851b91f6ea..b100f0b8d87 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt @@ -1546,6 +1546,10 @@ namespace NServiceBus.Features { protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { } } + public class PlatformRetryNotifications : NServiceBus.Features.Feature + { + protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { } + } public class Sagas : NServiceBus.Features.Feature { protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { } From fff5e64da6b5ca30b80b4a8bff776813d01ad36c Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:09:22 +0200 Subject: [PATCH 09/21] add missing test base class --- .../When_retrying_control_message_from_error_queue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs index c47606a5f1c..92840cfa59f 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs @@ -14,7 +14,7 @@ using Transport; using Unicast.Transport; - public class When_retrying_control_message_from_error_queue + public class When_retrying_control_message_from_error_queue : NServiceBusAcceptanceTest { static readonly string RetryId = Guid.NewGuid().ToString("D"); From 43cb7d88cf1d4a9ee663ad6e74a93e9082ab1e94 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:34:56 +0200 Subject: [PATCH 10/21] mark message as public --- .../Recoverability/When_retrying_message_from_error_queue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs index 8f0a11cc513..482157a4ef4 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs @@ -90,7 +90,7 @@ public override async Task Invoke(IIncomingPhysicalMessageContext context, Func< } } - class FailedMessage : IMessage + public class FailedMessage : IMessage { } } From 87157a57e811ec36c8b0fe7cfbeaef72ac161691 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 17:56:40 +0200 Subject: [PATCH 11/21] update approved pipeline behavior order --- ...lt.Should_preserve_order.net5.approved.txt | 21 ++++++++++--------- ...e_built.Should_preserve_order.approved.txt | 19 +++++++++-------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt index 484bf54f76e..cb41fd463ae 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), - context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), + context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), + context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt index 4bc5051129f..dea76e6f7ae 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt @@ -51,12 +51,13 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.NativeUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context4 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context5 => value(NServiceBus.DeserializeMessageConnector).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context6 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context7 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context8 => value(NServiceBus.LoadHandlersConnector).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), - context9 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), + context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), + context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), From 1bad79624be15c861d4a0cb70c6e94c1e1df4d22 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 28 Jun 2021 18:09:25 +0200 Subject: [PATCH 12/21] more approvals --- ...e_built.Should_preserve_order.approved.txt | 21 ++++++++++--------- ...lt.Should_preserve_order.net5.approved.txt | 19 +++++++++-------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt index a8fa36dd2c5..2ea91d68889 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), - context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), + context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), + context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt index 8d9762c9ab5..d0c3602e0c8 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt @@ -51,12 +51,13 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.NativeUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context4 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), - context5 => value(NServiceBus.DeserializeMessageConnector).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context6 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context7 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), - context8 => value(NServiceBus.LoadHandlersConnector).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), - context9 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), + context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), + context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), + context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), + context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), From ff63428dcffa449eb7098fa9451ee0cef209c3ab Mon Sep 17 00:00:00 2001 From: Szymon Pobiega Date: Thu, 1 Jul 2021 14:32:15 +0200 Subject: [PATCH 13/21] Add custom audit header when sending direct retry acknowledgement --- .../When_retrying_message_from_error_queue.cs | 55 ++++++++++++++++++- .../ManualRetryNotificationBehavior.cs | 11 +++- .../Retries/MarkAsAcknowledgedBehavior.cs | 24 ++++++++ .../Retries/PlatformRetryNotifications.cs | 1 + 4 files changed, 86 insertions(+), 5 deletions(-) create mode 100644 src/NServiceBus.Core/ServicePlatform/Retries/MarkAsAcknowledgedBehavior.cs diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs index 482157a4ef4..db29e4d71bf 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs @@ -1,6 +1,7 @@ namespace NServiceBus.AcceptanceTests.Recoverability { using System; + using System.Collections.Generic; using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.Customization; @@ -27,13 +28,15 @@ public async Task Should_confirm_successful_processing_to_error_queue() return s.Send(new FailedMessage(), sendOptions); })) .WithEndpoint() - .Done(c => c.ConfirmedRetryId != null) + .WithEndpoint() + .Done(c => c.ConfirmedRetryId != null && c.AuditHeaders != null) .Run(); Assert.IsTrue(context.MessageProcessed); Assert.AreEqual(retryId, context.ConfirmedRetryId); var processingTime = DateTimeOffset.Parse(context.RetryProcessingTimestamp); Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1))); + Assert.IsTrue(context.AuditHeaders.ContainsKey("ServiceControl.Retry.AcknowledgementSent")); } class Context : ScenarioContext @@ -41,12 +44,19 @@ class Context : ScenarioContext public string ConfirmedRetryId { get; set; } public string RetryProcessingTimestamp { get; set; } public bool MessageProcessed { get; set; } + public IReadOnlyDictionary AuditHeaders { get; set; } } class ProcessingEndpoint : EndpointConfigurationBuilder { - public ProcessingEndpoint() => EndpointSetup(c => c - .SendFailedMessagesTo()); + public ProcessingEndpoint() + { + EndpointSetup(c => + { + c.SendFailedMessagesTo(); + c.AuditProcessedMessagesTo(); + }); + } class FailedMessageHandler : IHandleMessages { @@ -90,6 +100,45 @@ public override async Task Invoke(IIncomingPhysicalMessageContext context, Func< } } + class AuditSpy : EndpointConfigurationBuilder + { + public AuditSpy() => EndpointSetup(); + + class FailedMessageHandler : IHandleMessages + { + Context testContext; + + public FailedMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(FailedMessage message, IMessageHandlerContext context) + { + testContext.AuditHeaders = context.MessageHeaders; + return Task.CompletedTask; + } + } + + class ControlMessageBehavior : Behavior + { + Context testContext; + + public ControlMessageBehavior(Context testContext) + { + this.testContext = testContext; + } + + public override async Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + await next(); + + testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"]; + testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"]; + } + } + } + public class FailedMessage : IMessage { } diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs index a50281056b7..3bb629c2a26 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -20,9 +20,16 @@ public ManualRetryNotificationBehavior(string errorQueue) public async Task Invoke(ITransportReceiveContext context, Func next) { + var useRetryAcknowledgement = UseRetryAcknowledgement(out var id); + + if (useRetryAcknowledgement) + { + context.Extensions.Set(new MarkAsAcknowledgedBehavior.State()); + } + await next(context).ConfigureAwait(false); - if (IsRetriedMessage(out var id)) + if (useRetryAcknowledgement) { await ConfirmSuccessfulRetry().ConfigureAwait(false); } @@ -42,7 +49,7 @@ async Task ConfirmSuccessfulRetry() await this.Fork(routingContext).ConfigureAwait(false); } - bool IsRetriedMessage(out string retryUniqueMessageId) + bool UseRetryAcknowledgement(out string retryUniqueMessageId) { // check if the message is coming from a manual retry attempt if (context.Message.Headers.TryGetValue(RetryUniqueMessageIdHeader, out var uniqueMessageId) && diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/MarkAsAcknowledgedBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/MarkAsAcknowledgedBehavior.cs new file mode 100644 index 00000000000..345ed8c6d97 --- /dev/null +++ b/src/NServiceBus.Core/ServicePlatform/Retries/MarkAsAcknowledgedBehavior.cs @@ -0,0 +1,24 @@ + +namespace NServiceBus +{ + using System; + using System.Threading.Tasks; + using Pipeline; + + class MarkAsAcknowledgedBehavior : IBehavior + { + public Task Invoke(IAuditContext context, Func next) + { + if (context.Extensions.TryGet(out _)) + { + context.AddAuditData("ServiceControl.Retry.AcknowledgementSent", "true"); + } + + return next(context); + } + + public class State + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs index 3b5929f8005..008601a890e 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs @@ -13,6 +13,7 @@ protected internal override void Setup(FeatureConfigurationContext context) var errorQueueAddress = context.Settings.ErrorQueueAddress(); var forkBehavior = new ManualRetryNotificationBehavior(errorQueueAddress); context.Pipeline.Register(forkBehavior, "Provides retry notifications to ServiceControl"); + context.Pipeline.Register(new MarkAsAcknowledgedBehavior(), "Adds audit information about direct retry acknowledgement"); } } } \ No newline at end of file From 01cf627ba8f95fe70c9ac06eb655e1bd9ea2e417 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Fri, 2 Jul 2021 11:53:51 +0200 Subject: [PATCH 14/21] use acknowledgement queue header --- ...trying_control_message_from_error_queue.cs | 9 ++-- .../When_retrying_message_from_error_queue.cs | 11 ++--- .../ManualRetryNotificationBehaviorTests.cs | 29 ++++++------- .../ManualRetryNotificationBehavior.cs | 43 +++++++++---------- .../Retries/PlatformRetryNotifications.cs | 3 +- 5 files changed, 42 insertions(+), 53 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs index 92840cfa59f..4710b8b0d6e 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs @@ -25,7 +25,7 @@ public async Task Should_confirm_successful_processing_to_error_queue() var context = await Scenario.Define() .WithEndpoint() - .WithEndpoint() + .WithEndpoint() .Done(c => c.ConfirmedRetryId != null) .Run(); @@ -45,7 +45,6 @@ class ProcessingEndpoint : EndpointConfigurationBuilder public ProcessingEndpoint() => EndpointSetup(c => { c.EnableFeature(); - c.SendFailedMessagesTo(); }); class ControlMessageFeature : Feature @@ -74,7 +73,7 @@ protected override async Task OnStart(IMessageSession session, CancellationToken controlMessage.Headers.Add(Headers.ReplyToAddress, "TestSubscriberAddress"); // set SC headers controlMessage.Headers.Add("ServiceControl.Retry.UniqueMessageId", RetryId); - controlMessage.Headers.Add("ServiceControl.Version", Math.PI.ToString("N")); + controlMessage.Headers.Add("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(RetryAckSpy))); var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)))); await dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken); } @@ -83,9 +82,9 @@ protected override async Task OnStart(IMessageSession session, CancellationToken } } - class ErrorSpy : EndpointConfigurationBuilder + class RetryAckSpy : EndpointConfigurationBuilder { - public ErrorSpy() => EndpointSetup((e, r) => e.Pipeline.Register( + public RetryAckSpy() => EndpointSetup((e, r) => e.Pipeline.Register( new ControlMessageBehavior(r.ScenarioContext as Context), "Checks for confirmation control message")); diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs index db29e4d71bf..96f74f044f7 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs @@ -23,11 +23,11 @@ public async Task Should_confirm_successful_processing_to_error_queue() var sendOptions = new SendOptions(); sendOptions.RouteToThisEndpoint(); // set SC retry header information - sendOptions.SetHeader("ServiceControl.Version", "42"); sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", retryId); + sendOptions.SetHeader("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(RetryAckSpy))); return s.Send(new FailedMessage(), sendOptions); })) - .WithEndpoint() + .WithEndpoint() .WithEndpoint() .Done(c => c.ConfirmedRetryId != null && c.AuditHeaders != null) .Run(); @@ -53,7 +53,6 @@ public ProcessingEndpoint() { EndpointSetup(c => { - c.SendFailedMessagesTo(); c.AuditProcessedMessagesTo(); }); } @@ -75,9 +74,9 @@ public Task Handle(FailedMessage message, IMessageHandlerContext context) } } - class ErrorSpy : EndpointConfigurationBuilder + class RetryAckSpy : EndpointConfigurationBuilder { - public ErrorSpy() => EndpointSetup((e, r) => e.Pipeline.Register( + public RetryAckSpy() => EndpointSetup((e, r) => e.Pipeline.Register( new ControlMessageBehavior(r.ScenarioContext as Context), "Checks for confirmation control message")); @@ -143,6 +142,4 @@ public class FailedMessage : IMessage { } } - - } \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs index 7d00ec1b2e0..ec05f0b48b4 100644 --- a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs +++ b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs @@ -14,16 +14,16 @@ public class ManualRetryNotificationBehaviorTests { [Test] - public async Task Should_confirm_successful_retries_to_error_queue() + public async Task Should_confirm_successful_retries_to_acknowledgement_queue() { - const string errorQueue = "configuredErrorQueue"; + const string acknowledgementQueue = "configuredAcknowledgementQueue"; var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(errorQueue); + var behavior = new ManualRetryNotificationBehavior(); var context = SetupTestableContext(routingPipeline); // Set necessary SC headers - context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); - context.Message.Headers["ServiceControl.Version"] = "42"; + context.Message.Headers[ManualRetryNotificationBehavior.RetryUniqueMessageIdHeaderKey] = Guid.NewGuid().ToString("N"); + context.Message.Headers[ManualRetryNotificationBehavior.RetryConfirmationQueueHeaderKey] = acknowledgementQueue; await behavior.Invoke(context, _ => Task.CompletedTask); @@ -39,20 +39,19 @@ public async Task Should_confirm_successful_retries_to_error_queue() Assert.AreEqual(bool.TrueString, outgoingMessage.Message.Headers[Headers.ControlMessageHeader]); var addressTag = outgoingMessage.RoutingStrategies.Single().Apply(new Dictionary()) as UnicastAddressTag; - Assert.AreEqual(addressTag.Destination, errorQueue); + Assert.AreEqual(addressTag.Destination, acknowledgementQueue); } [Test] public void Should_not_confirm_when_processing_fails() { - const string errorQueue = "configuredErrorQueue"; var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(errorQueue); + var behavior = new ManualRetryNotificationBehavior(); var context = SetupTestableContext(routingPipeline); // Set necessary SC headers context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); - context.Message.Headers["ServiceControl.Version"] = "42"; + context.Message.Headers[ManualRetryNotificationBehavior.RetryConfirmationQueueHeaderKey] = "SomeQueue"; var exception = new Exception("some pipeline failure"); var thrownException = Assert.ThrowsAsync(async () => await behavior.Invoke(context, _ => Task.FromException(exception))); @@ -63,11 +62,10 @@ public void Should_not_confirm_when_processing_fails() [Test] // A missing SC version header indicates an older version of SC that cannot handle the confirmation message yet - public async Task Should_not_confirm_when_message_does_not_contain_SC_version_header() + public async Task Should_not_confirm_when_message_does_not_contain_acknowledgementQueue_header() { - const string errorQueue = "configuredErrorQueue"; var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(errorQueue); + var behavior = new ManualRetryNotificationBehavior(); var context = SetupTestableContext(routingPipeline); context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); @@ -78,14 +76,13 @@ public async Task Should_not_confirm_when_message_does_not_contain_SC_version_he } [Test] - public async Task Should_not_confirm_when_message_does_not_retry_header() + public async Task Should_not_confirm_when_message_does_not_contain_retry_header() { - const string errorQueue = "configuredErrorQueue"; var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(errorQueue); + var behavior = new ManualRetryNotificationBehavior(); var context = SetupTestableContext(routingPipeline); - context.Message.Headers["ServiceControl.Version"] = "42"; + context.Message.Headers[ManualRetryNotificationBehavior.RetryConfirmationQueueHeaderKey] = "SomeQueue"; await behavior.Invoke(context, _ => Task.CompletedTask); diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs index 3bb629c2a26..d938e858341 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -9,21 +9,16 @@ class ManualRetryNotificationBehavior : IForkConnector { - const string RetryUniqueMessageIdHeader = "ServiceControl.Retry.UniqueMessageId"; - - readonly string errorQueue; - - public ManualRetryNotificationBehavior(string errorQueue) - { - this.errorQueue = errorQueue; - } + internal const string RetryUniqueMessageIdHeaderKey = "ServiceControl.Retry.UniqueMessageId"; + internal const string RetryConfirmationQueueHeaderKey = "ServiceControl.Retry.AcknowledgementQueue"; public async Task Invoke(ITransportReceiveContext context, Func next) { - var useRetryAcknowledgement = UseRetryAcknowledgement(out var id); + var useRetryAcknowledgement = IsRetriedMessage(context, out var id, out var acknowledgementQueue); if (useRetryAcknowledgement) { + // notify the ServiceControl audit instance that the retry has already been acknowledged by the endpoint context.Extensions.Set(new MarkAsAcknowledgedBehavior.State()); } @@ -41,28 +36,30 @@ async Task ConfirmSuccessfulRetry() new Dictionary { { "ServiceControl.Retry.Successful", DateTimeOffset.UtcNow.ToString("O") }, - { RetryUniqueMessageIdHeader, id }, + { RetryUniqueMessageIdHeaderKey, id }, { Headers.ControlMessageHeader, bool.TrueString } }, new byte[0]); - var routingContext = new RoutingContext(messageToDispatch, new UnicastRoutingStrategy(errorQueue), context); + var routingContext = new RoutingContext(messageToDispatch, new UnicastRoutingStrategy(acknowledgementQueue), context); await this.Fork(routingContext).ConfigureAwait(false); } + } - bool UseRetryAcknowledgement(out string retryUniqueMessageId) + static bool IsRetriedMessage(ITransportReceiveContext context, out string retryUniqueMessageId, out string retryAcknowledgementQueue) + { + // check if the message is coming from a manual retry attempt + if (context.Message.Headers.TryGetValue(RetryUniqueMessageIdHeaderKey, out var uniqueMessageId) && + // The SC version that supports the confirmation message also started to add the SC version header + context.Message.Headers.TryGetValue(RetryConfirmationQueueHeaderKey, out var acknowledgementQueue)) { - // check if the message is coming from a manual retry attempt - if (context.Message.Headers.TryGetValue(RetryUniqueMessageIdHeader, out var uniqueMessageId) && - // The SC version that supports the confirmation message also started to add the SC version header - context.Message.Headers.ContainsKey("ServiceControl.Version")) - { - retryUniqueMessageId = uniqueMessageId; - return true; - } - - retryUniqueMessageId = null; - return false; + retryUniqueMessageId = uniqueMessageId; + retryAcknowledgementQueue = acknowledgementQueue; + return true; } + + retryUniqueMessageId = null; + retryAcknowledgementQueue = null; + return false; } } } \ No newline at end of file diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs index 008601a890e..8e04aa7ca39 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs @@ -10,8 +10,7 @@ public class PlatformRetryNotifications : Feature /// protected internal override void Setup(FeatureConfigurationContext context) { - var errorQueueAddress = context.Settings.ErrorQueueAddress(); - var forkBehavior = new ManualRetryNotificationBehavior(errorQueueAddress); + var forkBehavior = new ManualRetryNotificationBehavior(); context.Pipeline.Register(forkBehavior, "Provides retry notifications to ServiceControl"); context.Pipeline.Register(new MarkAsAcknowledgedBehavior(), "Adds audit information about direct retry acknowledgement"); } From 7c5594189832aeefd3dffd977d1e848934de13d2 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Fri, 2 Jul 2021 12:05:49 +0200 Subject: [PATCH 15/21] verify state in unit tests --- .../Retries/ManualRetryNotificationBehaviorTests.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs index ec05f0b48b4..467875cce18 100644 --- a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs +++ b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs @@ -40,6 +40,8 @@ public async Task Should_confirm_successful_retries_to_acknowledgement_queue() var addressTag = outgoingMessage.RoutingStrategies.Single().Apply(new Dictionary()) as UnicastAddressTag; Assert.AreEqual(addressTag.Destination, acknowledgementQueue); + + Assert.IsTrue(context.Extensions.TryGet(out MarkAsAcknowledgedBehavior.State _)); } [Test] @@ -73,6 +75,7 @@ public async Task Should_not_confirm_when_message_does_not_contain_acknowledgeme await behavior.Invoke(context, _ => Task.CompletedTask); Assert.AreEqual(0, routingPipeline.ForkInvocations.Count); + Assert.IsFalse(context.Extensions.TryGet(out MarkAsAcknowledgedBehavior.State _)); } [Test] @@ -87,6 +90,7 @@ public async Task Should_not_confirm_when_message_does_not_contain_retry_header( await behavior.Invoke(context, _ => Task.CompletedTask); Assert.AreEqual(0, routingPipeline.ForkInvocations.Count); + Assert.IsFalse(context.Extensions.TryGet(out MarkAsAcknowledgedBehavior.State _)); } static TestableTransportReceiveContext SetupTestableContext(RoutingPipeline routingPipeline) From b9dc3c4ce32ba08ac1cc637469f2ce6c464dd88c Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 5 Jul 2021 16:29:09 +0200 Subject: [PATCH 16/21] adjust test name --- .../When_retrying_control_message_from_error_queue.cs | 2 +- .../Recoverability/When_retrying_message_from_error_queue.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs index 4710b8b0d6e..a2c03af87c8 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs @@ -19,7 +19,7 @@ public class When_retrying_control_message_from_error_queue : NServiceBusAccepta static readonly string RetryId = Guid.NewGuid().ToString("D"); [Test] - public async Task Should_confirm_successful_processing_to_error_queue() + public async Task Should_confirm_successful_processing() { Requires.MessageDrivenPubSub(); //required for subscription control message support diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs index 96f74f044f7..ca4b1c84c0a 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs @@ -12,7 +12,7 @@ public class When_retrying_message_from_error_queue : NServiceBusAcceptanceTest { [Test] - public async Task Should_confirm_successful_processing_to_error_queue() + public async Task Should_confirm_successful_processing() { var retryId = Guid.NewGuid().ToString("D"); From 45c025498bcd25903532853dc1a00ea9b1503060 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Mon, 5 Jul 2021 16:30:38 +0200 Subject: [PATCH 17/21] remove todo comment --- .../Retries/ManualRetryNotificationBehaviorTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs index 467875cce18..50ccd37b89b 100644 --- a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs +++ b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs @@ -118,7 +118,5 @@ public override Task Invoke(IRoutingContext context, Func next) return Task.CompletedTask; } } - - //TODO also for control messages } } \ No newline at end of file From cb9ce674234916a5fc6d5972a0fe018a64935c0d Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Tue, 6 Jul 2021 13:48:51 +0200 Subject: [PATCH 18/21] use single state instance --- .../Retries/ManualRetryNotificationBehavior.cs | 2 +- .../ServicePlatform/Retries/MarkAsAcknowledgedBehavior.cs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs index d938e858341..670e9a2768e 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -19,7 +19,7 @@ public async Task Invoke(ITransportReceiveContext context, Func next) public class State { + public static readonly State Instance = new State(); + + State() + { + } } } } \ No newline at end of file From 4025d15c7e85110a4357aa99da9b46db5b8eb7d6 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Tue, 6 Jul 2021 13:50:31 +0200 Subject: [PATCH 19/21] use DateTimeOffsetHelper for ServiceControl.Retry.Successful header --- .../ServicePlatform/Retries/ManualRetryNotificationBehavior.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs index 670e9a2768e..1f1cc2bdf11 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs @@ -35,7 +35,7 @@ async Task ConfirmSuccessfulRetry() CombGuid.Generate().ToString(), new Dictionary { - { "ServiceControl.Retry.Successful", DateTimeOffset.UtcNow.ToString("O") }, + { "ServiceControl.Retry.Successful", DateTimeOffsetHelper.ToWireFormattedString(DateTimeOffset.UtcNow) }, { RetryUniqueMessageIdHeaderKey, id }, { Headers.ControlMessageHeader, bool.TrueString } }, From 861ec28bac019c019b797d0dc9be2a589f4c9564 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Wed, 7 Jul 2021 14:21:53 +0200 Subject: [PATCH 20/21] fix acceptance test assertion --- .../When_retrying_control_message_from_error_queue.cs | 2 +- .../Recoverability/When_retrying_message_from_error_queue.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs index a2c03af87c8..6d359b2ee54 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_control_message_from_error_queue.cs @@ -30,7 +30,7 @@ public async Task Should_confirm_successful_processing() .Run(); Assert.AreEqual(RetryId, context.ConfirmedRetryId); - var processingTime = DateTimeOffset.Parse(context.RetryProcessingTimestamp); + var processingTime = DateTimeOffsetHelper.ToDateTimeOffset(context.RetryProcessingTimestamp); Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1))); } diff --git a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs index ca4b1c84c0a..4920bfe97eb 100644 --- a/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs +++ b/src/NServiceBus.AcceptanceTests/Recoverability/When_retrying_message_from_error_queue.cs @@ -34,7 +34,7 @@ public async Task Should_confirm_successful_processing() Assert.IsTrue(context.MessageProcessed); Assert.AreEqual(retryId, context.ConfirmedRetryId); - var processingTime = DateTimeOffset.Parse(context.RetryProcessingTimestamp); + var processingTime = DateTimeOffsetHelper.ToDateTimeOffset(context.RetryProcessingTimestamp); Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1))); Assert.IsTrue(context.AuditHeaders.ContainsKey("ServiceControl.Retry.AcknowledgementSent")); } From fe833c1aac5adee3a173a503fc91f3a734691fc0 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Wed, 7 Jul 2021 14:24:48 +0200 Subject: [PATCH 21/21] rename behavior --- ...re_built.Should_preserve_order.approved.txt | 2 +- ...ilt.Should_preserve_order.net5.approved.txt | 2 +- ...cs => RetryAcknowledgementBehaviorTests.cs} | 18 +++++++++--------- .../Retries/PlatformRetryNotifications.cs | 2 +- ...vior.cs => RetryAcknowledgementBehavior.cs} | 2 +- ...re_built.Should_preserve_order.approved.txt | 2 +- ...ilt.Should_preserve_order.net5.approved.txt | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) rename src/NServiceBus.Core.Tests/ServicePlatform/Retries/{ManualRetryNotificationBehaviorTests.cs => RetryAcknowledgementBehaviorTests.cs} (85%) rename src/NServiceBus.Core/ServicePlatform/Retries/{ManualRetryNotificationBehavior.cs => RetryAcknowledgementBehavior.cs} (95%) diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt index 2ea91d68889..4d4460c2712 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt @@ -54,7 +54,7 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt index cb41fd463ae..daa189a4a9f 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt @@ -54,7 +54,7 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/RetryAcknowledgementBehaviorTests.cs similarity index 85% rename from src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs rename to src/NServiceBus.Core.Tests/ServicePlatform/Retries/RetryAcknowledgementBehaviorTests.cs index 50ccd37b89b..bb1bb68b766 100644 --- a/src/NServiceBus.Core.Tests/ServicePlatform/Retries/ManualRetryNotificationBehaviorTests.cs +++ b/src/NServiceBus.Core.Tests/ServicePlatform/Retries/RetryAcknowledgementBehaviorTests.cs @@ -11,19 +11,19 @@ using Testing; [TestFixture] - public class ManualRetryNotificationBehaviorTests + public class RetryAcknowledgementBehaviorTests { [Test] public async Task Should_confirm_successful_retries_to_acknowledgement_queue() { const string acknowledgementQueue = "configuredAcknowledgementQueue"; var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(); + var behavior = new RetryAcknowledgementBehavior(); var context = SetupTestableContext(routingPipeline); // Set necessary SC headers - context.Message.Headers[ManualRetryNotificationBehavior.RetryUniqueMessageIdHeaderKey] = Guid.NewGuid().ToString("N"); - context.Message.Headers[ManualRetryNotificationBehavior.RetryConfirmationQueueHeaderKey] = acknowledgementQueue; + context.Message.Headers[RetryAcknowledgementBehavior.RetryUniqueMessageIdHeaderKey] = Guid.NewGuid().ToString("N"); + context.Message.Headers[RetryAcknowledgementBehavior.RetryConfirmationQueueHeaderKey] = acknowledgementQueue; await behavior.Invoke(context, _ => Task.CompletedTask); @@ -48,12 +48,12 @@ public async Task Should_confirm_successful_retries_to_acknowledgement_queue() public void Should_not_confirm_when_processing_fails() { var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(); + var behavior = new RetryAcknowledgementBehavior(); var context = SetupTestableContext(routingPipeline); // Set necessary SC headers context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); - context.Message.Headers[ManualRetryNotificationBehavior.RetryConfirmationQueueHeaderKey] = "SomeQueue"; + context.Message.Headers[RetryAcknowledgementBehavior.RetryConfirmationQueueHeaderKey] = "SomeQueue"; var exception = new Exception("some pipeline failure"); var thrownException = Assert.ThrowsAsync(async () => await behavior.Invoke(context, _ => Task.FromException(exception))); @@ -67,7 +67,7 @@ public void Should_not_confirm_when_processing_fails() public async Task Should_not_confirm_when_message_does_not_contain_acknowledgementQueue_header() { var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(); + var behavior = new RetryAcknowledgementBehavior(); var context = SetupTestableContext(routingPipeline); context.Message.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("N"); @@ -82,10 +82,10 @@ public async Task Should_not_confirm_when_message_does_not_contain_acknowledgeme public async Task Should_not_confirm_when_message_does_not_contain_retry_header() { var routingPipeline = new RoutingPipeline(); - var behavior = new ManualRetryNotificationBehavior(); + var behavior = new RetryAcknowledgementBehavior(); var context = SetupTestableContext(routingPipeline); - context.Message.Headers[ManualRetryNotificationBehavior.RetryConfirmationQueueHeaderKey] = "SomeQueue"; + context.Message.Headers[RetryAcknowledgementBehavior.RetryConfirmationQueueHeaderKey] = "SomeQueue"; await behavior.Invoke(context, _ => Task.CompletedTask); diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs index 8e04aa7ca39..e797a760190 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/PlatformRetryNotifications.cs @@ -10,7 +10,7 @@ public class PlatformRetryNotifications : Feature /// protected internal override void Setup(FeatureConfigurationContext context) { - var forkBehavior = new ManualRetryNotificationBehavior(); + var forkBehavior = new RetryAcknowledgementBehavior(); context.Pipeline.Register(forkBehavior, "Provides retry notifications to ServiceControl"); context.Pipeline.Register(new MarkAsAcknowledgedBehavior(), "Adds audit information about direct retry acknowledgement"); } diff --git a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs b/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs similarity index 95% rename from src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs rename to src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs index 1f1cc2bdf11..7605f0365d7 100644 --- a/src/NServiceBus.Core/ServicePlatform/Retries/ManualRetryNotificationBehavior.cs +++ b/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs @@ -7,7 +7,7 @@ using Routing; using Transport; - class ManualRetryNotificationBehavior : IForkConnector + class RetryAcknowledgementBehavior : IForkConnector { internal const string RetryUniqueMessageIdHeaderKey = "ServiceControl.Retry.UniqueMessageId"; internal const string RetryConfirmationQueueHeaderKey = "ServiceControl.Retry.AcknowledgementQueue"; diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt index dea76e6f7ae..4b13b35e1a9 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.approved.txt @@ -51,7 +51,7 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.NativeUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt index d0c3602e0c8..ca306b77450 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net5.approved.txt @@ -51,7 +51,7 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co context1 => value(NServiceBus.NativeUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), - context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), + context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])), context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])), context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),