Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce asynchronous message processing #891

Merged
merged 2 commits into from
Apr 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol.
* [Client] Improve connection stability (thanks to @jltjohanlindqvist).
* [Client] Support WithConnectionUri to configure client (thanks to @PMExtra).
* [Client] Support PublishAsync with QoS 1 and QoS 2 from within an ApplicationMessageReceivedHandler (#648, #587, thanks to @PSanetra).
* [Client] Fixed MqttCommunicationTimedOutExceptions, caused by a long running ApplicationMessageReceivedHandler, which blocked MQTT packets from being processed (#829, thanks to @PSanetra).
* [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe).
* [ManagedClient] Added support for persisted sessions (thansk to @PMExtra).
* [ManagedClient] Fixed a memory leak (thanks to @zawodskoj).
Expand Down
92 changes: 64 additions & 28 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class MqttClient : Disposable, IMqttClient
private CancellationTokenSource _backgroundCancellationTokenSource;
private Task _packetReceiverTask;
private Task _keepAlivePacketsSenderTask;
private Task _publishPacketReceiverTask;

private AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this must be disposed as soon as the client gets disposed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private IMqttChannelAdapter _adapter;
private bool _cleanDisconnectInitiated;
Expand Down Expand Up @@ -88,6 +91,9 @@ public async Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions
await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");

_publishPacketReceiverQueue = new AsyncQueue<MqttPublishPacket>();
_publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken);

_packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);

authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false);
Expand Down Expand Up @@ -230,6 +236,9 @@ private void Cleanup()
_backgroundCancellationTokenSource?.Dispose();
_backgroundCancellationTokenSource = null;

_publishPacketReceiverQueue?.Dispose();
_publishPacketReceiverQueue = null;

_adapter?.Dispose();
_adapter = null;
}
Expand Down Expand Up @@ -300,9 +309,12 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception, Mqt
try
{
var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
var publishPacketReceiverTask = WaitForTaskAsync(_publishPacketReceiverTask, sender);
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);

await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false);
await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false);

_publishPacketReceiverQueue.Dispose();
}
catch (Exception e)
{
Expand Down Expand Up @@ -522,7 +534,7 @@ private async Task TryProcessReceivedPacketAsync(MqttBasePacket packet, Cancella

if (packet is MqttPublishPacket publishPacket)
{
await TryProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
EnqueueReceivedPublishPacket(publishPacket);
}
else if (packet is MqttPubRelPacket pubRelPacket)
{
Expand Down Expand Up @@ -584,47 +596,71 @@ await SendAsync(new MqttPubCompPacket
}
}

private async Task TryProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
private void EnqueueReceivedPublishPacket(MqttPublishPacket publishPacket)
{
try
{
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
_publishPacketReceiverQueue.Enqueue(publishPacket);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while enqueueing application message.");
}
}

private async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
var publishPacketDequeueResult = await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken);

if (!publishPacketDequeueResult.IsSuccess)
{
return;
}

var publishPacket = publishPacketDequeueResult.Item;

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
await SendAsync(new MqttPubAckPacket
await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubAckReasonCode.Success
}, cancellationToken).ConfigureAwait(false);
await SendAsync(new MqttPubAckPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubAckReasonCode.Success
}, cancellationToken).ConfigureAwait(false);
}
}
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
var pubRecPacket = new MqttPubRecPacket
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};
var pubRecPacket = new MqttPubRecPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};

await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
}
}
else
{
throw new MqttProtocolViolationException("Received a not supported QoS level.");
}
}
else
catch (Exception exception)
{
throw new MqttProtocolViolationException("Received a not supported QoS level.");
_logger.Error(exception, "Error while handling application message.");
}
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling application message.");
}
}

private async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ public async Task<AsyncQueueDequeueResult<TItem>> TryDequeueAsync(CancellationTo
{
while (!cancellationToken.IsCancellationRequested)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();
try
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
}
catch (OperationCanceledException)
{
return new AsyncQueueDequeueResult<TItem>(false, default(TItem));
}

if (_queue.TryDequeue(out var item))
{
Expand Down
38 changes: 25 additions & 13 deletions Source/MQTTnet/Server/MqttClientConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public sealed class MqttClientConnection : IDisposable
readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();

readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly Func<Task> _onStart;
readonly Func<MqttClientDisconnectType, Task> _onStop;
readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
readonly MqttClientSessionsManager _sessionsManager;

Expand All @@ -34,7 +36,7 @@ public sealed class MqttClientConnection : IDisposable
readonly string _endpoint;
readonly DateTime _connectedTimestamp;

Task<MqttClientDisconnectType> _packageReceiverTask;
volatile Task _packageReceiverTask;
DateTime _lastPacketReceivedTimestamp;
DateTime _lastNonKeepAlivePacketReceivedTimestamp;

Expand All @@ -43,7 +45,7 @@ public sealed class MqttClientConnection : IDisposable
long _receivedApplicationMessagesCount;
long _sentApplicationMessagesCount;

bool _isTakeover;
volatile bool _isTakeover;

public MqttClientConnection(
MqttConnectPacket connectPacket,
Expand All @@ -52,12 +54,16 @@ public MqttClientConnection(
IMqttServerOptions serverOptions,
MqttClientSessionsManager sessionsManager,
IMqttRetainedMessagesManager retainedMessagesManager,
Func<Task> onStart,
Func<MqttClientDisconnectType, Task> onStop,
IMqttNetLogger logger)
{
Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_onStart = onStart ?? throw new ArgumentNullException(nameof(onStart));
_onStop = onStop ?? throw new ArgumentNullException(nameof(onStop));

_channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
_dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter;
Expand All @@ -80,15 +86,13 @@ public MqttClientConnection(

public MqttClientSession Session { get; }

public bool IsFinalized { get; set; }

public Task StopAsync(bool isTakeover = false)
{
_isTakeover = isTakeover;
var task = _packageReceiverTask;

StopInternal();

var task = _packageReceiverTask;
if (task != null)
{
return task;
Expand Down Expand Up @@ -127,17 +131,18 @@ public void Dispose()
_cancellationToken.Dispose();
}

public Task<MqttClientDisconnectType> RunAsync(MqttConnectionValidatorContext connectionValidatorContext)
public Task RunAsync(MqttConnectionValidatorContext connectionValidatorContext)
{
_packageReceiverTask = RunInternalAsync(connectionValidatorContext);
return _packageReceiverTask;
}

async Task<MqttClientDisconnectType> RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext)
async Task RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext)
{
var disconnectType = MqttClientDisconnectType.NotClean;
try
{
await _onStart();
_logger.Info("Client '{0}': Session started.", ClientId);

_channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted;
Expand Down Expand Up @@ -241,6 +246,11 @@ await SendAsync(
}
finally
{
if (_isTakeover)
{
disconnectType = MqttClientDisconnectType.Takeover;
}

if (Session.WillMessage != null)
{
_sessionsManager.DispatchApplicationMessage(Session.WillMessage, this);
Expand All @@ -255,14 +265,16 @@ await SendAsync(
_logger.Info("Client '{0}': Connection stopped.", ClientId);

_packageReceiverTask = null;
}

if (_isTakeover)
{
return MqttClientDisconnectType.Takeover;
try
{
await _onStop(disconnectType);
}
catch (Exception e)
{
_logger.Error(e, "client '{0}': Error while cleaning up", ClientId);
}
}

return disconnectType;
}

void StopInternal()
Expand Down
Loading