diff --git a/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs
new file mode 100644
index 000000000..0c045bc8c
--- /dev/null
+++ b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs
@@ -0,0 +1,19 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+namespace MQTTnet.Server.Exceptions;
+
+public class MqttPendingMessagesOverflowException : Exception
+{
+ public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) : base(
+ $"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.")
+ {
+ SessionId = sessionId;
+ OverflowStrategy = overflowStrategy;
+ }
+
+ public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; }
+
+ public string SessionId { get; }
+}
\ No newline at end of file
diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs
index 55814b849..7e3ab1279 100644
--- a/Source/MQTTnet.Server/Internal/MqttSession.cs
+++ b/Source/MQTTnet.Server/Internal/MqttSession.cs
@@ -6,6 +6,7 @@
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
+using MQTTnet.Server.Exceptions;
namespace MQTTnet.Server.Internal;
@@ -111,10 +112,11 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem)
{
- if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
+ if (PendingDataPacketsCount >= _serverOptions.MaxPendingMessagesPerClient)
{
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
+ packetBusItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
return EnqueueDataPacketResult.Dropped;
}
@@ -123,10 +125,15 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem
// Only drop from the data partition. Dropping from control partition might break the connection
// because the client does not receive PINGREQ packets etc. any longer.
var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data);
- if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
+ if (firstItem != null)
{
- var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
- _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
+ firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
+
+ if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
+ {
+ var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
+ _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
+ }
}
}
}
diff --git a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
index 2e86e21eb..c238ea016 100644
--- a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
+++ b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
@@ -115,6 +115,12 @@ public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
return this;
}
+ public MqttServerOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value)
+ {
+ _options.PendingMessagesOverflowStrategy = value;
+ return this;
+ }
+
public MqttServerOptionsBuilder WithoutDefaultEndpoint()
{
_options.DefaultEndpointOptions.IsEnabled = false;
diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs
index ecb8460fb..780c08130 100644
--- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs
+++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs
@@ -40,22 +40,65 @@ public Task DeleteAsync()
return _session.DeleteAsync();
}
- public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
+ ///
+ /// Delivers an application message immediately to the session.
+ ///
+ /// The application message to deliver.
+ ///
+ /// A task that represents the asynchronous operation.
+ /// The result contains the that includes the packet identifier of the enqueued message.
+ ///
+ public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
{
ArgumentNullException.ThrowIfNull(applicationMessage);
- var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage));
+ var publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
+ var packetBusItem = new MqttPacketBusItem(publishPacket);
_session.EnqueueDataPacket(packetBusItem);
- return packetBusItem.WaitAsync();
+ await packetBusItem.WaitAsync().ConfigureAwait(false);
+
+ var injectResult = new InjectMqttApplicationMessageResult()
+ {
+ PacketIdentifier = publishPacket.PacketIdentifier
+ };
+
+ return injectResult;
}
- public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
+ ///
+ /// Attempts to enqueue an application message to the session's send buffer.
+ ///
+ /// The application message to enqueue.
+ /// that includes the packet identifier of the enqueued message.
+ /// true if the message was successfully enqueued; otherwise, false.
+ ///
+ /// When is set to ,
+ /// this method always returns true.
+ /// However, an existing message in the queue may be dropped later to make room for the newly enqueued message.
+ /// Such dropped messages can be tracked by subscribing to event.
+ ///
+ public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out InjectMqttApplicationMessageResult injectResult)
{
ArgumentNullException.ThrowIfNull(applicationMessage);
- _session.EnqueueDataPacket(new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)));
+ var publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
+ var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket));
+
+ if (enqueueDataPacketResult != EnqueueDataPacketResult.Enqueued)
+ {
+ injectResult = null;
+ return false;
+ }
+ injectResult = new InjectMqttApplicationMessageResult() { PacketIdentifier = publishPacket.PacketIdentifier };
+ return true;
+ }
+
+ [Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")]
+ public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
+ {
+ TryEnqueueApplicationMessage(applicationMessage, out _);
return CompletedTask.Instance;
}
}
\ No newline at end of file
diff --git a/Source/MQTTnet.Tests/BaseTestClass.cs b/Source/MQTTnet.Tests/BaseTestClass.cs
index 8e5248e7f..00291b024 100644
--- a/Source/MQTTnet.Tests/BaseTestClass.cs
+++ b/Source/MQTTnet.Tests/BaseTestClass.cs
@@ -13,10 +13,11 @@ namespace MQTTnet.Tests
public abstract class BaseTestClass
{
public TestContext TestContext { get; set; }
-
- protected TestEnvironment CreateTestEnvironment(MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
+
+ protected TestEnvironment CreateTestEnvironment(
+ MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true)
{
- return new TestEnvironment(TestContext, protocolVersion);
+ return new TestEnvironment(TestContext, protocolVersion, trackUnobservedTaskException);
}
protected Task LongTestDelay()
diff --git a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
index 4f1391f15..6f2d8ae73 100644
--- a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
+++ b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
@@ -32,12 +32,16 @@ public TestEnvironment() : this(null)
{
}
- public TestEnvironment(TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
+ public TestEnvironment(
+ TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true)
{
_protocolVersion = protocolVersion;
TestContext = testContext;
- TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException;
+ if (trackUnobservedTaskException)
+ {
+ TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException;
+ }
ServerLogger.LogMessagePublished += (s, e) =>
{
diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs
index cefbc34dd..85ac53414 100644
--- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs
+++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs
@@ -1,7 +1,11 @@
+using System;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Internal;
+using MQTTnet.Packets;
+using MQTTnet.Protocol;
using MQTTnet.Server;
+using MQTTnet.Server.Exceptions;
namespace MQTTnet.Tests.Server
{
@@ -9,79 +13,443 @@ namespace MQTTnet.Tests.Server
public sealed class Injection_Tests : BaseTestClass
{
[TestMethod]
- public async Task Inject_Application_Message_At_Session_Level()
+ public async Task Enqueue_Application_Message_At_Session_Level()
{
- using (var testEnvironment = CreateTestEnvironment())
+ using var testEnvironment = CreateTestEnvironment();
+
+ var server = await testEnvironment.StartServer();
+ var receiver1 = await testEnvironment.ConnectClient();
+ var receiver2 = await testEnvironment.ConnectClient();
+ var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1);
+ var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2);
+
+ var status = await server.GetSessionsAsync();
+ var clientStatus = status[0];
+
+ await receiver1.SubscribeAsync("#");
+ await receiver2.SubscribeAsync("#");
+
+ var message = new MqttApplicationMessageBuilder()
+ .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
+ .WithTopic("InjectedOne").Build();
+
+ var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var injectResult);
+
+ Assert.IsTrue(enqueued);
+
+ await LongTestDelay();
+
+ Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count);
+ Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier);
+ Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic);
+
+ // The second receiver should NOT receive the message.
+ Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count);
+ }
+
+ [TestMethod]
+ public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy()
+ {
+ using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false);
+
+ var server = await testEnvironment.StartServer(
+ builder => builder
+ .WithMaxPendingMessagesPerClient(1)
+ .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage));
+
+ var receiver = await testEnvironment.ConnectClient();
+
+ var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+ server.InterceptingOutboundPacketAsync += async args =>
{
- var server = await testEnvironment.StartServer();
- var receiver1 = await testEnvironment.ConnectClient();
- var receiver2 = await testEnvironment.ConnectClient();
- var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1);
- var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2);
+ // - The first message is dequeued normally and calls this delay
+ // - The second message fills the outbound queue
+ // - The third message overflows the outbound queue
+ if (args.Packet is MqttPublishPacket)
+ {
+ firstMessageOutboundPacketInterceptedTcs.SetResult();
+ await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+ }
+ };
- var status = await server.GetSessionsAsync();
- var clientStatus = status[0];
+ var firstMessageEvicted = false;
+ var secondMessageEvicted = false;
+ var thirdMessageEvicted = false;
- await receiver1.SubscribeAsync("#");
- await receiver2.SubscribeAsync("#");
+ server.QueuedApplicationMessageOverwrittenAsync += args =>
+ {
+ if (args.Packet is not MqttPublishPacket publishPacket)
+ {
+ return Task.CompletedTask;
+ }
+
+ switch (publishPacket.Topic)
+ {
+ case "InjectedOne":
+ firstMessageEvicted = true;
+ break;
+ case "InjectedTwo":
+ secondMessageEvicted = true;
+ break;
+ case "InjectedThree":
+ thirdMessageEvicted = true;
+ break;
+ }
- await clientStatus.EnqueueApplicationMessageAsync(new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build());
+ return Task.CompletedTask;
+ };
- await LongTestDelay();
+ var status = await server.GetSessionsAsync();
+ var clientStatus = status[0];
+ await receiver.SubscribeAsync("#");
- Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count);
- Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic);
+ var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _);
+ await firstMessageOutboundPacketInterceptedTcs.Task;
- // The second receiver should NOT receive the message.
- Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count);
- }
+ var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _);
+
+ var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _);
+
+ // Due to the DropNewMessage strategy the third message will not be enqueued.
+ // As a result, no existing messages in the queue will be dropped (evicted).
+ Assert.IsTrue(firstMessageEnqueued);
+ Assert.IsTrue(secondMessageEnqueued);
+ Assert.IsFalse(thirdMessageEnqueued);
+
+ Assert.IsFalse(firstMessageEvicted);
+ Assert.IsFalse(secondMessageEvicted);
+ Assert.IsFalse(thirdMessageEvicted);
}
+
[TestMethod]
- public async Task Inject_ApplicationMessage_At_Server_Level()
+ public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy()
{
- using (var testEnvironment = CreateTestEnvironment())
+ using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false);
+
+ var server = await testEnvironment.StartServer(
+ builder => builder
+ .WithMaxPendingMessagesPerClient(1)
+ .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage));
+
+ var receiver = await testEnvironment.ConnectClient();
+
+ var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+ server.InterceptingOutboundPacketAsync += async args =>
{
- var server = await testEnvironment.StartServer();
+ // - The first message is dequeued normally and calls this delay
+ // - The second message fills the outbound queue
+ // - The third message overflows the outbound queue
+ if (args.Packet is MqttPublishPacket)
+ {
+ firstMessageOutboundPacketInterceptedTcs.SetResult();
+ await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+ }
+ };
+
+ var firstMessageEvicted = false;
+ var secondMessageEvicted = false;
+ var thirdMessageEvicted = false;
+
+ server.QueuedApplicationMessageOverwrittenAsync += args =>
+ {
+ if (args.Packet is not MqttPublishPacket publishPacket)
+ {
+ return Task.CompletedTask;
+ }
- var receiver = await testEnvironment.ConnectClient();
+ switch (publishPacket.Topic)
+ {
+ case "InjectedOne":
+ firstMessageEvicted = true;
+ break;
+ case "InjectedTwo":
+ secondMessageEvicted = true;
+ break;
+ case "InjectedThree":
+ thirdMessageEvicted = true;
+ break;
+ }
+
+ return Task.CompletedTask;
+ };
- var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver);
+ var status = await server.GetSessionsAsync();
+ var clientStatus = status[0];
+ await receiver.SubscribeAsync("#");
- await receiver.SubscribeAsync("#");
+ var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _);
+ await firstMessageOutboundPacketInterceptedTcs.Task;
- var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
+ var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _);
- await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
+ var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _);
- await LongTestDelay();
+ // Due to the DropOldestQueuedMessage strategy, all messages will be enqueued initially.
+ // But the second message will eventually be dropped (evicted) to make room for the third one.
+ Assert.IsTrue(firstMessageEnqueued);
+ Assert.IsTrue(secondMessageEnqueued);
+ Assert.IsTrue(thirdMessageEnqueued);
- Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count);
- Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic);
- }
+ Assert.IsFalse(firstMessageEvicted);
+ Assert.IsTrue(secondMessageEvicted);
+ Assert.IsFalse(thirdMessageEvicted);
}
[TestMethod]
- public async Task Intercept_Injected_Application_Message()
+ public async Task Deliver_Application_Message_At_Session_Level()
+ {
+ using var testEnvironment = CreateTestEnvironment();
+
+ var server = await testEnvironment.StartServer();
+ var receiver1 = await testEnvironment.ConnectClient();
+ var receiver2 = await testEnvironment.ConnectClient();
+ var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1);
+ var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2);
+
+ var status = await server.GetSessionsAsync();
+ var clientStatus = status[0];
+
+ await receiver1.SubscribeAsync("#");
+ await receiver2.SubscribeAsync("#");
+
+ var mqttApplicationMessage = new MqttApplicationMessageBuilder()
+ .WithTopic("InjectedOne")
+ .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
+ .Build();
+ var injectResult = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage);
+
+ await LongTestDelay();
+
+ Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count);
+ Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier);
+ Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic);
+
+ // The second receiver should NOT receive the message.
+ Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count);
+ }
+
+ [TestMethod]
+ public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy()
+ {
+ using var testEnvironment = CreateTestEnvironment();
+
+ var server = await testEnvironment.StartServer(
+ builder => builder
+ .WithMaxPendingMessagesPerClient(1)
+ .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage));
+
+ var receiver = await testEnvironment.ConnectClient();
+
+ var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+ server.InterceptingOutboundPacketAsync += async args =>
+ {
+ // - The first message is dequeued normally and calls this delay
+ // - The second message fills the outbound queue
+ // - The third message overflows the outbound queue
+ if (args.Packet is MqttPublishPacket)
+ {
+ firstMessageOutboundPacketInterceptedTcs.SetResult();
+ await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+ }
+ };
+
+ var firstMessageEvicted = false;
+ var secondMessageEvicted = false;
+ var thirdMessageEvicted = false;
+
+ server.QueuedApplicationMessageOverwrittenAsync += args =>
+ {
+ if (args.Packet is not MqttPublishPacket publishPacket)
+ {
+ return Task.CompletedTask;
+ }
+
+ switch (publishPacket.Topic)
+ {
+ case "InjectedOne":
+ firstMessageEvicted = true;
+ break;
+ case "InjectedTwo":
+ secondMessageEvicted = true;
+ break;
+ case "InjectedThree":
+ thirdMessageEvicted = true;
+ break;
+ }
+
+ return Task.CompletedTask;
+ };
+
+ var status = await server.GetSessionsAsync();
+ var clientStatus = status[0];
+ await receiver.SubscribeAsync("#");
+
+ var firstMessageTask = Task.Run(
+ () => clientStatus.DeliverApplicationMessageAsync(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build()));
+ await LongTestDelay();
+ await firstMessageOutboundPacketInterceptedTcs.Task;
+
+ var secondMessageTask = Task.Run(
+ () => clientStatus.DeliverApplicationMessageAsync(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build()));
+ await LongTestDelay();
+
+ var thirdMessageTask = Task.Run(
+ () => clientStatus.DeliverApplicationMessageAsync(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build()));
+ await LongTestDelay();
+
+ Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask);
+
+ // Due to the DropNewMessage strategy the third message delivery will fail.
+ // As a result, no existing messages in the queue will be dropped (evicted).
+ Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation);
+ Assert.AreEqual(secondMessageTask.Status, TaskStatus.WaitingForActivation);
+ Assert.AreEqual(thirdMessageTask.Status, TaskStatus.Faulted);
+ Assert.IsTrue(thirdMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException);
+
+ Assert.IsFalse(firstMessageEvicted);
+ Assert.IsFalse(secondMessageEvicted);
+ Assert.IsFalse(thirdMessageEvicted);
+ }
+
+ [TestMethod]
+ public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy()
{
- using (var testEnvironment = CreateTestEnvironment())
+ using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false);
+
+ var server = await testEnvironment.StartServer(
+ builder => builder
+ .WithMaxPendingMessagesPerClient(1)
+ .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage));
+
+ var receiver = await testEnvironment.ConnectClient();
+
+ var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+ server.InterceptingOutboundPacketAsync += async args =>
{
- var server = await testEnvironment.StartServer();
+ // - The first message is dequeued normally and calls this delay
+ // - The second message fills the outbound queue
+ // - The third message overflows the outbound queue
+ if (args.Packet is MqttPublishPacket)
+ {
+ firstMessageOutboundPacketInterceptedTcs.SetResult();
+ await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+ }
+ };
- MqttApplicationMessage interceptedMessage = null;
- server.InterceptingPublishAsync += eventArgs =>
+ var firstMessageEvicted = false;
+ var secondMessageEvicted = false;
+ var thirdMessageEvicted = false;
+
+ server.QueuedApplicationMessageOverwrittenAsync += args =>
+ {
+ if (args.Packet is not MqttPublishPacket publishPacket)
+ {
+ return Task.CompletedTask;
+ }
+
+ switch (publishPacket.Topic)
{
- interceptedMessage = eventArgs.ApplicationMessage;
- return CompletedTask.Instance;
- };
+ case "InjectedOne":
+ firstMessageEvicted = true;
+ break;
+ case "InjectedTwo":
+ secondMessageEvicted = true;
+ break;
+ case "InjectedThree":
+ thirdMessageEvicted = true;
+ break;
+ }
+
+ return Task.CompletedTask;
+ };
+
+ var status = await server.GetSessionsAsync();
+ var clientStatus = status[0];
+ await receiver.SubscribeAsync("#");
+
+ var firstMessageTask = Task.Run(
+ () => clientStatus.DeliverApplicationMessageAsync(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build()));
+ await LongTestDelay();
+ await firstMessageOutboundPacketInterceptedTcs.Task;
+
+ var secondMessageTask = Task.Run(
+ () => clientStatus.DeliverApplicationMessageAsync(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build()));
+ await LongTestDelay();
+
+ var thirdMessageTask = Task.Run(
+ () => clientStatus.DeliverApplicationMessageAsync(
+ new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build()));
+ await LongTestDelay();
+
+ Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask);
+
+ // Due to the DropOldestQueuedMessage strategy, the second message delivery will fail
+ // to make room for the third one.
+ Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation);
+ Assert.AreEqual(secondMessageTask.Status, TaskStatus.Faulted);
+ Assert.IsTrue(secondMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException);
+ Assert.AreEqual(thirdMessageTask.Status, TaskStatus.WaitingForActivation);
+
+ Assert.IsFalse(firstMessageEvicted);
+ Assert.IsTrue(secondMessageEvicted);
+ Assert.IsFalse(thirdMessageEvicted);
+ }
+
+ [TestMethod]
+ public async Task Inject_ApplicationMessage_At_Server_Level()
+ {
+ using var testEnvironment = CreateTestEnvironment();
+
+ var server = await testEnvironment.StartServer();
+
+ var receiver = await testEnvironment.ConnectClient();
+
+ var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver);
+
+ await receiver.SubscribeAsync("#");
+
+ var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
+
+ await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
+
+ await LongTestDelay();
+
+ Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count);
+ Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic);
+ }
+
+ [TestMethod]
+ public async Task Intercept_Injected_Application_Message()
+ {
+ using var testEnvironment = CreateTestEnvironment();
+
+ var server = await testEnvironment.StartServer();
+
+ MqttApplicationMessage interceptedMessage = null;
+ server.InterceptingPublishAsync += eventArgs =>
+ {
+ interceptedMessage = eventArgs.ApplicationMessage;
+ return CompletedTask.Instance;
+ };
- var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
- await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
+ var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
+ await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
- await LongTestDelay();
+ await LongTestDelay();
- Assert.IsNotNull(interceptedMessage);
- }
+ Assert.IsNotNull(interceptedMessage);
}
}
}
\ No newline at end of file
diff --git a/Source/MQTTnet/InjectMqttApplicationMessageResult.cs b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs
new file mode 100644
index 000000000..b2e6da802
--- /dev/null
+++ b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs
@@ -0,0 +1,10 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+namespace MQTTnet;
+
+public class InjectMqttApplicationMessageResult
+{
+ public ushort PacketIdentifier { get; init; }
+}
\ No newline at end of file
diff --git a/Source/MQTTnet/Internal/MqttPacketBus.cs b/Source/MQTTnet/Internal/MqttPacketBus.cs
index 8aec37565..46e2b3493 100644
--- a/Source/MQTTnet/Internal/MqttPacketBus.cs
+++ b/Source/MQTTnet/Internal/MqttPacketBus.cs
@@ -141,14 +141,6 @@ public List ExportPackets(MqttPacketBusPartition partition)
}
}
- public int ItemsCount(MqttPacketBusPartition partition)
- {
- lock (_syncRoot)
- {
- return _partitions[(int)partition].Count;
- }
- }
-
public int PartitionItemsCount(MqttPacketBusPartition partition)
{
lock (_syncRoot)
diff --git a/Source/MQTTnet/Internal/MqttPacketBusItem.cs b/Source/MQTTnet/Internal/MqttPacketBusItem.cs
index b94654cf5..6fb5b114d 100644
--- a/Source/MQTTnet/Internal/MqttPacketBusItem.cs
+++ b/Source/MQTTnet/Internal/MqttPacketBusItem.cs
@@ -10,8 +10,8 @@ namespace MQTTnet.Internal
{
public sealed class MqttPacketBusItem
{
- readonly AsyncTaskCompletionSource _promise = new AsyncTaskCompletionSource();
-
+ readonly AsyncTaskCompletionSource _promise = new AsyncTaskCompletionSource();
+
public MqttPacketBusItem(MqttPacket packet)
{
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
@@ -28,7 +28,7 @@ public void Cancel()
public void Complete()
{
- _promise.TrySetResult(true);
+ _promise.TrySetResult(Packet);
Completed?.Invoke(this, EventArgs.Empty);
}
@@ -37,7 +37,7 @@ public void Fail(Exception exception)
_promise.TrySetException(exception);
}
- public Task WaitAsync()
+ public Task WaitAsync()
{
return _promise.Task;
}
diff --git a/Source/ReleaseNotes.md b/Source/ReleaseNotes.md
index 7c70587e3..e68ff4c39 100644
--- a/Source/ReleaseNotes.md
+++ b/Source/ReleaseNotes.md
@@ -9,6 +9,7 @@
* Removal of Managed Client **(BREAKING CHANGE)**
* Fixed missing release notes in nuget packages.
+
* Client: MQTT 5.0.0 is now the default version when connecting with a server **(BREAKING CHANGE)**
* Client: Fixed enhanced authentication.
* Client: Exposed WebSocket compression options in MQTT client options (thanks to @victornor, #2127)
@@ -19,3 +20,4 @@
* Server: Set SSL version to "None" which will let the OS choose the version **(BREAKING CHANGE)**
* Server: Added API for getting a single session (thanks to @AntonSmolkov, #2131)
* Server: Fixed "TryPrivate" (Mosquitto feature) handling (thanks to @victornor, #2125) **(BREAKING CHANGE)**
+* Server: Fixed dead lock when awaiting a packet transmission but the packet gets dropped due to quotas (#2117, thanks to @AntonSmolkov)