Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start a new trace (or root activity) for delayed messages #7049

Merged
merged 22 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
469e5c0
align naming
lailabougria May 31, 2024
7acddf1
start a new trace when the incoming message was delayed through sendo…
lailabougria May 31, 2024
2a44925
add test to check new trace is created
lailabougria May 31, 2024
5b52af5
use deliverat header to identify delayed message set through sendoptions
lailabougria May 31, 2024
be46a22
restructure
lailabougria May 31, 2024
420eda7
add tests for delayed messages
lailabougria May 31, 2024
de75781
Restructure back into core tests and use the ConfigureEndpointAccepta…
lailabougria May 31, 2024
a4f49ca
ocd
lailabougria May 31, 2024
c61cb3e
add saga timeout test
lailabougria May 31, 2024
679633b
use tryparse overload that allows you to set the remote to true
lailabougria May 31, 2024
d6f9a0c
extract in method
lailabougria May 31, 2024
f23b1f3
saga self verification
lailabougria May 31, 2024
1698627
Update src/NServiceBus.Core/OpenTelemetry/Tracing/ActivityFactory.cs
lailabougria Jun 3, 2024
b47a784
Update src/NServiceBus.Core/OpenTelemetry/Tracing/ActivityFactory.cs
lailabougria Jun 3, 2024
1f53283
Setting the correct trace on the sending side making the receiving si…
lailabougria Jun 3, 2024
66df00d
approval files
lailabougria Jun 3, 2024
ddeb972
allow addition of extra header when moved to error queue
lailabougria Jun 3, 2024
74864d4
fix
lailabougria Jun 3, 2024
0d96256
unused usings
lailabougria Jun 3, 2024
ce473c1
add test to verify the StartNewTrace header is added when message is …
lailabougria Jun 3, 2024
77d548c
only set the start new trace header when propagating a traceparent he…
lailabougria Jun 3, 2024
18799c4
fix test
lailabougria Jun 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry;

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using EndpointTemplates;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NUnit.Framework;

public class When_incoming_message_was_delayed : OpenTelemetryAcceptanceTest // assuming W3C trace!
{
[Test]
public async Task By_sendoptions_Should_create_new_trace_and_link_to_send()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<TestEndpoint>(b => b
.CustomConfig(c => c.ConfigureRouting().RouteToEndpoint(typeof(IncomingMessage), typeof(ReplyingEndpoint)))
.When(s =>
{
var sendOptions = new SendOptions();
sendOptions.DelayDeliveryWith(TimeSpan.FromMilliseconds(5));
return s.Send(new IncomingMessage(), sendOptions);
}))
.WithEndpoint<ReplyingEndpoint>()
.Done(c => c.ReplyMessageReceived)
.Run();

var incomingMessageActivities = NServicebusActivityListener.CompletedActivities.GetReceiveMessageActivities();
var outgoingMessageActivities = NServicebusActivityListener.CompletedActivities.GetSendMessageActivities();
Assert.AreEqual(2, incomingMessageActivities.Count, "2 messages are received as part of this test");
Assert.AreEqual(2, outgoingMessageActivities.Count, "2 messages are sent as part of this test");

var sendRequest = outgoingMessageActivities[0];
var receiveRequest = incomingMessageActivities[0];
var sendReply = outgoingMessageActivities[1];
var receiveReply = incomingMessageActivities[1];

Assert.AreNotEqual(sendRequest.RootId, receiveRequest.RootId, "send and receive operations are part of different root activities");
Assert.IsNull(receiveRequest.ParentId, "first incoming message does not have a parent, it's a root");
Assert.AreNotEqual(sendRequest.RootId, sendReply.RootId, "first send operation is different than the root activity of the reply");
Assert.AreEqual(sendReply.Id, receiveReply.ParentId, "second incoming message is correlated to the second send operation");
Assert.AreEqual(sendReply.RootId, receiveReply.RootId, "second incoming message is the root activity");

ActivityLink link = receiveRequest.Links.FirstOrDefault();
Assert.IsNotNull(link, "second receive has a link");
Assert.AreEqual(sendRequest.TraceId, link.Context.TraceId, "second receive is linked to send operation");
}

[Test]
public async Task By_retry_Should_create_new_trace_and_link_to_send()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<RetryEndpoint>(b => b
.CustomConfig(c => c.ConfigureRouting().RouteToEndpoint(typeof(IncomingMessage), typeof(ReplyingEndpoint)))
.When(session => session.SendLocal(new MessageToBeRetried()))
.DoNotFailOnErrorMessages())
.Done(c => !c.FailedMessages.IsEmpty)
.Run(TimeSpan.FromSeconds(120));

