Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send retry confirmation to ServiceControl #6094

Merged
merged 21 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co
context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task])),

context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])),
context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])),
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ context0 => value(NServiceBus.EnforceUnsubscribeBestPracticesBehavior).Invoke(co
context1 => value(NServiceBus.MessageDrivenUnsubscribeTerminator).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IUnsubscribeContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),

context0 => value(NServiceBus.AcceptanceTesting.Support.CaptureExceptionBehavior).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context1 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.DeserializeMessageConnector).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
context1 => value(NServiceBus.ManualRetryNotificationBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.ITransportReceiveContext,System.Threading.Tasks.Task])),
context2 => value(NServiceBus.TransportReceiveToPhysicalMessageConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context3 => value(NServiceBus.ProcessingStatisticsBehavior).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context4 => value(NServiceBus.MutateIncomingTransportMessageBehavior).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context5 => value(NServiceBus.SubscriptionReceiverBehavior).Invoke(context5, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context6 => value(NServiceBus.UnitOfWorkBehavior).Invoke(context6, value(System.Func`2[NServiceBus.Pipeline.IIncomingPhysicalMessageContext,System.Threading.Tasks.Task])),
context7 => value(NServiceBus.DeserializeMessageConnector).Invoke(context7, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context8 => value(NServiceBus.MutateIncomingMessageBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context9 => value(NServiceBus.InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => value(NServiceBus.LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => value(NServiceBus.InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
namespace NServiceBus.AcceptanceTests.Recoverability
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using Features;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NUnit.Framework;
using Transport;
using Unicast.Transport;

public class When_retrying_control_message_from_error_queue : NServiceBusAcceptanceTest
{
static readonly string RetryId = Guid.NewGuid().ToString("D");

[Test]
public async Task Should_confirm_successful_processing()
{
Requires.MessageDrivenPubSub(); //required for subscription control message support

var context = await Scenario.Define<Context>()
.WithEndpoint<ProcessingEndpoint>()
.WithEndpoint<RetryAckSpy>()
.Done(c => c.ConfirmedRetryId != null)
.Run();

Assert.AreEqual(RetryId, context.ConfirmedRetryId);
var processingTime = DateTimeOffset.Parse(context.RetryProcessingTimestamp);
Assert.That(processingTime, Is.EqualTo(DateTimeOffset.UtcNow).Within(TimeSpan.FromMinutes(1)));
}

class Context : ScenarioContext
{
public string ConfirmedRetryId { get; set; }
public string RetryProcessingTimestamp { get; set; }
}

class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<ControlMessageFeature>();
});

class ControlMessageFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
context.RegisterStartupTask(s =>
new ControlMessageSender(s.GetRequiredService<IMessageDispatcher>()));
}
}

class ControlMessageSender : FeatureStartupTask
{
IMessageDispatcher dispatcher;

public ControlMessageSender(IMessageDispatcher dispatcher)
{
this.dispatcher = dispatcher;
}

protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
var controlMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);
// set necessary subscription control message headers
controlMessage.Headers.Add(Headers.SubscriptionMessageType, typeof(object).AssemblyQualifiedName);
controlMessage.Headers.Add(Headers.ReplyToAddress, "TestSubscriberAddress");
// set SC headers
controlMessage.Headers.Add("ServiceControl.Retry.UniqueMessageId", RetryId);
controlMessage.Headers.Add("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(RetryAckSpy)));
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint))));
await dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken);
}

protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;
}
}

class RetryAckSpy : EndpointConfigurationBuilder
{
public RetryAckSpy() => EndpointSetup<DefaultServer>((e, r) => e.Pipeline.Register(
new ControlMessageBehavior(r.ScenarioContext as Context),
"Checks for confirmation control message"));

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
Context testContext;

public ControlMessageBehavior(Context testContext)
{
this.testContext = testContext;
}

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ConfirmedRetryId = context.MessageHeaders["ServiceControl.Retry.UniqueMessageId"];
testContext.RetryProcessingTimestamp = context.MessageHeaders["ServiceControl.Retry.Successful"];
}
}
}
}
}
Loading