Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rethrow MqttClientUnexpectedDisconnectReceivedException when Publish … #1974

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
* [Core] Optimized packet serialization of PUBACK and PUBREC packets for protocol version 5.0.0 (#1939, thanks to @Y-Sindo).
* [Core] The package inspector is now fully async (#1941).
* [Client] Added a dedicated exception when the client is not connected (#1954, thanks to @marcpiulachs).
* [Client] The client will now throw a _MqttClientUnexpectedDisconnectReceivedException_ when publishing a QoS 0 message which leads to a server disconnect (BREAKING CHANGE!, #1974, thanks to @fazho).
* [Client] Exposed the certificate selection event handler in client options (#1984).
* [Server] The server will no longer send _NoMatchingSubscribers_ when the actual subscription was non success (#1965, BREAKING CHANGE!).
* [Server] Fixed broken support for _null_ in _AddServer_ method in ASP.NET integration (#1981).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
Expand All @@ -11,8 +12,9 @@ namespace MQTTnet.Client
{
public sealed class MqttClientUnexpectedDisconnectReceivedException : MqttCommunicationException
{
public MqttClientUnexpectedDisconnectReceivedException(MqttDisconnectPacket disconnectPacket)
: base($"Unexpected DISCONNECT (Reason code={disconnectPacket.ReasonCode}) received.")
public MqttClientUnexpectedDisconnectReceivedException(MqttDisconnectPacket disconnectPacket, Exception innerExcpetion = null) : base(
$"Unexpected DISCONNECT (Reason code={disconnectPacket.ReasonCode}) received.",
innerExcpetion)
{
ReasonCode = disconnectPacket.ReasonCode;
SessionExpiryInterval = disconnectPacket.SessionExpiryInterval;
Expand All @@ -23,12 +25,12 @@ public MqttClientUnexpectedDisconnectReceivedException(MqttDisconnectPacket disc

public MqttDisconnectReasonCode? ReasonCode { get; }

public uint? SessionExpiryInterval { get; }

public string ReasonString { get; }

public List<MqttUserProperty> UserProperties { get; }

public string ServerReference { get; }

public uint? SessionExpiryInterval { get; }

public List<MqttUserProperty> UserProperties { get; }
}
}
}
35 changes: 28 additions & 7 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public sealed class MqttClient : Disposable, IMqttClient
Task _packetReceiverTask;
AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;
Task _publishPacketReceiverTask;
MqttDisconnectPacket _unexpectedDisconnectPacket;

public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
{
Expand Down Expand Up @@ -123,6 +124,8 @@ public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions option
var adapter = _adapterFactory.CreateClientAdapter(options, new MqttPacketInspector(_events.InspectPacketEvent, _rootLogger), _rootLogger);
_adapter = adapter ?? throw new InvalidOperationException("The adapter factory did not provide an adapter.");

_unexpectedDisconnectPacket = null;

if (cancellationToken.CanBeCanceled)
{
connectResult = await ConnectInternal(adapter, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -239,7 +242,7 @@ public async Task DisconnectAsync(MqttClientDisconnectOptions options, Cancellat
public async Task PingAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

ThrowIfDisposed();
ThrowIfNotConnected();

Expand Down Expand Up @@ -330,7 +333,7 @@ public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeO

ThrowIfDisposed();
ThrowIfNotConnected();

if (Options.ValidateFeatures)
{
MqttClientSubscribeOptionsValidator.ThrowIfNotSupported(options, _adapter.PacketFormatterAdapter.ProtocolVersion);
Expand Down Expand Up @@ -682,9 +685,10 @@ Task ProcessReceivedDisconnectPacket(MqttDisconnectPacket disconnectPacket)
_disconnectReason = (int)disconnectPacket.ReasonCode;
_disconnectReasonString = disconnectPacket.ReasonString;
_disconnectUserProperties = disconnectPacket.UserProperties;
_unexpectedDisconnectPacket = disconnectPacket;

// Also dispatch disconnect to waiting threads to generate a proper exception.
_packetDispatcher.Dispose(new MqttClientUnexpectedDisconnectReceivedException(disconnectPacket));
_packetDispatcher.Dispose(new MqttClientUnexpectedDisconnectReceivedException(disconnectPacket, null));

return DisconnectInternal(_packetReceiverTask, null, null);
}
Expand Down Expand Up @@ -751,10 +755,27 @@ async Task<MqttClientPublishResult> PublishAtLeastOnce(MqttPublishPacket publish

async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await Send(publishPacket, cancellationToken).ConfigureAwait(false);
try
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await Send(publishPacket, cancellationToken).ConfigureAwait(false);

return MqttClientResultFactory.PublishResult.Create(null);
}
catch (Exception exception)
{
// We have to check if the server has sent a disconnect packet in response to the published message.
// Since we are in QoS 0 we do not get a direct response via an PUBACK packet and thus basically no
// feedback at all.
var localUnexpectedDisconnectPacket = _unexpectedDisconnectPacket;

return MqttClientResultFactory.PublishResult.Create(null);
if (localUnexpectedDisconnectPacket != null)
{
throw new MqttClientUnexpectedDisconnectReceivedException(localUnexpectedDisconnectPacket, exception);
}

throw;
}
}

async Task<MqttClientPublishResult> PublishExactlyOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
Expand Down Expand Up @@ -1071,4 +1092,4 @@ async Task TrySendKeepAliveMessages(CancellationToken cancellationToken)
}
}
}
}
}