Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port extendable options leak fix to master #6349

Merged
merged 6 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
namespace NServiceBus.AcceptanceTests.Core.Routing
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using EndpointTemplates;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_inner_send_with_outer_immediate_dispatch : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_not_apply_immediate_dispatch_to_inner_send()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointA>(c => c
.DoNotFailOnErrorMessages()
.When(s => s.SendLocal(new TriggerMessage())))
.WithEndpoint<EndpointB>()
.WithEndpoint<EndpointC>()
.Done(c => c.MessageBReceived)
.Run(TimeSpan.FromSeconds(15));

Assert.IsTrue(context.MessageBReceived);
Assert.IsFalse(context.MessageCReceived);
}

class Context : ScenarioContext
{
public bool MessageBReceived { get; set; }
public bool MessageCReceived { get; set; }
}

class EndpointA : EndpointConfigurationBuilder
{
public EndpointA()
{
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureRouting().RouteToEndpoint(typeof(MessageToEndpointB), typeof(EndpointB));
c.ConfigureRouting().RouteToEndpoint(typeof(MessageToEndpointC), typeof(EndpointC));
c.Pipeline.Register(new OutgoingBehaviorWithSend(), "sends a message as part of an incoming message pipeline");
});
}

class TriggerMessageHandler : IHandleMessages<TriggerMessage>
{
public Task Handle(TriggerMessage message, IMessageHandlerContext context)
{
// "outer send"
var sendOptions = new SendOptions();
sendOptions.RequireImmediateDispatch();
return context.Send(new MessageToEndpointB(), sendOptions);
}
}

class OutgoingBehaviorWithSend : Behavior<IOutgoingLogicalMessageContext>
{
public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
{
await next();
if (context.Message.MessageType == typeof(MessageToEndpointB))
{
// "inner send"
await context.Send(new MessageToEndpointC()); // no immediate dispatch
throw new Exception(); // batching should prevent message C from being dispatched.
}
}
}
}

class EndpointB : EndpointConfigurationBuilder
{
public EndpointB() => EndpointSetup<DefaultServer>();

public class EndpointBHandler : IHandleMessages<MessageToEndpointB>
{
Context testContext;

public EndpointBHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MessageToEndpointB message, IMessageHandlerContext context)
{
testContext.MessageBReceived = true;
return Task.FromResult(0);
}
}
}

class EndpointC : EndpointConfigurationBuilder
{
public EndpointC() => EndpointSetup<DefaultServer>();

public class EndpointCHandler : IHandleMessages<MessageToEndpointC>
{
Context testContext;

public EndpointCHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MessageToEndpointC message, IMessageHandlerContext context)
{
testContext.MessageCReceived = true;
return Task.FromResult(0);
}
}
}

public class MessageToEndpointB : IMessage
{
}

public class MessageToEndpointC : IMessage
{
}

public class TriggerMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
namespace NServiceBus.AcceptanceTests.Core.Routing
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_sending_from_outgoing_pipeline : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_use_default_routing_when_empty_send_options()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointA>(e => e
.CustomConfig(c =>
{
c.Pipeline.Register(new SendingBehaviorUsingDefaultRouting(), "outgoing pipeline behavior sending messages");
c.ConfigureRouting().RouteToEndpoint(typeof(BehaviorMessage), typeof(EndpointB));
})
.When(s => s.SendLocal(new LocalMessage())))
.WithEndpoint<EndpointB>()
.Done(c => c.LocalMessageReceived && c.BehaviorMessageReceived)
.Run(TimeSpan.FromSeconds(15));

Assert.True(context.LocalMessageReceived);
Assert.True(context.BehaviorMessageReceived);
}

[Test]
public async Task Should_apply_send_options_routing()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointA>(e => e
.CustomConfig(c =>
{
c.Pipeline.Register(new SendingBehaviorUsingRoutingOverride(), "outgoing pipeline behavior sending messages");
})
.When(s => s.SendLocal(new LocalMessage())))
.WithEndpoint<EndpointB>()
.Done(c => c.LocalMessageReceived && c.BehaviorMessageReceived)
.Run(TimeSpan.FromSeconds(15));

Assert.True(context.LocalMessageReceived);
Assert.True(context.BehaviorMessageReceived);
}

public class Context : ScenarioContext
{
public bool BehaviorMessageReceived { get; set; }
public bool LocalMessageReceived { get; set; }
}

public class EndpointA : EndpointConfigurationBuilder
{
public EndpointA() => EndpointSetup<DefaultServer>();

public class LocalMessageHandler : IHandleMessages<LocalMessage>
{
Context testContext;

public LocalMessageHandler(Context testContext) => this.testContext = testContext;

public Task Handle(LocalMessage message, IMessageHandlerContext context)
{
testContext.LocalMessageReceived = true;
return Task.FromResult(0);
}
}
}

public class EndpointB : EndpointConfigurationBuilder
{
public EndpointB() => EndpointSetup<DefaultServer>();

public class BehaviorMessageHandler : IHandleMessages<BehaviorMessage>
{
Context testContext;

public BehaviorMessageHandler(Context testContext) => this.testContext = testContext;

public Task Handle(BehaviorMessage message, IMessageHandlerContext context)
{
testContext.BehaviorMessageReceived = true;
return Task.FromResult(0);
}
}
}

public class SendingBehaviorUsingDefaultRouting : Behavior<IOutgoingLogicalMessageContext>
{
public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
{
await next();
if (context.Message.MessageType != typeof(BehaviorMessage)) // prevent infinite loop
{
await context.Send(new BehaviorMessage(), new SendOptions()); // use empty SendOptions
}
}
}

public class SendingBehaviorUsingRoutingOverride : Behavior<IOutgoingLogicalMessageContext>
{
public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
{
await next();
if (context.Message.MessageType != typeof(BehaviorMessage)) // prevent infinite loop
{
var sendOptions = new SendOptions();
sendOptions.SetDestination(Conventions.EndpointNamingConvention(typeof(EndpointB))); // Configure routing on SendOptions
await context.Send(new BehaviorMessage(), sendOptions);
}
}
}

public class LocalMessage : IMessage
{
}

public class BehaviorMessage : IMessage
{
}
}
}
Loading