-
Notifications
You must be signed in to change notification settings - Fork 647
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Prevent DeliveryConstraints from leaking in nested operations (#6356)
authored-by: Tim Bussmann <[email protected]>
- Loading branch information
1 parent
527d0d5
commit 4330038
Showing
12 changed files
with
527 additions
and
1 deletion.
There are no files selected for viewing
129 changes: 129 additions & 0 deletions
129
src/NServiceBus.AcceptanceTests/Core/Pipeline/When_setting_ttbr_in_outer_publish.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
namespace NServiceBus.AcceptanceTests.Core.Pipeline | ||
{ | ||
using System; | ||
using System.Threading.Tasks; | ||
using AcceptanceTesting; | ||
using DeliveryConstraints; | ||
using EndpointTemplates; | ||
using NServiceBus.Pipeline; | ||
using NUnit.Framework; | ||
using Performance.TimeToBeReceived; | ||
|
||
public class When_setting_ttbr_in_outer_publish : NServiceBusAcceptanceTest | ||
{ | ||
[Test] | ||
public async Task Should_not_apply_ttbr_to_inner_publish() | ||
{ | ||
Requires.NativePubSubSupport(); | ||
|
||
var context = await Scenario.Define<Context>() | ||
.WithEndpoint<PublisherWithTtbr>(e => e | ||
.When(s => s.Publish(new OuterEvent()))) | ||
.WithEndpoint<Subscriber>() | ||
.Done(c => c.OuterEventReceived && c.InnerEventReceived) | ||
.Run(); | ||
|
||
Assert.IsTrue(context.OuterEventHasTtbr, "Outer event should have TTBR settings applied"); | ||
Assert.IsFalse(context.InnerEventHasTtbr, "Inner event should not have TTBR settings applied"); | ||
} | ||
|
||
class Context : ScenarioContext | ||
{ | ||
public bool InnerEventReceived { get; set; } | ||
public bool OuterEventReceived { get; set; } | ||
public bool OuterEventHasTtbr { get; set; } | ||
public bool InnerEventHasTtbr { get; set; } | ||
} | ||
|
||
class PublisherWithTtbr : EndpointConfigurationBuilder | ||
{ | ||
public PublisherWithTtbr() | ||
{ | ||
EndpointSetup<DefaultServer>((c, r) => | ||
{ | ||
c.Pipeline.Register(new InnerPublishBehavior(), "publishes an additional event when publishing an OuterEvent"); | ||
c.Pipeline.Register(new TtbrObserver((Context)r.ScenarioContext), "Checks outgoing messages for their TTBR setting"); | ||
}); | ||
} | ||
|
||
// Behavior needs to be in the OutgoingPhysical stage as the TTBR settings are applied in the OutgoingLogical stage | ||
class InnerPublishBehavior : Behavior<IOutgoingPhysicalMessageContext> | ||
{ | ||
public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next) | ||
{ | ||
await next(); | ||
|
||
if (context.Extensions.Get<OutgoingLogicalMessage>().MessageType == typeof(OuterEvent)) | ||
{ | ||
await context.Publish(new InnerEvent()); | ||
} | ||
} | ||
} | ||
|
||
class TtbrObserver : Behavior<IDispatchContext> | ||
{ | ||
Context testContext; | ||
|
||
public TtbrObserver(Context testContext) | ||
{ | ||
this.testContext = testContext; | ||
} | ||
|
||
public override Task Invoke(IDispatchContext context, Func<Task> next) | ||
{ | ||
var outgoingMessage = context.Extensions.Get<OutgoingLogicalMessage>(); | ||
if (outgoingMessage.MessageType == typeof(OuterEvent)) | ||
{ | ||
testContext.OuterEventHasTtbr = context.Extensions.TryGetDeliveryConstraint<DiscardIfNotReceivedBefore>(out var _); | ||
} | ||
|
||
if (outgoingMessage.MessageType == typeof(InnerEvent)) | ||
{ | ||
testContext.InnerEventHasTtbr = context.Extensions.TryGetDeliveryConstraint<DiscardIfNotReceivedBefore>(out var _); | ||
} | ||
|
||
return next(); | ||
} | ||
} | ||
} | ||
|
||
class Subscriber : EndpointConfigurationBuilder | ||
{ | ||
public Subscriber() | ||
{ | ||
EndpointSetup<DefaultServer>(); | ||
} | ||
|
||
class EventHandler : IHandleMessages<OuterEvent>, IHandleMessages<InnerEvent> | ||
{ | ||
Context testContext; | ||
|
||
public EventHandler(Context testContext) | ||
{ | ||
this.testContext = testContext; | ||
} | ||
|
||
public Task Handle(OuterEvent message, IMessageHandlerContext context) | ||
{ | ||
testContext.OuterEventReceived = true; | ||
return Task.FromResult(0); | ||
} | ||
|
||
public Task Handle(InnerEvent message, IMessageHandlerContext context) | ||
{ | ||
testContext.InnerEventReceived = true; | ||
return Task.FromResult(0); | ||
} | ||
} | ||
} | ||
|
||
[TimeToBeReceived("00:30:00")] | ||
public class OuterEvent : IEvent | ||
{ | ||
} | ||
|
||
public class InnerEvent : IEvent | ||
{ | ||
} | ||
} | ||
} |
120 changes: 120 additions & 0 deletions
120
src/NServiceBus.AcceptanceTests/DelayedDelivery/When_deferring_outer_send.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
namespace NServiceBus.AcceptanceTests.DelayedDelivery | ||
{ | ||
using System; | ||
using System.Threading.Tasks; | ||
using AcceptanceTesting; | ||
using AcceptanceTesting.Customization; | ||
using EndpointTemplates; | ||
using Features; | ||
using NServiceBus.Pipeline; | ||
using NUnit.Framework; | ||
|
||
public class When_deferring_outer_send : NServiceBusAcceptanceTest | ||
{ | ||
[Test] | ||
public async Task Should_not_defer_inner_send() | ||
{ | ||
var context = await Scenario.Define<Context>() | ||
.WithEndpoint<SenderWithNestedSend>(e => e | ||
.When(s => | ||
{ | ||
var sendOptions = new SendOptions(); | ||
sendOptions.DelayDeliveryWith(TimeSpan.FromSeconds(2)); | ||
return s.Send(new DelayedMessage(), sendOptions); | ||
})) | ||
.WithEndpoint<Receiver>() | ||
.Done(c => c.ReceivedNonDelayedMessage && c.ReceivedDelayedMessage) | ||
.Run(); | ||
|
||
Assert.IsTrue(context.DelayedMessageDelayed, "should delay the message sent with 'DelayDeliveryWith'"); | ||
Assert.IsFalse(context.NonDelayedMessageDelayed, "should not delay the message sent with default options"); | ||
} | ||
|
||
class Context : ScenarioContext | ||
{ | ||
public bool DelayedMessageDelayed { get; set; } | ||
public bool NonDelayedMessageDelayed { get; set; } | ||
|
||
public bool ReceivedNonDelayedMessage { get; set; } | ||
public bool ReceivedDelayedMessage { get; set; } | ||
} | ||
|
||
class SenderWithNestedSend : EndpointConfigurationBuilder | ||
{ | ||
public SenderWithNestedSend() => EndpointSetup<DefaultServer>((c, r) => | ||
{ | ||
c.Pipeline.Register(new NestedSendBehavior(), "Sends an additional message when sending a delayed message"); | ||
c.ConfigureTransport().Routing().RouteToEndpoint(typeof(DelayedMessage).Assembly, Conventions.EndpointNamingConvention(typeof(Receiver))); | ||
if (!TestSuiteConstraints.Current.SupportsNativeDeferral) | ||
{ | ||
c.EnableFeature<TimeoutManager>(); | ||
} | ||
}); | ||
|
||
class NestedSendBehavior : Behavior<IOutgoingSendContext> | ||
{ | ||
public override async Task Invoke(IOutgoingSendContext context, Func<Task> next) | ||
{ | ||
await next(); | ||
if (context.Message.MessageType == typeof(DelayedMessage)) | ||
{ | ||
await context.Send(new NonDelayedMessage()); // use default options | ||
} | ||
} | ||
} | ||
} | ||
|
||
class Receiver : EndpointConfigurationBuilder | ||
{ | ||
public Receiver() => EndpointSetup<DefaultServer>(); | ||
|
||
class DelayedMessageHandler : IHandleMessages<DelayedMessage>, IHandleMessages<NonDelayedMessage> | ||
{ | ||
Context testContext; | ||
|
||
public DelayedMessageHandler(Context testContext) | ||
{ | ||
this.testContext = testContext; | ||
} | ||
|
||
public Task Handle(DelayedMessage message, IMessageHandlerContext context) | ||
{ | ||
testContext.ReceivedDelayedMessage = true; | ||
if (TestSuiteConstraints.Current.SupportsNativeDeferral) | ||
{ | ||
testContext.DelayedMessageDelayed = context.MessageHeaders.TryGetValue(Headers.DeliverAt, out var _); // header value not set when routing to timeout manager | ||
} | ||
else | ||
{ | ||
testContext.DelayedMessageDelayed = context.MessageHeaders.TryGetValue("NServiceBus.Timeout.RouteExpiredTimeoutTo", out var _); // header value when routing to timeout manager queue | ||
} | ||
|
||
return Task.FromResult(0); | ||
} | ||
|
||
public Task Handle(NonDelayedMessage message, IMessageHandlerContext context) | ||
{ | ||
testContext.ReceivedNonDelayedMessage = true; | ||
if (TestSuiteConstraints.Current.SupportsNativeDeferral) | ||
{ | ||
testContext.NonDelayedMessageDelayed = context.MessageHeaders.TryGetValue(Headers.DeliverAt, out var _); // header value not set when routing to timeout manager | ||
} | ||
else | ||
{ | ||
testContext.NonDelayedMessageDelayed = context.MessageHeaders.TryGetValue("NServiceBus.Timeout.RouteExpiredTimeoutTo", out var _); // header value when routing to timeout manager queue | ||
} | ||
|
||
return Task.FromResult(0); | ||
} | ||
} | ||
} | ||
|
||
public class DelayedMessage : IMessage | ||
{ | ||
} | ||
|
||
public class NonDelayedMessage : IMessage | ||
{ | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.