var incomingMessageActivities = NServicebusActivityListener.CompletedActivities.GetReceiveMessageActivities();
var outgoingMessageActivities = NServicebusActivityListener.CompletedActivities.GetSendMessageActivities();
Assert.AreEqual(2, incomingMessageActivities.Count, "2 messages are received as part of this test (2 attempts)");
Assert.AreEqual(1, outgoingMessageActivities.Count, "1 message sent as part of this test");

var sendRequest = outgoingMessageActivities[0];
var firstAttemptReceiveRequest = incomingMessageActivities[0];
var secondAttemptReceiveRequest = incomingMessageActivities[1];

Assert.AreEqual(sendRequest.RootId, firstAttemptReceiveRequest.RootId, "first send operation is the root activity");
Assert.AreEqual(sendRequest.Id, firstAttemptReceiveRequest.ParentId, "first incoming message is correlated to the first send operation");

Assert.AreNotEqual(sendRequest.RootId, secondAttemptReceiveRequest.RootId, "send and 2nd receive operations are part of different root activities");
Assert.IsNull(secondAttemptReceiveRequest.ParentId, "first incoming message does not have a parent, it's a root");
ActivityLink link = secondAttemptReceiveRequest.Links.FirstOrDefault();
Assert.IsNotNull(link, "second receive has a link");
Assert.AreEqual(sendRequest.TraceId, link.Context.TraceId, "second receive is linked to send operation");
}

class Context : ScenarioContext
{
public bool ReplyMessageReceived { get; set; }
public string IncomingMessageId { get; set; }
public string ReplyMessageId { get; set; }
public bool IncomingMessageReceived { get; set; }
}

class ReplyingEndpoint : EndpointConfigurationBuilder
{
public ReplyingEndpoint() => EndpointSetup<OpenTelemetryEnabledEndpoint>();
class MessageHandler : IHandleMessages<IncomingMessage>
{
readonly Context testContext;

public MessageHandler(Context testContext) => this.testContext = testContext;

public Task Handle(IncomingMessage message, IMessageHandlerContext context)
{
testContext.IncomingMessageId = context.MessageId;
testContext.IncomingMessageReceived = true;
return context.Reply(new ReplyMessage());
}
}
}

class TestEndpoint : EndpointConfigurationBuilder
{
public TestEndpoint()
{
var template = new DefaultServer
{
TransportConfiguration = new ConfigureEndpointAcceptanceTestingTransport(false, true)
};
EndpointSetup(
template,
(c, _) =>
{
c.EnableOpenTelemetry();
var recoverability = c.Recoverability();
recoverability.Delayed(settings => settings.NumberOfRetries(1).TimeIncrease(TimeSpan.FromMilliseconds(1)));
}, metadata => { });
}

class MessageHandler : IHandleMessages<ReplyMessage>
{
Context testContext;

public MessageHandler(Context testContext) => this.testContext = testContext;

public Task Handle(ReplyMessage message, IMessageHandlerContext context)
{
testContext.ReplyMessageId = context.MessageId;
testContext.ReplyMessageReceived = true;
return Task.CompletedTask;
}
}
}

public class RetryEndpoint : EndpointConfigurationBuilder
{
public RetryEndpoint()
{
var template = new DefaultServer
{
TransportConfiguration = new ConfigureEndpointAcceptanceTestingTransport(false, true)
};
EndpointSetup(
template,
(c, _) =>
{
c.EnableOpenTelemetry();
var recoverability = c.Recoverability();
recoverability.Delayed(settings => settings.NumberOfRetries(1).TimeIncrease(TimeSpan.FromMilliseconds(1)));
}, metadata => { });
}

class MessageToBeRetriedHandler : IHandleMessages<MessageToBeRetried>
{
public Task Handle(MessageToBeRetried message, IMessageHandlerContext context)
{
throw new SimulatedException();
}
}
}


public class MessageToBeRetried : IMessage
{
}

public class IncomingMessage : IMessage
{
}

public class ReplyMessage : IMessage
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus.AcceptanceTesting;
using AcceptanceTesting;
using NUnit.Framework;

public class When_processing_incoming_message : OpenTelemetryAcceptanceTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@
<RemoveSourceFileFromPackage Include="AssemblyInfo.cs" />
</ItemGroup>

<ItemGroup>
<Compile Update="Core\OpenTelemetry\When_incoming_message_was_delayed.cs">
<Link>OpenTelemetry\When_incoming_message_was_delayed.cs</Link>
</Compile>
</ItemGroup>

