Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Apr 11, 2018
2 parents 06a299d + ec41efd commit 9afff6d
Show file tree
Hide file tree
Showing 28 changed files with 260 additions and 148 deletions.
6 changes: 3 additions & 3 deletions Build/MQTTnet.AspNetCore.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
<package >
<metadata>
<id>MQTTnet.AspNetCore</id>
<version>2.7.3</version>
<version>2.7.4</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
<projectUrl>https://github.com/chkr1011/MQTTnet</projectUrl>
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>* Updated to MQTTnet 2.7.3.
<releaseNotes>* Updated to MQTTnet 2.7.4.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<group targetFramework="netstandard2.0">
<dependency id="MQTTnet" version="2.7.3" />
<dependency id="MQTTnet" version="2.7.4" />
</group>
</dependencies>
</metadata>
Expand Down
19 changes: 9 additions & 10 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@
<package >
<metadata>
<id>MQTTnet</id>
<version>2.7.3</version>
<version>2.7.4</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
<projectUrl>https://github.com/chkr1011/MQTTnet</projectUrl>
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Add several new extension methods.
* [Client] Fixed an issue in _ManagedMqttClientOptionsBuilder_ when using _WithClientOptions_ and an options builder.
* [Client] Added the "IsStarted" property for the managed client.
* [Client] Optimized stream buffer for UWP apps.
* [Client] Added the _BufferSize_ to the TCP options.
* [Client] Fixed a race condition which leads to exceptions when reconnecting rapidly.
* [Server] Fixed a race condition which leads to exceptions when clients are reconnecting rapidly.
* [Core] Fixed some issues in stream and socket handling.
<releaseNotes> * [Client] Fixed a deadlock while the client disconnects.
* [Client] Fixed broken support for protocol version 3.1.0.
* [Server] The _MqttTcpServerAdapter_ is now added to the ASP.NET services.
* [Server] _MqttServerAdapter_ is renamed to _MqttTcpServerAdapter_ (BREAKING CHANGE!).
* [Server] The server no longer sends the will message of a client if the disconnect was clean (via _Disconnect_ packet).
* [Server] The application message interceptor now allows closing the connection.
* [Server] Added a new flag for the _ClientDisconnected_ event which contains a value indicating whether the disconnect was clean (via _Disconnect_ packet).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
Expand Down Expand Up @@ -46,7 +45,7 @@

<group targetFramework="net461">
</group>

</dependencies>
</metadata>

Expand Down
5 changes: 4 additions & 1 deletion Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Implementations;

namespace MQTTnet.AspNetCore
{
Expand All @@ -20,7 +21,9 @@ public static IServiceCollection AddHostedMqttServer(this IServiceCollection ser
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>());

services.AddSingleton<MqttWebSocketServerAdapter>();
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>());
services.AddSingleton<MqttTcpServerAdapter>();
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>());
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttTcpServerAdapter>());

return services;
}
Expand Down
8 changes: 4 additions & 4 deletions Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer
public Task ConnectAsync(TimeSpan timeout)
{
ThrowIfDisposed();
_logger.Trace<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);
_logger.Verbose<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);

return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
}

public Task DisconnectAsync(TimeSpan timeout)
{
ThrowIfDisposed();
_logger.Trace<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);
_logger.Verbose<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);

return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
}
Expand All @@ -70,7 +70,7 @@ public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationTok
continue;
}

_logger.Trace<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);
_logger.Verbose<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);

var chunks = PacketSerializer.Serialize(packet);
foreach (var chunk in chunks)
Expand Down Expand Up @@ -135,7 +135,7 @@ await ExecuteAndWrapExceptionAsync(async () =>
throw new MqttProtocolViolationException("Received malformed packet.");
}

_logger.Trace<MqttChannelAdapter>("RX <<< {0}", packet);
_logger.Verbose<MqttChannelAdapter>("RX <<< {0}", packet);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace MQTTnet.Client
{
public interface IMqttClientAdapterFactory
{
IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger);
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IMqttClientOptions

TimeSpan CommunicationTimeout { get; }
TimeSpan KeepAlivePeriod { get; }
TimeSpan? KeepAliveSendInterval { get; set; }
TimeSpan? KeepAliveSendInterval { get; }

MqttProtocolVersion ProtocolVersion { get; }

Expand Down
92 changes: 46 additions & 46 deletions Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio
_packetIdentifierProvider.Reset();
_packetDispatcher.Reset();

_adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger);
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);

_logger.Trace<MqttClient>("Trying to connect with server.");
_logger.Verbose<MqttClient>("Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.Trace<MqttClient>("Connection with server established.");
_logger.Verbose<MqttClient>("Connection with server established.");

await StartReceivingPacketsAsync().ConfigureAwait(false);

var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
_logger.Trace<MqttClient>("MQTT connection with server established.");
_logger.Verbose<MqttClient>("MQTT connection with server established.");

_sendTracker.Restart();

Expand All @@ -77,12 +77,14 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio

IsConnected = true;
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));

