diff --git a/src/NServiceBus.Core.Tests/Pipeline/IncomingPipelineMetricTagsTests.cs b/src/NServiceBus.Core.Tests/Pipeline/IncomingPipelineMetricTagsTests.cs new file mode 100644 index 00000000000..356863fb4e4 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Pipeline/IncomingPipelineMetricTagsTests.cs @@ -0,0 +1,57 @@ +namespace NServiceBus.Core.Tests.Pipeline.Incoming; + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using MessageInterfaces.MessageMapper.Reflection; +using NServiceBus.Pipeline; +using NUnit.Framework; +using Serialization; +using Testing; +using Transport; +using Unicast.Messages; + +[TestFixture] +public class IncomingPipelineMetricTagsTests +{ + [Test] + public async Task Should_not_fail_when_handling_more_than_one_logical_message() + { + var registry = new MessageMetadataRegistry(new Conventions().IsMessageType, true); + + registry.RegisterMessageTypesFoundIn( + [ + typeof(MyMessage) + ]); + + var context = new TestableIncomingPhysicalMessageContext + { + Message = new IncomingMessage("messageId", new Dictionary + { + { Headers.EnclosedMessageTypes, typeof(MyMessage).AssemblyQualifiedName } + }, new ReadOnlyMemory(new byte[] { 1 })) + }; + + var messageMapper = new MessageMapper(); + var behavior = new DeserializeMessageConnector(new MessageDeserializerResolver(new FakeSerializer(), []), new LogicalMessageFactory(registry, messageMapper), registry, messageMapper, false); + + await behavior.Invoke(context, c => + { + c.Extensions.Get().Add("Same", "Same"); + return Task.CompletedTask; + }); + + Assert.That(true, Is.True); + } + + class MyMessage : IMessage { } + + class FakeSerializer : IMessageSerializer + { + public string ContentType { get; } + public void Serialize(object message, Stream stream) => throw new NotImplementedException(); + + public object[] Deserialize(ReadOnlyMemory body, IList messageTypes = null) => [new MyMessage(), new MyMessage()]; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetricTags.cs b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetricTags.cs index 702b97362ce..534d735e08b 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetricTags.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetricTags.cs @@ -14,14 +14,15 @@ public sealed class IncomingPipelineMetricTags Dictionary>? tags; /// - /// Adds the specified tag and value to the collection. + /// Adds the specified tag and value to the collection if not already present. /// /// The tag to add. /// The value assigned to the tag. public void Add(string tagKey, object value) { tags ??= []; - tags.Add(tagKey, new(tagKey, value)); + // We are using tryAdd to mitigate multiple logical messages transmitted in a single physical message + tags.TryAdd(tagKey, new(tagKey, value)); } ///