Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Jul 8, 2018
2 parents 15d4ee4 + 54e3cd7 commit 0bdb7e9
Show file tree
Hide file tree
Showing 34 changed files with 576 additions and 197 deletions.
5 changes: 3 additions & 2 deletions Build/MQTTnet.AspNetCore.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>* Updated to MQTTnet 2.8.0.
<releaseNotes>* Updated to MQTTnet 2.8.1.
* For more release notes please check the MQTTnet release notes.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.0" />
<dependency id="MQTTnet" version="2.8.1" />
<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.0" />
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.0.1" />
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.0.1" />
Expand Down
5 changes: 3 additions & 2 deletions Build/MQTTnet.Extensions.ManagedClient.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which provides a managed MQTT client with additional features using MQTTnet.</description>
<releaseNotes>* Updated to MQTTnet 2.8.0.
<releaseNotes>* Updated to MQTTnet 2.8.1.
* For more release notes please check the MQTTnet release notes.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.0" />
<dependency id="MQTTnet" version="2.8.1" />
</dependencies>
</metadata>

Expand Down
5 changes: 3 additions & 2 deletions Build/MQTTnet.Extensions.Rpc.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows executing synchronous device calls including a response using MQTTnet.</description>
<releaseNotes>* Updated to MQTTnet 2.8.0.
<releaseNotes>* Updated to MQTTnet 2.8.1.
* For more release notes please check the MQTTnet release notes.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.0" />
<dependency id="MQTTnet" version="2.8.1" />
</dependencies>
</metadata>

Expand Down
33 changes: 6 additions & 27 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,12 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> ** MQTTnet is now available at Open Collective for donations (https://opencollective.com/mqttnet). **
* [Core] Performance optimizations.
* [Core] Due to performance reasons the timestamp of log messages is now in UTC format.
* [Core] Added several packet validations.
* [Core] Log messages now contain the complete source path including parent components.
* [Core] The adapter now has an _Endpoint_ definition as string containing remote IP and port.
* [Client] Received messages are now processed completely in the worker thread without creating new Tasks.
* [Client] Fixed wrong calculation for sending keep alive packets (thanks to @cstichlberger)
* [Client] A clean disconnect (via DisconnectAsync) will no longer throw an exception.
* [Client] Added new overloads for quick message publishing.
* [ManagedClient] The managed client is moved to a separate nuget package.
* [ManagedClient] Added an own message format with extended properties like ID (BREAKING CHANGE).
* [ManagedClient] Fixed a loading issue of stored application messages (thanks to @JTrotta).
* [ManagedClient] Added a new event which is fired when a synchronization of the subscriptions has failed.
* [ManagedClient] Added a new event which is fired when a connection attempt has failed.
* [ManagedClient] Exposed a new property which provides the count of not published messages (pending messages count).
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot).
* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client.
* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan)
* [Server] Keep alive checking is now suspended while large packages are being received (and thus the client is connected). Keep alive checking continues after a large packet is received completely.
* [Server] Rewritten the _ConnectedClients_ API and added new features for disconnecting and Endpoint information (IP etc.).
* [Server] Added settings for disabling persistent sessions and defining a max pending messages queue size per session.
* [Server] Persistent sessions are disabled by default (BREAKING CHANGE!).
* [Server] Added a new interceptor which is invoked before a new message is added to the client queue.
* [Server] Added support for Linux servers by dividing IPv4 and IPv6 support and adding new options (BREAKING CHANGE!).
* [Server] Gracefully closed connections are no longer reported as warnings.
* [Server] Added new overloads for initializing the ASP.NET Core integration.
<releaseNotes>* [Core] Performance optimizations.
* [Core] Fixed a bug which prevents receiving large packets (UWP only)
* [Client] The ManagedClient options now allow configuring the interval for connection checks.
* [Server] Added the Endpoint of the Adapter (remote IP and port) to the connection validation callback.
* [Server] The ipv4 and ipv6 endpoint can be disabled now by setting the bound IP address to _None_.
* [Server] Fix a bug in the keep alive monitor which caused high CPU load (thanks to @GarageGadget).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
Expand Down
14 changes: 10 additions & 4 deletions Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using MQTTnet.Server;
using System.Collections.Generic;

namespace MQTTnet.AspNetCore
{
Expand All @@ -23,10 +24,7 @@ public static IApplicationBuilder UseMqttEndpoint(this IApplicationBuilder app,

if (context.Request.Headers.TryGetValue("Sec-WebSocket-Protocol", out var requestedSubProtocolValues))
{
// Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc.
subProtocol = requestedSubProtocolValues
.OrderByDescending(p => p.Length)
.FirstOrDefault(p => p.ToLower().StartsWith("mqtt"));
subProtocol = SelectSubProtocol(requestedSubProtocolValues);
}

var adapter = app.ApplicationServices.GetRequiredService<MqttWebSocketServerAdapter>();
Expand All @@ -40,6 +38,14 @@ public static IApplicationBuilder UseMqttEndpoint(this IApplicationBuilder app,
return app;
}

public static string SelectSubProtocol(IList<string> requestedSubProtocolValues)
{
// Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc.
return requestedSubProtocolValues
.OrderByDescending(p => p.Length)
.FirstOrDefault(p => p.ToLower().StartsWith("mqtt"));
}

public static IApplicationBuilder UseMqttServer(this IApplicationBuilder app, Action<IMqttServer> configure)
{
var server = app.ApplicationServices.GetRequiredService<IMqttServer>();
Expand Down
9 changes: 9 additions & 0 deletions Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using MQTTnet.Adapter;
using MQTTnet.Serializer;
using MQTTnet.Server;
Expand All @@ -13,6 +14,14 @@ public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter

public override async Task OnConnectedAsync(ConnectionContext connection)
{
// required for websocket transport to work
var transferFormatFeature = connection.Features.Get<ITransferFormatFeature>();
if (transferFormatFeature != null)
{
transferFormatFeature.ActiveFormat = TransferFormat.Binary;
}


var serializer = new MqttPacketSerializer();
using (var adapter = new MqttConnectionContext(serializer, connection))
{
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.AspnetCore/ReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static bool TryDeserialize(this IMqttPacketSerializer serializer, in Read
}

var bodySlice = copy.Slice(0, bodyLength);
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray(), 0)));
var buffer = bodySlice.GetArray();
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(buffer, 0, buffer.Length)));
consumed = bodySlice.End;
observed = bodySlice.End;
return true;
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static IServiceCollection AddHostedMqttServer<TOptions>(this IServiceColl
return services;
}

