Skip to content

Commit

Permalink
Merge pull request #6094 from Particular/sc-retry-notifications
Browse files Browse the repository at this point in the history
Send retry confirmation to SC
  • Loading branch information
tmasternak authored Jul 8, 2021
2 parents bca04ee + fe833c1 commit 66d8162
Show file tree
Hide file tree
Showing 11 changed files with 535 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co
context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task])),

context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])),
context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])),
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co
context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),

context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
context1 => value(NServiceBus.RetryAcknowledgementBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
namespace NServiceBus.AcceptanceTests.Recoverability
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using Features;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NUnit.Framework;
using Transport;
using Unicast.Transport;

public class When_retrying_control_message_from_error_queue : NServiceBusAcceptanceTest
{
static readonly string RetryId = Guid.NewGuid().ToString("D");

[Test]
public async Task Should_confirm_successful_processing()
{
Requires.MessageDrivenPubSub(); //required for subscription control message support

var context = await Scenario.Define<Context>()
.WithEndpoint<ProcessingEndpoint>()
.WithEndpoint<RetryAckSpy>()
.Done(c => c.ConfirmedRetryId != null)
.Run();

Assert.AreEqual(RetryId, context.ConfirmedRetryId);
var processingTime = DateTimeOffsetHelper.ToDateTimeOffset(context.RetryProcessingTimestamp);
Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1)));
}

class Context : ScenarioContext
{
public string ConfirmedRetryId { get; set; }
public string RetryProcessingTimestamp { get; set; }
}

class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<ControlMessageFeature>();
});

class ControlMessageFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
context.RegisterStartupTask(s =>
new ControlMessageSender(s.GetRequiredService<IMessageDispatcher>()));
}
}

class ControlMessageSender : FeatureStartupTask
{
IMessageDispatcher dispatcher;

public ControlMessageSender(IMessageDispatcher dispatcher)
{
this.dispatcher = dispatcher;
}

protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
var controlMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);
// set necessary subscription control message headers
controlMessage.Headers.Add(Headers.SubscriptionMessageType, typeof(object).AssemblyQualifiedName);
controlMessage.Headers.Add(Headers.ReplyToAddress, "TestSubscriberAddress");
// set SC headers
controlMessage.Headers.Add("ServiceControl.Retry.UniqueMessageId", RetryId);
controlMessage.Headers.Add("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(RetryAckSpy)));
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint))));
await dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken);
}

protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;
}
}

class RetryAckSpy : EndpointConfigurationBuilder
{
public RetryAckSpy() => EndpointSetup<DefaultServer>((e, r) => e.Pipeline.Register(
new ControlMessageBehavior(r.ScenarioContext as Context),
"Checks for confirmation control message"));

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
Context testContext;

public ControlMessageBehavior(Context testContext)
{
this.testContext = testContext;
}

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"];
testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"];
}
}
}
}
}
Loading

0 comments on commit 66d8162

Please sign in to comment.