Skip to content

Commit

Permalink
Added DeliverAt to message header for auditing and metrics
Browse files Browse the repository at this point in the history
Updated approval file
  • Loading branch information
TravisNickels committed Feb 17, 2022
1 parent 311525e commit 2ca686b
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/NServiceBus.AcceptanceTests/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
using NUnit.Framework;

[assembly: Parallelizable(ParallelScope.Fixtures)]
[assembly: Parallelizable(ParallelScope.Fixtures)]
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ namespace NServiceBus
public const string CorrelationId = "NServiceBus.CorrelationId";
public const string DelayedRetries = "NServiceBus.Retries";
public const string DelayedRetriesTimestamp = "NServiceBus.Retries.Timestamp";
public const string DeliverAt = "NServiceBus.DeliverAt";
public const string DestinationSites = "NServiceBus.DestinationSites";
public const string EnclosedMessageTypes = "NServiceBus.EnclosedMessageTypes";
public const string HasLicenseExpired = "$.diagnostics.license.expired";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Extensibility;
using NServiceBus.Routing;
using NUnit.Framework;
using Testing;
Expand Down Expand Up @@ -55,12 +56,58 @@ public async Task Should_not_override_nsb_version_header()
Assert.AreEqual(nsbVersion, message.Headers[Headers.NServiceBusVersion]);
}

static async Task<OutgoingMessage> InvokeBehavior(Dictionary<string, string> headers = null)
[Test]
public async Task Should_set_deliver_At_header_when_delay_delivery_with_setAsync()
{
var options = new SendOptions();
var delayTime = TimeSpan.FromSeconds(2);
options.DelayDeliveryWith(delayTime);
var message = await InvokeBehavior(null, options);
var expectedTime = DateTimeExtensions.ToUtcDateTime(message.Headers[Headers.TimeSent]).Add(delayTime);

Assert.True(message.Headers.ContainsKey(Headers.DeliverAt));
Assert.AreEqual(DateTimeExtensions.ToWireFormattedString(expectedTime), message.Headers[Headers.DeliverAt]);
}

[Test]
public async Task Should_set_deliver_At_header_when_do_not_deliver_before_setAsync()
{
var options = new SendOptions();
var doNotDeliverBefore = DateTimeOffset.UtcNow;
options.DoNotDeliverBefore(doNotDeliverBefore);
var message = await InvokeBehavior(null, options);

Assert.True(message.Headers.ContainsKey(Headers.DeliverAt));
Assert.AreEqual(DateTimeExtensions.ToWireFormattedString(doNotDeliverBefore.UtcDateTime), message.Headers[Headers.DeliverAt]);
}

[Test]
public async Task Should_not_override_deliver_at_headerAsync()
{
var options = new SendOptions();
var doNotDeliverBefore = DateTimeOffset.UtcNow;
options.DelayDeliveryWith(TimeSpan.FromSeconds(2));
var message = await InvokeBehavior(new Dictionary<string, string>
{
{Headers.DeliverAt, DateTimeExtensions.ToWireFormattedString(doNotDeliverBefore.UtcDateTime)}
}, options);

Assert.True(message.Headers.ContainsKey(Headers.DeliverAt));
Assert.AreEqual(DateTimeExtensions.ToWireFormattedString(doNotDeliverBefore.UtcDateTime), message.Headers[Headers.DeliverAt]);
}

static async Task<OutgoingMessage> InvokeBehavior(Dictionary<string, string> headers = null, SendOptions options = null)
{
var message = new OutgoingMessage("id", headers ?? new Dictionary<string, string>(), null);
var stash = new ContextBag();

if (options != null)
{
stash.Set(options);
}

await new AttachSenderRelatedInfoOnMessageBehavior()
.Invoke(new TestableRoutingContext { Message = message, RoutingStrategies = new List<UnicastRoutingStrategy> { new UnicastRoutingStrategy("_") } }, _ => TaskEx.CompletedTask);
.Invoke(new TestableRoutingContext { Message = message, Extensions = stash, RoutingStrategies = new List<UnicastRoutingStrategy> { new UnicastRoutingStrategy("_") } }, _ => TaskEx.CompletedTask);

return message;
}
Expand Down
5 changes: 5 additions & 0 deletions src/NServiceBus.Core/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public static class Headers
/// </summary>
public const string TimeSent = "NServiceBus.TimeSent";

/// <summary>
/// The time this message should be delivered to the endpoint to start processing.
/// </summary>
public const string DeliverAt = "NServiceBus.DeliverAt";

/// <summary>
/// Id of the message that caused this message to be sent.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core/JanitorSettings.cs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

[assembly: Janitor.SkipWeavingNamespace("LightInject")]
[assembly: Janitor.SkipWeavingNamespace("LightInject")]
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ namespace NServiceBus
{
using System;
using System.Threading.Tasks;
using DelayedDelivery;
using Pipeline;

class AttachSenderRelatedInfoOnMessageBehavior : IBehavior<IRoutingContext, IRoutingContext>
{
public Task Invoke(IRoutingContext context, Func<IRoutingContext, Task> next)
{
var message = context.Message;
var utcNow = DateTime.UtcNow;

// This behavior executes in the case of auditing as well, so assuming there are no delayed delivery constraints set,
// no existing header should be overwritten, otherwise the message being audited would be modified improperly.

if (!message.Headers.ContainsKey(Headers.NServiceBusVersion))
{
Expand All @@ -17,7 +22,27 @@ public Task Invoke(IRoutingContext context, Func<IRoutingContext, Task> next)

if (!message.Headers.ContainsKey(Headers.TimeSent))
{
message.Headers[Headers.TimeSent] = DateTimeExtensions.ToWireFormattedString(DateTime.UtcNow);
message.Headers[Headers.TimeSent] = DateTimeExtensions.ToWireFormattedString(utcNow);
}

if (!message.Headers.ContainsKey(Headers.DeliverAt))
{
if (context.Extensions.TryGet<SendOptions>(out var options))
{
if (options != null)
{
if (options.DelayedDeliveryConstraint is DelayDeliveryWith delayDeliveryWith)
{
var timeDelay = delayDeliveryWith.Delay;
message.Headers[Headers.DeliverAt] = DateTimeExtensions.ToWireFormattedString(utcNow.Add(timeDelay));
}
else if (options.DelayedDeliveryConstraint is DoNotDeliverBefore doNotDeliverBefore)
{
var deliverAt = doNotDeliverBefore.At;
message.Headers[Headers.DeliverAt] = DateTimeExtensions.ToWireFormattedString(deliverAt);
}
}
}
}
return next(context);
}
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
using System.Runtime.InteropServices;

[assembly: ComVisible(false)]
[assembly: CLSCompliant(true)]
[assembly: CLSCompliant(true)]

0 comments on commit 2ca686b

Please sign in to comment.