diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_endpoint_is_warmed_up.Make_sure_things_are_in_DI.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_endpoint_is_warmed_up.Make_sure_things_are_in_DI.approved.txt index f46eb014c93..c7b6e2d0f7d 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_endpoint_is_warmed_up.Make_sure_things_are_in_DI.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_endpoint_is_warmed_up.Make_sure_things_are_in_DI.approved.txt @@ -16,6 +16,7 @@ NServiceBus.Pipeline.LogicalMessageFactory - Singleton NServiceBus.Settings.IReadOnlySettings - Singleton NServiceBus.Transport.ISubscriptionManager - Singleton ----------- Private registrations used by Core----------- +NServiceBus.IncomingPipelineMetrics - Singleton NServiceBus.InferredMessageTypeEnricherBehavior - Transient NServiceBus.SubscriptionReceiverBehavior - Transient NServiceBus.SubscriptionRouter - Singleton diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/TestingMetricListener.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/TestingMetricListener.cs index a9e15433c54..01cce8da1dd 100644 --- a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/TestingMetricListener.cs +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/TestingMetricListener.cs @@ -36,11 +36,21 @@ public TestingMetricListener(string sourceName) ReportedMeters.AddOrUpdate(instrument.Name, measurement, (_, val) => val + measurement); Tags.AddOrUpdate(instrument.Name, _ => tags, (_, _) => tags); }); + meterListener.SetMeasurementEventCallback((Instrument instrument, + double measurement, + ReadOnlySpan> t, + object _) => + { + TestContext.WriteLine($"{instrument.Meter.Name}\\{instrument.Name}:{measurement}"); + var tags = t.ToArray(); + ReportedMeters.AddOrUpdate(instrument.Name, 1, (_, val) => val + 1); + Tags.AddOrUpdate(instrument.Name, _ => tags, (_, _) => tags); + }); meterListener.Start(); } public static TestingMetricListener SetupNServiceBusMetricsListener() => - SetupMetricsListener("NServiceBus.Core"); + SetupMetricsListener("NServiceBus.Core.Pipeline.Incoming"); public static TestingMetricListener SetupMetricsListener(string sourceName) { @@ -82,4 +92,13 @@ public object AssertTagKeyExists(string metricName, string tagKey) return meterTag.Value; } + + public void AssertTags(string metricName, Dictionary expectedTags) + { + foreach (var kvp in expectedTags) + { + var actualTagValue = AssertTagKeyExists(metricName, kvp.Key); + Assert.AreEqual(kvp.Value, actualTagValue); + } + } } \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs index 584701cc036..5b67e5dd4ce 100644 --- a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs @@ -1,5 +1,6 @@ namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using NServiceBus; @@ -15,7 +16,7 @@ public async Task Should_report_successful_message_metric() using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); _ = await Scenario.Define() - .WithEndpoint(b => b + .WithEndpoint(b => b.CustomConfig(x => x.MakeInstanceUniquelyAddressable("disc")) .When(async (session, ctx) => { for (var x = 0; x < 5; x++) @@ -30,13 +31,20 @@ public async Task Should_report_successful_message_metric() metricsListener.AssertMetric("nservicebus.messaging.fetches", 5); metricsListener.AssertMetric("nservicebus.messaging.failures", 0); - var successEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue"); - var successType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type"); - var fetchedEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.queue"); + metricsListener.AssertTags("nservicebus.messaging.fetches", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "disc", + }); - Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), successEndpoint); - Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), fetchedEndpoint); - Assert.AreEqual(typeof(OutgoingMessage).FullName, successType); + metricsListener.AssertTags("nservicebus.messaging.successes", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(OutgoingMessage).FullName, + }); } [Test] diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs index b3904324904..7179c24e2d1 100644 --- a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs @@ -1,8 +1,10 @@ namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics; +using System.Collections.Generic; using System.Threading.Tasks; using AcceptanceTesting; using NUnit.Framework; +using AcceptanceTesting.Customization; public class When_message_processing_fails : OpenTelemetryAcceptanceTest { @@ -13,6 +15,7 @@ public async Task Should_report_failing_message_metrics() _ = await Scenario.Define() .WithEndpoint(e => e .DoNotFailOnErrorMessages() + .CustomConfig(x => x.MakeInstanceUniquelyAddressable("disc")) .When(s => s.SendLocal(new FailingMessage()))) .Done(c => c.HandlerInvoked) .Run(); @@ -20,6 +23,14 @@ public async Task Should_report_failing_message_metrics() metricsListener.AssertMetric("nservicebus.messaging.fetches", 1); metricsListener.AssertMetric("nservicebus.messaging.failures", 1); metricsListener.AssertMetric("nservicebus.messaging.successes", 0); + + metricsListener.AssertTags("nservicebus.messaging.failures", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)), + ["nservicebus.discriminator"] = "disc", + ["error.type"] = typeof(SimulatedException).FullName, + }); } class Context : ScenarioContext diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs new file mode 100644 index 00000000000..a6ed79faec3 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs @@ -0,0 +1,150 @@ +namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry; + +using System; +using System.Threading; +using System.Threading.Tasks; +using AcceptanceTesting; +using Metrics; +using NUnit.Framework; +using Conventions = AcceptanceTesting.Customization.Conventions; + +public class When_incoming_message_handled : OpenTelemetryAcceptanceTest +{ + static readonly string HandlerTimeMetricName = "nservicebus.messaging.handler_time"; + static readonly string CriticalTimeMetricName = "nservicebus.messaging.critical_time"; + + [Test] + public async Task Should_record_critical_time() + { + using TestingMetricListener metricsListener = await WhenMessagesHandled(() => new MyMessage()); + metricsListener.AssertMetric(CriticalTimeMetricName, 5); + AssertMandatoryTags(metricsListener, CriticalTimeMetricName, typeof(MyMessage)); + } + + [Test] + public async Task Should_record_success_handling_time() + { + using TestingMetricListener metricsListener = await WhenMessagesHandled(() => new MyMessage()); + metricsListener.AssertMetric(HandlerTimeMetricName, 5); + AssertMandatoryTags(metricsListener, HandlerTimeMetricName, typeof(MyMessage)); + var handlerType = metricsListener.AssertTagKeyExists(HandlerTimeMetricName, "nservicebus.message_handler_type"); + Assert.AreEqual(typeof(MyMessageHandler).FullName, handlerType); + var result = metricsListener.AssertTagKeyExists(HandlerTimeMetricName, "execution.result"); + Assert.AreEqual("success", result); + } + + [Test] + public async Task Should_record_failure_handling_time() + { + using TestingMetricListener metricsListener = await WhenMessagesHandled(() => new MyExceptionalMessage()); + metricsListener.AssertMetric(HandlerTimeMetricName, 5); + AssertMandatoryTags(metricsListener, HandlerTimeMetricName, typeof(MyExceptionalMessage)); + var handlerType = metricsListener.AssertTagKeyExists(HandlerTimeMetricName, "nservicebus.message_handler_type"); + Assert.AreEqual(typeof(MyExceptionalHandler).FullName, handlerType); + var error = metricsListener.AssertTagKeyExists(HandlerTimeMetricName, "error.type"); + Assert.AreEqual(typeof(Exception).FullName, error); + var result = metricsListener.AssertTagKeyExists(HandlerTimeMetricName, "execution.result"); + Assert.AreEqual("failure", result); + } + + [Test] + public async Task Should_not_record_critical_time_on_failure() + { + using TestingMetricListener metricsListener = await WhenMessagesHandled(() => new MyExceptionalMessage()); + metricsListener.AssertMetric(CriticalTimeMetricName, 0); + } + + static async Task WhenMessagesHandled(Func messageFactory) + { + TestingMetricListener metricsListener = null; + try + { + metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); + + _ = await Scenario.Define() + .WithEndpoint(b => + b.DoNotFailOnErrorMessages() + .CustomConfig(c => c.MakeInstanceUniquelyAddressable("discriminator")) + .When(async (session, _) => + { + for (var x = 0; x < 5; x++) + { + try + { + await session.SendLocal(messageFactory.Invoke()); + } + catch (Exception e) + { + Console.WriteLine(e); + } + } + })) + .Done(c => c.TotalHandledMessages == 5) + .Run(); + return metricsListener; + } + catch + { + metricsListener?.Dispose(); + throw; + } + } + + + static void AssertMandatoryTags( + TestingMetricListener metricsListener, + string metricName, + Type expectedMessageType) + { + var messageType = metricsListener.AssertTagKeyExists(metricName, "nservicebus.message_type"); + Assert.AreEqual(expectedMessageType.FullName, messageType); + var endpoint = metricsListener.AssertTagKeyExists(metricName, "nservicebus.queue"); + Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), endpoint); + var discriminator = metricsListener.AssertTagKeyExists(metricName, "nservicebus.discriminator"); + Assert.AreEqual("discriminator", discriminator); + } + + class Context : ScenarioContext + { + public int TotalHandledMessages; + } + + class EndpointWithMetrics : EndpointConfigurationBuilder + { + public EndpointWithMetrics() => EndpointSetup(); + } + + class MyMessageHandler : IHandleMessages + { + readonly Context testContext; + + public MyMessageHandler(Context testContext) => this.testContext = testContext; + + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Interlocked.Increment(ref testContext.TotalHandledMessages); + return Task.CompletedTask; + } + } + + class MyExceptionalHandler : IHandleMessages + { + readonly Context testContext; + + public MyExceptionalHandler(Context testContext) => this.testContext = testContext; + + public Task Handle(MyExceptionalMessage message, IMessageHandlerContext context) + { + Interlocked.Increment(ref testContext.TotalHandledMessages); + throw new Exception(); + } + } + + public class MyMessage : IMessage + { + } + + public class MyExceptionalMessage : IMessage + { + } +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled_successfully.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled_successfully.cs new file mode 100644 index 00000000000..cd5fd6e9e1f --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled_successfully.cs @@ -0,0 +1,66 @@ +namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry; + +using System.Threading; +using System.Threading.Tasks; +using AcceptanceTesting; +using Metrics; +using NUnit.Framework; +using Conventions = AcceptanceTesting.Customization.Conventions; + +[NonParallelizable] +public class WhenIncomingMessageHandledSuccessfully : NServiceBusAcceptanceTest +{ + [Test] + public async Task Should_record_handling_time() + { + using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); + + _ = await Scenario.Define() + .WithEndpoint(b => + b.When(async (session, _) => + { + for (var x = 0; x < 5; x++) + { + await session.SendLocal(new MyMessage()); + } + })) + .Done(c => c.TotalHandledMessages == 5) + .Run(); + + string handlingTime = "nservicebus.messaging.handler_time"; + metricsListener.AssertMetric(handlingTime, 5); + var messageType = metricsListener.AssertTagKeyExists(handlingTime, "nservicebus.message_type"); + Assert.AreEqual(typeof(MyMessage).FullName, messageType); + var handlerType = metricsListener.AssertTagKeyExists(handlingTime, "nservicebus.message_handler_type"); + Assert.AreEqual(typeof(MyMessageHandler).FullName, handlerType); + var endpoint = metricsListener.AssertTagKeyExists(handlingTime, "nservicebus.queue"); + Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), endpoint); + } + + class Context : ScenarioContext + { + public int TotalHandledMessages; + } + + class EndpointWithMetrics : EndpointConfigurationBuilder + { + public EndpointWithMetrics() => EndpointSetup(); + } + + class MyMessageHandler : IHandleMessages + { + readonly Context testContext; + + public MyMessageHandler(Context testContext) => this.testContext = testContext; + + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Interlocked.Increment(ref testContext.TotalHandledMessages); + return Task.CompletedTask; + } + } + + public class MyMessage : IMessage + { + } +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/Routing/MessageDrivenSubscriptions/Sub_to_scaled_out_pubs.cs b/src/NServiceBus.AcceptanceTests/Routing/MessageDrivenSubscriptions/Sub_to_scaled_out_pubs.cs index 8a045025fc9..497cab5b35e 100644 --- a/src/NServiceBus.AcceptanceTests/Routing/MessageDrivenSubscriptions/Sub_to_scaled_out_pubs.cs +++ b/src/NServiceBus.AcceptanceTests/Routing/MessageDrivenSubscriptions/Sub_to_scaled_out_pubs.cs @@ -1,7 +1,6 @@ namespace NServiceBus.AcceptanceTests.Routing.MessageDrivenSubscriptions; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.Customization; diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTagsTests.Verify_MeterTags.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTagsTests.Verify_MeterTags.approved.txt deleted file mode 100644 index 333deef4bb9..00000000000 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTagsTests.Verify_MeterTags.approved.txt +++ /dev/null @@ -1,11 +0,0 @@ -{ - "Note": "Changes to meter tags should result in Meters version updates", - "ActivitySourceVersion": "0.2.0", - "Tags": [ - "EndpointDiscriminator => nservicebus.discriminator", - "QueueName => nservicebus.queue", - "MessageType => nservicebus.message_type", - "FailureType => nservicebus.failure_type", - "MessageHandlerTypes => nservicebus.message_handler_types" - ] -} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt new file mode 100644 index 00000000000..331b25092e3 --- /dev/null +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt @@ -0,0 +1,23 @@ +{ + "Note": "Changes to metrics API should result in an update to NServiceBusMeter version.", + "ActivitySourceVersion": "0.2.0", + "Tags": [ + "error.type", + "execution.result", + "nservicebus.discriminator", + "nservicebus.message_handler_type", + "nservicebus.message_handler_types", + "nservicebus.message_type", + "nservicebus.queue" + ], + "Metrics": [ + "nservicebus.messaging.critical_time => Histogram, Unit: s", + "nservicebus.messaging.failures => Counter", + "nservicebus.messaging.fetches => Counter", + "nservicebus.messaging.handler_time => Histogram, Unit: s", + "nservicebus.messaging.successes => Counter", + "nservicebus.recoverability.delayed => Counter", + "nservicebus.recoverability.error => Counter", + "nservicebus.recoverability.immediate => Counter" + ] +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs index a9e15433c54..08d3fafca80 100644 --- a/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs +++ b/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs @@ -10,6 +10,8 @@ namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics; class TestingMetricListener : IDisposable { readonly MeterListener meterListener; + public List metrics = []; + public string version = ""; public TestingMetricListener(string sourceName) { @@ -21,6 +23,8 @@ public TestingMetricListener(string sourceName) { TestContext.WriteLine($"Subscribing to {instrument.Meter.Name}\\{instrument.Name}"); listener.EnableMeasurementEvents(instrument); + metrics.Add(instrument); + version = instrument.Meter.Version; } } }; @@ -40,7 +44,7 @@ public TestingMetricListener(string sourceName) } public static TestingMetricListener SetupNServiceBusMetricsListener() => - SetupMetricsListener("NServiceBus.Core"); + SetupMetricsListener("NServiceBus.Core.Pipeline.Incoming"); public static TestingMetricListener SetupMetricsListener(string sourceName) { diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTagsTests.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTagsTests.cs deleted file mode 100644 index 5e5fa1a47f3..00000000000 --- a/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTagsTests.cs +++ /dev/null @@ -1,27 +0,0 @@ -namespace NServiceBus.Core.Tests.OpenTelemetry; - -using System.Linq; -using System.Reflection; -using NUnit.Framework; -using Particular.Approvals; - -[TestFixture] -public class MeterTagsTests -{ - [Test] - public void Verify_MeterTags() - { - var meterTags = typeof(MeterTags) - .GetFields(BindingFlags.Public | BindingFlags.Static) - .Where(fi => fi.IsLiteral && !fi.IsInitOnly) - .Select(x => $"{x.Name} => {x.GetRawConstantValue()}") - .ToList(); - - Approver.Verify(new - { - Note = "Changes to meter tags should result in Meters version updates", - ActivitySourceVersion = Meters.NServiceBusMeter.Version, - Tags = meterTags - }); - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs new file mode 100644 index 00000000000..13ab01938e2 --- /dev/null +++ b/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs @@ -0,0 +1,38 @@ +namespace NServiceBus.Core.Tests.OpenTelemetry; + +using System.Linq; +using System.Reflection; +using AcceptanceTests.Core.OpenTelemetry.Metrics; +using NUnit.Framework; +using Particular.Approvals; + +[TestFixture] +public class MeterTests +{ + [Test] + public void Verify_MeterAPI() + { + var meterTags = typeof(MeterTags) + .GetFields(BindingFlags.Public | BindingFlags.Static) + .Where(fi => fi.IsLiteral && !fi.IsInitOnly) + .Select(x => x.GetRawConstantValue()) + .OrderBy(value => value) + .ToList(); + using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); + //The IncomingPipelineMetrics constructor creates the meters, therefore a new instance before collecting the metrics. +#pragma warning disable CA1806 + new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc"); +#pragma warning restore CA1806 + var metrics = metricsListener.metrics + .Select(x => $"{x.Name} => {x.GetType().Name.Split("`").First()}{(x.Unit == null ? "" : ", Unit: ")}{x.Unit ?? ""}") + .OrderBy(value => value) + .ToList(); + Approver.Verify(new + { + Note = "Changes to metrics API should result in an update to NServiceBusMeter version.", + ActivitySourceVersion = metricsListener.version, + Tags = meterTags, + Metrics = metrics + }); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/ReceiveDiagnosticsBehaviorTests.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/ReceiveDiagnosticsBehaviorTests.cs deleted file mode 100644 index 64456b84b44..00000000000 --- a/src/NServiceBus.Core.Tests/OpenTelemetry/ReceiveDiagnosticsBehaviorTests.cs +++ /dev/null @@ -1,90 +0,0 @@ -namespace NServiceBus.Core.Tests.OpenTelemetry; - -using System; -using System.Collections.Immutable; -using System.Threading; -using System.Threading.Tasks; -using AcceptanceTests.Core.OpenTelemetry.Metrics; -using NServiceBus.Testing; -using NUnit.Framework; - -[TestFixture] -class ReceiveDiagnosticsBehaviorTests -{ - [Test] - public async Task Should_increase_total_fetched_when_processing_message() - { - var behavior = new ReceiveDiagnosticsBehavior("queueBaseName", "discriminator"); - using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); - var testableIncomingPhysicalMessageContext = new TestableIncomingPhysicalMessageContext(); - await behavior.Invoke(testableIncomingPhysicalMessageContext, _ => Task.CompletedTask); - - metricsListener.AssertMetric(Meters.TotalFetched.Name, 1); - - var fetchedTags = metricsListener.Tags[Meters.TotalFetched.Name].ToImmutableDictionary(); - Assert.AreEqual("discriminator", fetchedTags[MeterTags.EndpointDiscriminator]); - Assert.AreEqual("queueBaseName", fetchedTags[MeterTags.QueueName]); - } - - [Test] - public async Task Should_increase_total_successful_when_processing_message_successfully() - { - var behavior = new ReceiveDiagnosticsBehavior("queueBaseName", "discriminator"); - - using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); - var testableIncomingPhysicalMessageContext = new TestableIncomingPhysicalMessageContext(); - var tags = testableIncomingPhysicalMessageContext.Extensions.Get(); - tags.Add(MeterTags.MessageType, "SomeType"); - await behavior.Invoke(testableIncomingPhysicalMessageContext, _ => Task.CompletedTask); - - metricsListener.AssertMetric(Meters.TotalFetched.Name, 1); - metricsListener.AssertMetric(Meters.TotalProcessedSuccessfully.Name, 1); - metricsListener.AssertMetric(Meters.TotalFailures.Name, 0); - - var processedTags = metricsListener.Tags[Meters.TotalProcessedSuccessfully.Name].ToImmutableDictionary(); - Assert.AreEqual("discriminator", processedTags[MeterTags.EndpointDiscriminator]); - Assert.AreEqual("queueBaseName", processedTags[MeterTags.QueueName]); - Assert.AreEqual("SomeType", processedTags[MeterTags.MessageType]); - } - - [Test] - public void Should_increase_failures_error_when_processing_message_fails() - { - var behavior = new ReceiveDiagnosticsBehavior("queueBaseName", "discriminator"); - var context = new TestableIncomingPhysicalMessageContext(); - - using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); - Assert.ThrowsAsync(() => behavior.Invoke(context, _ => throw new Exception("test"))); - - metricsListener.AssertMetric(Meters.TotalFetched.Name, 1); - metricsListener.AssertMetric(Meters.TotalProcessedSuccessfully.Name, 0); - metricsListener.AssertMetric(Meters.TotalFailures.Name, 1); - - var failureTags = metricsListener.Tags[Meters.TotalFailures.Name].ToImmutableDictionary(); - Assert.AreEqual(typeof(Exception), failureTags[MeterTags.FailureType]); - Assert.AreEqual("discriminator", failureTags[MeterTags.EndpointDiscriminator]); - Assert.AreEqual("queueBaseName", failureTags[MeterTags.QueueName]); - } - - [Test] - public void Should_not_increase_total_failures_when_cancellation_exception() - { - var behavior = new ReceiveDiagnosticsBehavior("queueBaseName", "discriminator"); - - using var cts = new CancellationTokenSource(); - using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); - - var context = new TestableIncomingPhysicalMessageContext { CancellationToken = cts.Token }; - - cts.Cancel(); - Assert.ThrowsAsync(() => behavior.Invoke(context, ctx => - { - ctx.CancellationToken.ThrowIfCancellationRequested(); - return Task.CompletedTask; - })); - - metricsListener.AssertMetric(Meters.TotalFetched.Name, 1); - metricsListener.AssertMetric(Meters.TotalProcessedSuccessfully.Name, 0); - metricsListener.AssertMetric(Meters.TotalFailures.Name, 0); - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/TestMeterFactory.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/TestMeterFactory.cs new file mode 100644 index 00000000000..8bbabf8795b --- /dev/null +++ b/src/NServiceBus.Core.Tests/OpenTelemetry/TestMeterFactory.cs @@ -0,0 +1,24 @@ +namespace NServiceBus.Core.Tests.OpenTelemetry; + +using System.Collections.Generic; +using System.Diagnostics.Metrics; + +class TestMeterFactory() : IMeterFactory +{ + List meters = []; + + public void Dispose() + { + foreach (Meter meter in meters) + { + meter.Dispose(); + } + } + + public Meter Create(MeterOptions options) + { + var meter = new Meter(options); + meters.Add(meter); + return meter; + } +} diff --git a/src/NServiceBus.Core.Tests/Pipeline/Incoming/InvokeHandlerTerminatorTest.cs b/src/NServiceBus.Core.Tests/Pipeline/Incoming/InvokeHandlerTerminatorTest.cs index 986a6219c91..f1715defdc7 100644 --- a/src/NServiceBus.Core.Tests/Pipeline/Incoming/InvokeHandlerTerminatorTest.cs +++ b/src/NServiceBus.Core.Tests/Pipeline/Incoming/InvokeHandlerTerminatorTest.cs @@ -1,17 +1,17 @@ namespace NServiceBus.Core.Tests.Pipeline.Incoming; using System; -using System.Collections.Generic; using System.Threading.Tasks; using NServiceBus.Pipeline; using NServiceBus.Sagas; using NUnit.Framework; +using OpenTelemetry; using Testing; [TestFixture] public class InvokeHandlerTerminatorTest { - InvokeHandlerTerminator terminator = new InvokeHandlerTerminator(new NoOpActivityFactory()); + InvokeHandlerTerminator terminator = new(new NoOpActivityFactory(), new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc")); [Test] public async Task When_saga_found_and_handler_is_saga_should_invoke_handler() diff --git a/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs b/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs index 1fc5d0a6816..0b05c46af5d 100644 --- a/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs +++ b/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection; using NServiceBus.Pipeline; using NUnit.Framework; +using OpenTelemetry; using OpenTelemetry.Helpers; using Transport; @@ -118,7 +119,8 @@ static MainPipelineExecutor CreateMainPipelineExecutor(IPipeline(), receivePipeline, - new ActivityFactory()); + new ActivityFactory(), + new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc")); return executor; } diff --git a/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs b/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs index f63220478c6..5c3e4bdd94d 100644 --- a/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs +++ b/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs @@ -10,6 +10,8 @@ using NServiceBus.Pipeline; using NServiceBus.Routing; using NUnit.Framework; +using OpenTelemetry; +using Settings; using Testing; using Transport; using TransportOperation = Transport.TransportOperation; @@ -180,7 +182,7 @@ public void SetUp() fakeOutbox = new FakeOutboxStorage(); fakeBatchPipeline = new FakeBatchPipeline(); - behavior = new TransportReceiveToPhysicalMessageConnector(fakeOutbox); + behavior = new TransportReceiveToPhysicalMessageConnector(fakeOutbox, new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc")); } Task Invoke(ITransportReceiveContext context, Func next = null) => behavior.Invoke(context, next ?? (_ => Task.CompletedTask)); diff --git a/src/NServiceBus.Core/EndpointCreator.cs b/src/NServiceBus.Core/EndpointCreator.cs index 072b56a9777..5d731e149d2 100644 --- a/src/NServiceBus.Core/EndpointCreator.cs +++ b/src/NServiceBus.Core/EndpointCreator.cs @@ -99,7 +99,7 @@ void Configure() hostingConfiguration, pipelineSettings); - pipelineComponent = PipelineComponent.Initialize(pipelineSettings, hostingConfiguration); + pipelineComponent = PipelineComponent.Initialize(pipelineSettings, hostingConfiguration, receiveConfiguration); // The settings can only be locked after initializing the feature component since it uses the settings to store & share feature state. // As well as all the other components have been initialized @@ -117,6 +117,9 @@ void Configure() } ); + // Make Metrics a first class citizen in Core by enabling once and for all them when creating the endpoint + _ = hostingConfiguration.Services.AddMetrics(); + hostingComponent = HostingComponent.Initialize(hostingConfiguration); } diff --git a/src/NServiceBus.Core/Hosting/HostingComponent.Settings.cs b/src/NServiceBus.Core/Hosting/HostingComponent.Settings.cs index 9128f800c37..4cf3f4db74a 100644 --- a/src/NServiceBus.Core/Hosting/HostingComponent.Settings.cs +++ b/src/NServiceBus.Core/Hosting/HostingComponent.Settings.cs @@ -44,6 +44,8 @@ public string DisplayName public string EndpointName => settings.EndpointName(); + public string Discriminator => settings.GetOrDefault("EndpointInstanceDiscriminator"); + public Dictionary Properties { get { return settings.Get>(PropertiesSettingsKey); } diff --git a/src/NServiceBus.Core/NServiceBus.Core.csproj b/src/NServiceBus.Core/NServiceBus.Core.csproj index cab99563ff3..194a5651146 100644 --- a/src/NServiceBus.Core/NServiceBus.Core.csproj +++ b/src/NServiceBus.Core/NServiceBus.Core.csproj @@ -12,6 +12,7 @@ + diff --git a/src/NServiceBus.Core/OpenTelemetry/Metrics/MessagingMetricsFeature.cs b/src/NServiceBus.Core/OpenTelemetry/Metrics/MessagingMetricsFeature.cs deleted file mode 100644 index 66bc33b7470..00000000000 --- a/src/NServiceBus.Core/OpenTelemetry/Metrics/MessagingMetricsFeature.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace NServiceBus; - -using Features; - -/// -/// MessagingMetricsFeature captures messaging metrics -/// -class MessagingMetricsFeature : Feature -{ - public MessagingMetricsFeature() => Prerequisite(c => !c.Receiving.IsSendOnlyEndpoint, "Processing metrics are not supported on send-only endpoints"); - - /// - protected internal override void Setup(FeatureConfigurationContext context) - { - var discriminator = context.Receiving.InstanceSpecificQueueAddress?.Discriminator; - var queueNameBase = context.Receiving.QueueNameBase; - var performanceDiagnosticsBehavior = new ReceiveDiagnosticsBehavior(queueNameBase, discriminator); - - context.Pipeline.Register( - performanceDiagnosticsBehavior, - "Provides OpenTelemetry counters for message processing" - ); - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs b/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs index 4c068d9b500..5db35d1fe52 100644 --- a/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs +++ b/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs @@ -5,6 +5,8 @@ static class MeterTags public const string EndpointDiscriminator = "nservicebus.discriminator"; public const string QueueName = "nservicebus.queue"; public const string MessageType = "nservicebus.message_type"; - public const string FailureType = "nservicebus.failure_type"; public const string MessageHandlerTypes = "nservicebus.message_handler_types"; + public const string MessageHandlerType = "nservicebus.message_handler_type"; + public const string ExecutionResult = "execution.result"; + public const string ErrorType = "error.type"; } \ No newline at end of file diff --git a/src/NServiceBus.Core/OpenTelemetry/Metrics/Meters.cs b/src/NServiceBus.Core/OpenTelemetry/Metrics/Meters.cs deleted file mode 100644 index 62722a78ea1..00000000000 --- a/src/NServiceBus.Core/OpenTelemetry/Metrics/Meters.cs +++ /dev/null @@ -1,28 +0,0 @@ -namespace NServiceBus; - -using System.Diagnostics.Metrics; - -class Meters -{ - internal static readonly Meter NServiceBusMeter = new Meter( - "NServiceBus.Core", - "0.2.0"); - - internal static readonly Counter TotalProcessedSuccessfully = - NServiceBusMeter.CreateCounter("nservicebus.messaging.successes", description: "Total number of messages processed successfully by the endpoint."); - - internal static readonly Counter TotalFetched = - NServiceBusMeter.CreateCounter("nservicebus.messaging.fetches", description: "Total number of messages fetched from the queue by the endpoint."); - - internal static readonly Counter TotalFailures = - NServiceBusMeter.CreateCounter("nservicebus.messaging.failures", description: "Total number of messages processed unsuccessfully by the endpoint."); - - internal static readonly Counter TotalImmediateRetries = - NServiceBusMeter.CreateCounter("nservicebus.recoverability.immediate", description: "Total number of immediate retries requested."); - - internal static readonly Counter TotalDelayedRetries = - NServiceBusMeter.CreateCounter("nservicebus.recoverability.delayed", description: "Total number of delayed retries requested."); - - internal static readonly Counter TotalSentToErrorQueue = - NServiceBusMeter.CreateCounter("nservicebus.recoverability.error", description: "Total number of messages sent to the error queue."); -} \ No newline at end of file diff --git a/src/NServiceBus.Core/OpenTelemetry/Metrics/MetricsExtensions.cs b/src/NServiceBus.Core/OpenTelemetry/Metrics/MetricsExtensions.cs new file mode 100644 index 00000000000..f43696f46d4 --- /dev/null +++ b/src/NServiceBus.Core/OpenTelemetry/Metrics/MetricsExtensions.cs @@ -0,0 +1,29 @@ +namespace NServiceBus; + +using System; +using System.Collections.Generic; + +static class MetricsExtensions +{ + public static bool TryGetTimeSent(this Dictionary headers, out DateTimeOffset timeSent) + { + if (headers.TryGetValue(Headers.TimeSent, out var timeSentString)) + { + timeSent = DateTimeOffsetHelper.ToDateTimeOffset(timeSentString); + return true; + } + timeSent = DateTimeOffset.MinValue; + return false; + } + + public static bool TryGetDeliverAt(this Dictionary headers, out DateTimeOffset deliverAt) + { + if (headers.TryGetValue(Headers.DeliverAt, out var deliverAtString)) + { + deliverAt = DateTimeOffsetHelper.ToDateTimeOffset(deliverAtString); + return true; + } + deliverAt = DateTimeOffset.MinValue; + return false; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/OpenTelemetry/Metrics/ReceiveDiagnosticsBehavior.cs b/src/NServiceBus.Core/OpenTelemetry/Metrics/ReceiveDiagnosticsBehavior.cs deleted file mode 100644 index 83bbd1f8bf3..00000000000 --- a/src/NServiceBus.Core/OpenTelemetry/Metrics/ReceiveDiagnosticsBehavior.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace NServiceBus; - -using System; -using System.Diagnostics; -using System.Threading.Tasks; -using Pipeline; - -class ReceiveDiagnosticsBehavior : IBehavior -{ - public ReceiveDiagnosticsBehavior(string queueNameBase, string discriminator) - { - this.queueNameBase = queueNameBase; - this.discriminator = discriminator; - } - - public async Task Invoke(IIncomingPhysicalMessageContext context, Func next) - { - var availableMetricTags = context.Extensions.Get(); - availableMetricTags.Add(MeterTags.EndpointDiscriminator, discriminator); - availableMetricTags.Add(MeterTags.QueueName, queueNameBase); - - var tags = new TagList(); - availableMetricTags.ApplyTags(ref tags, [MeterTags.EndpointDiscriminator, MeterTags.QueueName]); - Meters.TotalFetched.Add(1, tags); - - try - { - await next(context).ConfigureAwait(false); - } - catch (Exception ex) when (!ex.IsCausedBy(context.CancellationToken)) - { - tags.Add(new(MeterTags.FailureType, ex.GetType())); - availableMetricTags.ApplyTags(ref tags, [MeterTags.MessageType, MeterTags.MessageHandlerTypes]); - Meters.TotalFailures.Add(1, tags); - throw; - } - - availableMetricTags.ApplyTags(ref tags, [MeterTags.MessageType, MeterTags.MessageHandlerTypes]); - Meters.TotalProcessedSuccessfully.Add(1, tags); - } - - readonly string queueNameBase; - readonly string discriminator; -} \ No newline at end of file diff --git a/src/NServiceBus.Core/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs b/src/NServiceBus.Core/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs index a643354a468..455aa2fb6a6 100644 --- a/src/NServiceBus.Core/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs +++ b/src/NServiceBus.Core/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs @@ -16,6 +16,5 @@ public static void EnableOpenTelemetry(this EndpointConfiguration endpointConfig endpointConfiguration.Settings.Get().EnableOpenTelemetry = true; endpointConfiguration.EnableFeature(); - endpointConfiguration.EnableFeature(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs new file mode 100644 index 00000000000..5f676e2ada4 --- /dev/null +++ b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs @@ -0,0 +1,225 @@ +namespace NServiceBus; + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.Metrics; +using Pipeline; + +class IncomingPipelineMetrics +{ + const string TotalProcessedSuccessfully = "nservicebus.messaging.successes"; + const string TotalFetched = "nservicebus.messaging.fetches"; + const string TotalFailures = "nservicebus.messaging.failures"; + const string MessageHandlerTime = "nservicebus.messaging.handler_time"; + const string CriticalTime = "nservicebus.messaging.critical_time"; + const string RecoverabilityImmediate = "nservicebus.recoverability.immediate"; + const string RecoverabilityDelayed = "nservicebus.recoverability.delayed"; + const string RecoverabilityError = "nservicebus.recoverability.error"; + + public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, string discriminator) + { + var meter = meterFactory.Create("NServiceBus.Core.Pipeline.Incoming", "0.2.0"); + totalProcessedSuccessfully = meter.CreateCounter(TotalProcessedSuccessfully, + description: "Total number of messages processed successfully by the endpoint."); + totalFetched = meter.CreateCounter(TotalFetched, + description: "Total number of messages fetched from the queue by the endpoint."); + totalFailures = meter.CreateCounter(TotalFailures, + description: "Total number of messages processed unsuccessfully by the endpoint."); + messageHandlerTime = meter.CreateHistogram(MessageHandlerTime, "s", + "The time in seconds for the execution of the business code."); + criticalTime = meter.CreateHistogram(CriticalTime, "s", + "The time in seconds between when the message was sent until processed by the endpoint."); + totalImmediateRetries = meter.CreateCounter(RecoverabilityImmediate, + description: "Total number of immediate retries requested."); + totalDelayedRetries = meter.CreateCounter(RecoverabilityDelayed, + description: "Total number of delayed retries requested."); + totalSentToErrorQueue = meter.CreateCounter(RecoverabilityError, + description: "Total number of messages sent to the error queue."); + + queueNameBase = queueName; + endpointDiscriminator = discriminator; + } + + public void AddDefaultIncomingPipelineMetricTags(IncomingPipelineMetricTags incomingPipelineMetricsTags) + { + incomingPipelineMetricsTags.Add(MeterTags.QueueName, queueNameBase); + incomingPipelineMetricsTags.Add(MeterTags.EndpointDiscriminator, endpointDiscriminator ?? ""); + } + + public void RecordMessageSuccessfullyProcessed(ITransportReceiveContext context, IncomingPipelineMetricTags incomingPipelineMetricTags) + { + if (!totalProcessedSuccessfully.Enabled && !criticalTime.Enabled) + { + return; + } + + TagList tags; + tags.Add(new(MeterTags.ExecutionResult, "success")); + incomingPipelineMetricTags.ApplyTags(ref tags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerTypes]); + + if (totalProcessedSuccessfully.Enabled) + { + totalProcessedSuccessfully.Add(1, tags); + } + if (criticalTime.Enabled) + { + var completedAt = DateTimeOffset.UtcNow; + + if (context.Message.Headers.TryGetDeliverAt(out var startTime) + || context.Message.Headers.TryGetTimeSent(out startTime)) + { + var criticalTimeElapsed = completedAt - startTime; + criticalTime.Record(criticalTimeElapsed.TotalSeconds, tags); + } + } + } + + public void RecordMessageProcessingFailure(IncomingPipelineMetricTags incomingPipelineMetricTags, Exception error) + { + if (!totalFailures.Enabled) + { + return; + } + + TagList tags; + tags.Add(new(MeterTags.ErrorType, error.GetType().FullName)); + tags.Add(new(MeterTags.ExecutionResult, "failure")); + incomingPipelineMetricTags.ApplyTags(ref tags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerTypes]); + totalFailures.Add(1, tags); + + // the critical time is intentionally not recorded in case of failure + } + + public void RecordFetchedMessage(IncomingPipelineMetricTags incomingPipelineMetricTags) + { + if (!totalFetched.Enabled) + { + return; + } + + TagList tags; + incomingPipelineMetricTags.ApplyTags(ref tags, [ + MeterTags.EndpointDiscriminator, + MeterTags.QueueName]); + + totalFetched.Add(1, tags); + } + + public void RecordSuccessfulMessageHandlerTime(IInvokeHandlerContext invokeHandlerContext, TimeSpan elapsed) + { + if (!messageHandlerTime.Enabled) + { + return; + } + + var incomingPipelineMetricTags = invokeHandlerContext.Extensions.Get(); + TagList meterTags; + incomingPipelineMetricTags.ApplyTags(ref meterTags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerType]); + // This is what Add(string, object) does so skipping an unnecessary stack frame + meterTags.Add(new KeyValuePair(MeterTags.MessageHandlerType, invokeHandlerContext.MessageHandler.HandlerType.FullName)); + meterTags.Add(new KeyValuePair(MeterTags.ExecutionResult, "success")); + messageHandlerTime.Record(elapsed.TotalSeconds, meterTags); + } + + public void RecordFailedMessageHandlerTime(IInvokeHandlerContext invokeHandlerContext, TimeSpan elapsed, Exception error) + { + if (!messageHandlerTime.Enabled) + { + return; + } + + var incomingPipelineMetricTags = invokeHandlerContext.Extensions.Get(); + TagList meterTags; + incomingPipelineMetricTags.ApplyTags(ref meterTags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerType]); + // This is what Add(string, object) does so skipping an unnecessary stack frame + meterTags.Add(new KeyValuePair(MeterTags.MessageHandlerType, invokeHandlerContext.MessageHandler.HandlerType.FullName)); + meterTags.Add(new KeyValuePair(MeterTags.ExecutionResult, "failure")); + meterTags.Add(new KeyValuePair(MeterTags.ErrorType, error.GetType().FullName)); + messageHandlerTime.Record(elapsed.TotalSeconds, meterTags); + } + + public void RecordImmediateRetry(IRecoverabilityContext recoverabilityContext) + { + if (!totalImmediateRetries.Enabled) + { + return; + } + + var incomingPipelineMetricTags = recoverabilityContext.Extensions.Get(); + TagList meterTags; + incomingPipelineMetricTags.ApplyTags(ref meterTags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerType]); + // This is what Add(string, object) does so skipping an unnecessary stack frame + meterTags.Add(new KeyValuePair(MeterTags.ErrorType, recoverabilityContext.Exception.GetType().FullName)); + totalImmediateRetries.Add(1, meterTags); + } + + public void RecordDelayedRetry(IRecoverabilityContext recoverabilityContext) + { + if (!totalDelayedRetries.Enabled) + { + return; + } + + var incomingPipelineMetricTags = recoverabilityContext.Extensions.Get(); + TagList meterTags; + incomingPipelineMetricTags.ApplyTags(ref meterTags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerType]); + // This is what Add(string, object) does so skipping an unnecessary stack frame + meterTags.Add(new KeyValuePair(MeterTags.ErrorType, recoverabilityContext.Exception.GetType().FullName)); + totalDelayedRetries.Add(1, meterTags); + } + + public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext) + { + if (!totalSentToErrorQueue.Enabled) + { + return; + } + + var incomingPipelineMetricTags = recoverabilityContext.Extensions.Get(); + TagList meterTags; + incomingPipelineMetricTags.ApplyTags(ref meterTags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerType]); + // This is what Add(string, object) does so skipping an unnecessary stack frame + meterTags.Add(new KeyValuePair(MeterTags.ErrorType, recoverabilityContext.Exception.GetType().FullName)); + totalSentToErrorQueue.Add(1, meterTags); + } + + readonly Counter totalProcessedSuccessfully; + readonly Counter totalFetched; + readonly Counter totalFailures; + readonly Histogram messageHandlerTime; + readonly Histogram criticalTime; + readonly Counter totalImmediateRetries; + readonly Counter totalDelayedRetries; + readonly Counter totalSentToErrorQueue; + string queueNameBase; + string endpointDiscriminator; +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Incoming/InvokeHandlerTerminator.cs b/src/NServiceBus.Core/Pipeline/Incoming/InvokeHandlerTerminator.cs index ddccce8a2fa..1d5538eb7ab 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/InvokeHandlerTerminator.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/InvokeHandlerTerminator.cs @@ -6,13 +6,8 @@ using Pipeline; using Sagas; -class InvokeHandlerTerminator : PipelineTerminator +class InvokeHandlerTerminator(IActivityFactory activityFactory, IncomingPipelineMetrics messagingMetricsMeters) : PipelineTerminator { - public InvokeHandlerTerminator(IActivityFactory activityFactory) - { - this.activityFactory = activityFactory; - } - protected override async Task Terminate(IInvokeHandlerContext context) { if (context.Extensions.TryGet(out ActiveSagaInstance saga) && saga.NotFound && saga.Metadata.SagaType == context.MessageHandler.Instance.GetType()) @@ -26,7 +21,6 @@ protected override async Task Terminate(IInvokeHandlerContext context) // Might as well abort before invoking the handler if we're shutting down context.CancellationToken.ThrowIfCancellationRequested(); - var startTime = DateTimeOffset.UtcNow; try { @@ -36,6 +30,7 @@ await messageHandler .ConfigureAwait(false); activity?.SetStatus(ActivityStatusCode.Ok); + messagingMetricsMeters.RecordSuccessfulMessageHandlerTime(context, DateTimeOffset.UtcNow - startTime); } #pragma warning disable PS0019 // Do not catch Exception without considering OperationCanceledException - enriching and rethrowing catch (Exception ex) @@ -48,10 +43,9 @@ await messageHandler ex.Data["Handler canceled"] = context.CancellationToken.IsCancellationRequested; activity?.SetErrorStatus(ex); - + messagingMetricsMeters.RecordFailedMessageHandlerTime(context, DateTimeOffset.UtcNow - startTime, ex); throw; } } - readonly IActivityFactory activityFactory; } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveContext.cs b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveContext.cs index 4e6780f43a9..7651fa07b27 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveContext.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveContext.cs @@ -20,8 +20,6 @@ public TransportReceiveContext(IServiceProvider serviceProvider, MessageOperatio Message = receivedMessage; Set(Message); Set(transportTransaction); - //Hack to be able to get the tags in the recoverability pipeline - _ = parentContext.GetOrCreate(); } /// diff --git a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs index 2bb09ec9f89..6f0818a3bc0 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs @@ -12,9 +12,10 @@ namespace NServiceBus; class TransportReceiveToPhysicalMessageConnector : IStageForkConnector { - public TransportReceiveToPhysicalMessageConnector(IOutboxStorage outboxStorage) + public TransportReceiveToPhysicalMessageConnector(IOutboxStorage outboxStorage, IncomingPipelineMetrics incomingPipelineMetrics) { this.outboxStorage = outboxStorage; + this.incomingPipelineMetrics = incomingPipelineMetrics; } public async Task Invoke(ITransportReceiveContext context, Func next) @@ -33,6 +34,9 @@ public async Task Invoke(ITransportReceiveContext context, Func(out IncomingPipelineMetricTags incomingPipelineMetricsTags); + incomingPipelineMetrics.RecordMessageSuccessfullyProcessed(context, incomingPipelineMetricsTags); + var outboxMessage = new OutboxMessage(messageId, ConvertToOutboxOperations(pendingTransportOperations.Operations)); await outboxStorage.Store(outboxMessage, outboxTransaction, context.Extensions, context.CancellationToken).ConfigureAwait(false); @@ -131,4 +135,5 @@ static AddressTag DeserializeRoutingStrategy(Dictionary options) } readonly IOutboxStorage outboxStorage; + readonly IncomingPipelineMetrics incomingPipelineMetrics; } diff --git a/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs b/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs index 431d72ef07d..78d9f35660e 100644 --- a/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs +++ b/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs @@ -7,24 +7,27 @@ namespace NServiceBus; using Pipeline; using Transport; -class MainPipelineExecutor : IPipelineExecutor +class MainPipelineExecutor( + IServiceProvider rootBuilder, + IPipelineCache pipelineCache, + MessageOperations messageOperations, + INotificationSubscriptions receivePipelineNotification, + IPipeline receivePipeline, + IActivityFactory activityFactory, + IncomingPipelineMetrics incomingPipelineMetrics) + : IPipelineExecutor { - public MainPipelineExecutor(IServiceProvider rootBuilder, IPipelineCache pipelineCache, MessageOperations messageOperations, INotificationSubscriptions receivePipelineNotification, IPipeline receivePipeline, IActivityFactory activityFactory) - { - this.rootBuilder = rootBuilder; - this.pipelineCache = pipelineCache; - this.messageOperations = messageOperations; - this.receivePipelineNotification = receivePipelineNotification; - this.receivePipeline = receivePipeline; - this.activityFactory = activityFactory; - } - public async Task Invoke(MessageContext messageContext, CancellationToken cancellationToken = default) { var pipelineStartedAt = DateTimeOffset.UtcNow; using var activity = activityFactory.StartIncomingPipelineActivity(messageContext); + var incomingPipelineMetricsTags = messageContext.Extensions.Get(); + + incomingPipelineMetrics.AddDefaultIncomingPipelineMetricTags(incomingPipelineMetricsTags); + incomingPipelineMetrics.RecordFetchedMessage(incomingPipelineMetricsTags); + var childScope = rootBuilder.CreateAsyncScope(); await using (childScope.ConfigureAwait(false)) { @@ -61,17 +64,13 @@ public async Task Invoke(MessageContext messageContext, CancellationToken cancel ex.Data["Pipeline canceled"] = transportReceiveContext.CancellationToken.IsCancellationRequested; + incomingPipelineMetrics.RecordMessageProcessingFailure(incomingPipelineMetricsTags, ex); + throw; } - await receivePipelineNotification.Raise(new ReceivePipelineCompleted(message, pipelineStartedAt, DateTimeOffset.UtcNow), cancellationToken).ConfigureAwait(false); + var completedAt = DateTimeOffset.UtcNow; + await receivePipelineNotification.Raise(new ReceivePipelineCompleted(message, pipelineStartedAt, completedAt), cancellationToken).ConfigureAwait(false); } } - - readonly IServiceProvider rootBuilder; - readonly IPipelineCache pipelineCache; - readonly MessageOperations messageOperations; - readonly INotificationSubscriptions receivePipelineNotification; - readonly IPipeline receivePipeline; - readonly IActivityFactory activityFactory; } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/PipelineComponent.cs b/src/NServiceBus.Core/Pipeline/PipelineComponent.cs index 207c93e86ec..97d2641b355 100644 --- a/src/NServiceBus.Core/Pipeline/PipelineComponent.cs +++ b/src/NServiceBus.Core/Pipeline/PipelineComponent.cs @@ -1,6 +1,7 @@ namespace NServiceBus; using System; +using System.Diagnostics.Metrics; using Microsoft.Extensions.DependencyInjection; using Pipeline; @@ -11,7 +12,8 @@ class PipelineComponent this.modifications = modifications; } - public static PipelineComponent Initialize(PipelineSettings settings, HostingComponent.Configuration hostingConfiguration) + public static PipelineComponent Initialize(PipelineSettings settings, + HostingComponent.Configuration hostingConfiguration, ReceiveComponent.Configuration receiveConfiguration) { var modifications = settings.modifications; @@ -25,6 +27,14 @@ public static PipelineComponent Initialize(PipelineSettings settings, HostingCom step.ApplyContainerRegistration(hostingConfiguration.Services); } + // make the PipelineMetrics available to the Pipeline + hostingConfiguration.Services.AddSingleton(sp => + { + var meterFactory = sp.GetService(); + string discriminator = receiveConfiguration.InstanceSpecificQueueAddress?.Discriminator ?? ""; + return new IncomingPipelineMetrics(meterFactory, receiveConfiguration.QueueNameBase, discriminator); + }); + return new PipelineComponent(modifications); } diff --git a/src/NServiceBus.Core/Receiving/ReceiveComponent.cs b/src/NServiceBus.Core/Receiving/ReceiveComponent.cs index bcc6050f667..74b9eb7d1be 100644 --- a/src/NServiceBus.Core/Receiving/ReceiveComponent.cs +++ b/src/NServiceBus.Core/Receiving/ReceiveComponent.cs @@ -63,7 +63,7 @@ public static ReceiveComponent Configure( pipelineSettings.Register("TransportReceiveToPhysicalMessageProcessingConnector", b => { var storage = b.GetService() ?? new NoOpOutboxStorage(); - return new TransportReceiveToPhysicalMessageConnector(storage); + return new TransportReceiveToPhysicalMessageConnector(storage, b.GetRequiredService()); }, "Allows to abort processing the message"); pipelineSettings.Register("LoadHandlersConnector", b => @@ -71,7 +71,7 @@ public static ReceiveComponent Configure( return new LoadHandlersConnector(b.GetRequiredService()); }, "Gets all the handlers to invoke from the MessageHandler registry based on the message type."); - pipelineSettings.Register("InvokeHandlers", new InvokeHandlerTerminator(hostingConfiguration.ActivityFactory), "Calls the IHandleMessages.Handle(T)"); + pipelineSettings.Register("InvokeHandlers", sp => new InvokeHandlerTerminator(hostingConfiguration.ActivityFactory, sp.GetService()), "Calls the IHandleMessages.Handle(T)"); var handlerDiagnostics = new Dictionary>(); @@ -154,7 +154,8 @@ public async Task Initialize( var receivePipeline = pipelineComponent.CreatePipeline(builder); - var mainPipelineExecutor = new MainPipelineExecutor(builder, pipelineCache, messageOperations, configuration.PipelineCompletedSubscribers, receivePipeline, activityFactory); + var pipelineMetrics = builder.GetService(); + var mainPipelineExecutor = new MainPipelineExecutor(builder, pipelineCache, messageOperations, configuration.PipelineCompletedSubscribers, receivePipeline, activityFactory, pipelineMetrics); var recoverabilityPipelineExecutor = recoverabilityComponent.CreateRecoverabilityPipelineExecutor( builder, diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs index d284a3a8757..1f7a7c53c9a 100644 --- a/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq; +using Microsoft.Extensions.DependencyInjection; using NServiceBus.Hosting; using NServiceBus.Logging; using NServiceBus.Pipeline; @@ -56,7 +57,7 @@ public void Initialize( faultMetadataExtractor = CreateFaultMetadataExtractor(); - pipelineSettings.Register(new RecoverabilityRoutingConnector(messageRetryNotification, messageFaultedNotification), "Executes the configured retry policy"); + pipelineSettings.Register(sp => new RecoverabilityRoutingConnector(sp.GetRequiredService(), messageRetryNotification, messageFaultedNotification), "Executes the configured retry policy"); hostingConfiguration.AddStartupDiagnosticsSection("Recoverability", new { diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs index 799ca062201..a383541a84d 100644 --- a/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs @@ -1,16 +1,19 @@ namespace NServiceBus; using System; -using System.Diagnostics; using System.Threading.Tasks; using Pipeline; class RecoverabilityRoutingConnector : StageConnector { + readonly IncomingPipelineMetrics incomingPipelineMetrics; + public RecoverabilityRoutingConnector( + IncomingPipelineMetrics incomingPipelineMetrics, INotificationSubscriptions messageRetryNotification, INotificationSubscriptions messageFaultedNotification) { + this.incomingPipelineMetrics = incomingPipelineMetrics; notifications = new CompositeNotification(); notifications.Register(messageRetryNotification); notifications.Register(messageFaultedNotification); @@ -18,7 +21,6 @@ public RecoverabilityRoutingConnector( public override async Task Invoke(IRecoverabilityContext context, Func stage) { - var availableMetricTags = context.Extensions.Get(); var recoverabilityActionContext = context.PreventChanges(); var recoverabilityAction = context.RecoverabilityAction; var routingContexts = recoverabilityAction @@ -29,22 +31,17 @@ public override async Task Invoke(IRecoverabilityContext context, Func headers Extensions = context; ReceiveAddress = receiveAddress; TransportTransaction = transportTransaction; + + context.GetOrCreate(); } ///