</Project>
18 changes: 9 additions & 9 deletions src/NServiceBus.Core.Tests/OpenTelemetry/ActivityFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void Should_attach_to_context_activity_when_activity_on_context()
var contextBag = new ContextBag();
contextBag.Set(contextActivity);

var activity = activityFactory.StartIncomingActivity(CreateMessageContext(contextBag: contextBag));
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext(contextBag: contextBag));

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.AreEqual(contextActivity.Id, activity.ParentId, "should use context activity as parent");
Expand All @@ -60,7 +60,7 @@ public void Should_attach_to_context_activity_when_activity_on_context_and_trace

var messageHeaders = new Dictionary<string, string> { { Headers.DiagnosticsTraceParent, sendActivity.Id } };

var activity = activityFactory.StartIncomingActivity(CreateMessageContext(messageHeaders, contextBag));
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext(messageHeaders, contextBag));

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.AreEqual(contextActivity.Id, activity.ParentId, "should use context activity as parent");
Expand All @@ -79,7 +79,7 @@ public void Should_attach_to_context_activity_when_activity_on_context_and_ambie
using var ambientActivity = ActivitySources.Main.StartActivity("ambient activity");
Assert.AreEqual(ambientActivity, Activity.Current);

var activity = activityFactory.StartIncomingActivity(CreateMessageContext(contextBag: contextBag));
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext(contextBag: contextBag));

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.AreEqual(contextActivity.Id, activity.ParentId, "should use context activity as parent");
Expand All @@ -94,7 +94,7 @@ public void Should_start_new_trace_when_activity_on_context_uses_legacy_id_forma
var contextBag = new ContextBag();
contextBag.Set(contextActivity);

var activity = activityFactory.StartIncomingActivity(CreateMessageContext(contextBag: contextBag));
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext(contextBag: contextBag));

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.IsNull(activity.ParentId, "should create a new trace");
Expand All @@ -108,7 +108,7 @@ public void Should_attach_to_header_trace_when_no_activity_on_context_and_trace_

var messageHeaders = new Dictionary<string, string> { { Headers.DiagnosticsTraceParent, sendActivity.Id } };

var activity = activityFactory.StartIncomingActivity(CreateMessageContext(messageHeaders));
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext(messageHeaders));

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.AreEqual(sendActivity.Id, activity.ParentId);
Expand All @@ -123,7 +123,7 @@ public void Should_attach_to_ambient_trace_when_no_activity_on_context_and_no_tr
ambientActivity.SetIdFormat(ambientActivityIdFormat);
ambientActivity.Start();