_logger.Info<MqttClient>("Connected.");
return new MqttClientConnectResult(connectResponse.IsSessionPresent);
}
catch (Exception exception)
{
_logger.Error<MqttClient>(exception, "Error while connecting with server.");
await DisconnectInternalAsync(exception).ConfigureAwait(false);
await DisconnectInternalAsync(null, exception).ConfigureAwait(false);

throw;
}
Expand All @@ -104,7 +106,7 @@ public async Task DisconnectAsync()
}
finally
{
await DisconnectInternalAsync(null).ConfigureAwait(false);
await DisconnectInternalAsync(null, null).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -159,7 +161,7 @@ public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMe
case MqttQualityOfServiceLevel.AtMostOnce:
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync((MqttPublishPacket[])qosGroup.ToArray()).ConfigureAwait(false);
await SendAsync(qosGroup.Cast<MqttBasePacket>().ToArray()).ConfigureAwait(false);
break;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
Expand Down Expand Up @@ -236,33 +238,48 @@ private void ThrowIfConnected(string message)
if (IsConnected) throw new MqttProtocolViolationException(message);
}

private async Task DisconnectInternalAsync(Exception exception)
private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
await _disconnectLock.WaitAsync();
var clientWasConnected = IsConnected;
try
{
IsConnected = false;

if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{
return;
}

_cancellationTokenSource.Cancel(false);
}
catch (Exception adapterException)
{
_logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
}
finally
{
_disconnectLock.Release();
}

var clientWasConnected = IsConnected;
IsConnected = false;

if (_packetReceiverTask != null)
try
{
if (_packetReceiverTask != null && _packetReceiverTask != sender)
{
Task.WaitAll(_packetReceiverTask);
_packetReceiverTask.Wait();
}

if (_keepAliveMessageSenderTask != null)
if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
{
Task.WaitAll(_keepAliveMessageSenderTask);
_keepAliveMessageSenderTask.Wait();
}

await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.Trace<MqttClient>("Disconnected from adapter.");
if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
}

_logger.Verbose<MqttClient>("Disconnected from adapter.");
}
catch (Exception adapterException)
{
Expand All @@ -272,12 +289,9 @@ private async Task DisconnectInternalAsync(Exception exception)
{
_adapter?.Dispose();
_adapter = null;

_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_disconnectLock.Release();

_logger.Info<MqttClient>("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
Expand All @@ -287,8 +301,6 @@ private async Task ProcessReceivedPacketAsync(MqttBasePacket packet)
{
try
{
_logger.Info<MqttClient>("Received <<< {0}", packet);

if (packet is MqttPublishPacket publishPacket)
{
await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false);
Expand Down Expand Up @@ -395,7 +407,7 @@ private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBas

private async Task SendKeepAliveMessagesAsync()
{
_logger.Info<MqttClient>("Start sending keep alive packets.");
_logger.Verbose<MqttClient>("Start sending keep alive packets.");

try
{
Expand All @@ -415,37 +427,31 @@ private async Task SendKeepAliveMessagesAsync()
await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
if (_cancellationTokenSource.Token.IsCancellationRequested)
if (exception is OperationCanceledException)
{
return;
}

if (exception is MqttCommunicationException)
else if (exception is MqttCommunicationException)
{
_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
}
else
{
_logger.Warning<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");

_logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
}

await DisconnectInternalAsync(exception).ConfigureAwait(false);
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
}
finally
{
_logger.Info<MqttClient>("Stopped sending keep alive packets.");
_logger.Verbose<MqttClient>("Stopped sending keep alive packets.");
}
}

private async Task ReceivePacketsAsync()
{
_logger.Info<MqttClient>("Start receiving packets.");
_logger.Verbose<MqttClient>("Start receiving packets.");

try
{
Expand All @@ -463,31 +469,25 @@ private async Task ReceivePacketsAsync()
StartProcessReceivedPacket(packet);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
if (_cancellationTokenSource.IsCancellationRequested)
if (exception is OperationCanceledException)
{
return;
}

if (exception is MqttCommunicationException)
else if (exception is MqttCommunicationException)
{
_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
}
else
{
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");

}

await DisconnectInternalAsync(exception).ConfigureAwait(false);
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
}
finally
{
_logger.Info<MqttClient>("Stopped receiving packets.");
_logger.Verbose<MqttClient>("Stopped receiving packets.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public interface IMqttNetLogger
{
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

void Trace<TSource>(string message, params object[] parameters);
void Verbose<TSource>(string message, params object[] parameters);

void Info<TSource>(string message, params object[] parameters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public MqttNetLogger(string logId = null)

public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

public void Trace<TSource>(string message, params object[] parameters)
public void Verbose<TSource>(string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Verbose, null, message, parameters);
}
Expand Down
Loading

0 comments on commit 9afff6d

Please sign in to comment.