public static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
private static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
{
var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public interface IManagedMqttClientOptions

TimeSpan AutoReconnectDelay { get; }

TimeSpan ConnectionCheckInterval { get; }

IManagedMqttClientStorage Storage { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private async Task TryMaintainConnectionAsync(CancellationToken cancellationToke

if (connectionState == ReconnectionResult.StillConnected)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class ManagedMqttClientOptions : IManagedMqttClientOptions

public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1);

public IManagedMqttClientStorage Storage { get; set; }
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet.Extensions.Rpc/SampleCCode.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ _mqttClient.subscribe("MQTTnet.RPC/+/ping");
_mqttClient.subscribe("MQTTnet.RPC/+/do_something");

// It is not allowed to change the structure of the topic. Otherwise RPC will not work. So method names can be separated using
// an _ or . but no +, # or . If it is required to distinguish between devices own rules can be defined like the following.
// an _ or . but no +, # or /. If it is required to distinguish between devices own rules can be defined like the following.
_mqttClient.subscribe("MQTTnet.RPC/+/deviceA.ping");
_mqttClient.subscribe("MQTTnet.RPC/+/deviceB.ping");
_mqttClient.subscribe("MQTTnet.RPC/+/deviceC.getTemperature");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using MQTTnet.Client;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Channel;
using MQTTnet.Diagnostics;
using MQTTnet.Serializer;
using WebSocket4Net;

namespace MQTTnet.TestApp.NetCore
{
public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (logger == null) throw new ArgumentNullException(nameof(logger));

if (!(options.ChannelOptions is MqttClientWebSocketOptions))
{
throw new NotSupportedException("Only WebSocket connections are supported.");
}

return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options), new MqttPacketSerializer(), logger);
}

private class WebSocket4NetMqttChannel : IMqttChannel
{
private readonly BlockingCollection<byte> _receiveBuffer = new BlockingCollection<byte>();

private readonly IMqttClientOptions _clientOptions;
private WebSocket4Net.WebSocket _webSocket;

public WebSocket4NetMqttChannel(IMqttClientOptions clientOptions)
{
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
}

public string Endpoint { get; } = "";

public Task ConnectAsync(CancellationToken cancellationToken)
{
var channelOptions = (MqttClientWebSocketOptions)_clientOptions.ChannelOptions;

var uri = "ws://" + channelOptions.Uri;
var sslProtocols = SslProtocols.None;

if (channelOptions.TlsOptions.UseTls)
{
uri = "wss://" + channelOptions.Uri;
sslProtocols = SslProtocols.Tls12;
}

var subProtocol = channelOptions.SubProtocols.FirstOrDefault() ?? string.Empty;

_webSocket = new WebSocket4Net.WebSocket(uri, subProtocol, sslProtocols: sslProtocols);
_webSocket.DataReceived += OnDataReceived;
_webSocket.Open();
SpinWait.SpinUntil(() => _webSocket.State == WebSocketState.Open, _clientOptions.CommunicationTimeout);

return Task.FromResult(0);
}

public Task DisconnectAsync()
{
if (_webSocket != null)
{
_webSocket.DataReceived -= OnDataReceived;
_webSocket.Close();
SpinWait.SpinUntil(() => _webSocket.State == WebSocketState.Closed, _clientOptions.CommunicationTimeout);
}

_webSocket = null;

return Task.FromResult(0);
}

public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var readBytes = 0;
while (count > 0 && !cancellationToken.IsCancellationRequested)
{
byte @byte;
if (readBytes == 0)
{
// Block until at lease one byte was received.
@byte = _receiveBuffer.Take(cancellationToken);
}
else
{
if (!_receiveBuffer.TryTake(out @byte))
{
return Task.FromResult(readBytes);
}
}

buffer[offset] = @byte;
offset++;
count--;
readBytes++;
}

return Task.FromResult(readBytes);
}

public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
_webSocket.Send(buffer, offset, count);
return Task.FromResult(0);
}

public void Dispose()
{
if (_webSocket != null)
{
_webSocket.DataReceived -= OnDataReceived;
_webSocket.Dispose();
}
}

private void OnDataReceived(object sender, WebSocket4Net.DataReceivedEventArgs e)
{
foreach (var @byte in e.Data)
{
_receiveBuffer.Add(@byte);
}
}
}
}
}
Loading

0 comments on commit 0bdb7e9

Please sign in to comment.