var activity = activityFactory.StartIncomingActivity(CreateMessageContext());
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext());

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.AreEqual(ambientActivity.Id, activity.ParentId, "should attach to ambient activity");
Expand All @@ -133,7 +133,7 @@ public void Should_attach_to_ambient_trace_when_no_activity_on_context_and_no_tr
[Test]
public void Should_start_new_trace_when_no_activity_on_context_and_no_trace_message_header_and_no_ambient_activity()
{
var activity = activityFactory.StartIncomingActivity(CreateMessageContext());
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext());

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.IsNull(activity.ParentId, "should start a new trace");
Expand All @@ -145,7 +145,7 @@ public void Should_start_new_trace_when_trace_header_contains_invalid_data()
{
var messageHeaders = new Dictionary<string, string> { { Headers.DiagnosticsTraceParent, "Some invalid traceparent format" } };

var activity = activityFactory.StartIncomingActivity(CreateMessageContext(messageHeaders));
var activity = activityFactory.StartIncomingPipelineActivity(CreateMessageContext(messageHeaders));

Assert.NotNull(activity, "should create activity for receive pipeline");
Assert.IsNull(activity.ParentId, "should start new trace");
Expand All @@ -157,7 +157,7 @@ public void Should_add_native_message_id_tag()
{
MessageContext messageContext = CreateMessageContext();

var activity = activityFactory.StartIncomingActivity(messageContext);
var activity = activityFactory.StartIncomingPipelineActivity(messageContext);

Assert.AreEqual(messageContext.NativeMessageId, activity.Tags.ToImmutableDictionary()["nservicebus.native_message_id"]);
}
Expand Down
35 changes: 26 additions & 9 deletions src/NServiceBus.Core/OpenTelemetry/Tracing/ActivityFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,45 @@

class ActivityFactory : IActivityFactory
{
public Activity StartIncomingActivity(MessageContext context)
public Activity StartIncomingPipelineActivity(MessageContext context)
{
Activity activity;
var incomingTraceParentExists = context.Headers.TryGetValue(Headers.DiagnosticsTraceParent, out var sendSpanId);
var activityContextCreatedFromIncomingTraceParent = ActivityContext.TryParse(sendSpanId, null, out var sendSpanContext);

if (context.Extensions.TryGet(out Activity transportActivity) && transportActivity != null) // attach to transport span but link receive pipeline span to send pipeline span
{
ActivityLink[] links = null;
if (context.Headers.TryGetValue(Headers.DiagnosticsTraceParent, out var sendSpanId) && sendSpanId != transportActivity.Id)
if (incomingTraceParentExists && sendSpanId != transportActivity.Id)
{
if (ActivityContext.TryParse(sendSpanId, null, out var sendSpanContext))
if (activityContextCreatedFromIncomingTraceParent)
{
links = new[] { new ActivityLink(sendSpanContext) };
links = [new ActivityLink(sendSpanContext)];
}
}

activity = ActivitySources.Main.CreateActivity(name: ActivityNames.IncomingMessageActivityName,
ActivityKind.Consumer, transportActivity.Context, links: links, idFormat: ActivityIdFormat.W3C);

}
else if (context.Headers.TryGetValue(Headers.DiagnosticsTraceParent, out var sendSpanId) && ActivityContext.TryParse(sendSpanId, null, out var sendSpanContext)) // otherwise directly create child from logical send
else if (incomingTraceParentExists && activityContextCreatedFromIncomingTraceParent) // otherwise directly create child from logical send
{
// TryParse doesn't have an overload that supports changing the isRemote setting yet
// This can be removed with .NET 7, see https://github.com/dotnet/runtime/issues/42575
var remoteParentActivityContext = new ActivityContext(sendSpanContext.TraceId, sendSpanContext.SpanId, sendSpanContext.TraceFlags, sendSpanContext.TraceState, isRemote: true);
activity = ActivitySources.Main.CreateActivity(name: ActivityNames.IncomingMessageActivityName, ActivityKind.Consumer, remoteParentActivityContext);
if (context.Headers.ContainsKey(Headers.DeliverAt) ||
context.Headers.ContainsKey(Headers.DelayedRetries))
{
// this is a delayed message and should therefore start a new trace
ActivityLink[] links = [new ActivityLink(sendSpanContext)];
// create a new trace or root activity
activity = ActivitySources.Main.StartActivity(name: ActivityNames.IncomingMessageActivityName, ActivityKind.Consumer, CreateNewRootActivityContext(), tags: null, links: links);
}
else
{
// this is a regular message and should therefore start a child trace
// TryParse doesn't have an overload that supports changing the isRemote setting yet
// This can be removed with .NET 7, see https://github.com/dotnet/runtime/issues/42575
var remoteParentActivityContext = new ActivityContext(sendSpanContext.TraceId, sendSpanContext.SpanId, sendSpanContext.TraceFlags, sendSpanContext.TraceState, isRemote: true);
activity = ActivitySources.Main.CreateActivity(name: ActivityNames.IncomingMessageActivityName, ActivityKind.Consumer, remoteParentActivityContext);
}
}
else // otherwise start new trace
{
Expand All @@ -54,6 +69,8 @@ public Activity StartIncomingActivity(MessageContext context)
return activity;
}

static ActivityContext CreateNewRootActivityContext() => new(Activity.TraceIdGenerator is null ? ActivityTraceId.CreateRandom() : Activity.TraceIdGenerator(), default, default, default);

public Activity StartOutgoingPipelineActivity(string activityName, string displayName, IBehaviorContext outgoingContext)
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

interface IActivityFactory
{
Activity StartIncomingActivity(MessageContext context);
Activity StartIncomingPipelineActivity(MessageContext context);
Activity StartOutgoingPipelineActivity(string activityName, string displayName, IBehaviorContext outgoingContext);
Activity StartHandlerActivity(MessageHandler messageHandler, ActiveSagaInstance saga);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class NoOpActivityFactory : IActivityFactory
{
public Activity StartIncomingActivity(MessageContext context) => null;
public Activity StartIncomingPipelineActivity(MessageContext context) => null;

public Activity StartOutgoingPipelineActivity(string activityName, string displayName, IBehaviorContext outgoingContext) => null;

Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task Invoke(MessageContext messageContext, CancellationToken cancel
{
var pipelineStartedAt = DateTimeOffset.UtcNow;

using var activity = activityFactory.StartIncomingActivity(messageContext);
using var activity = activityFactory.StartIncomingPipelineActivity(messageContext);

var childScope = rootBuilder.CreateAsyncScope();
await using (childScope.ConfigureAwait(false))
Expand Down