Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: time out and cancel propagation in managed client #1987

Merged
merged 3 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
* [Core] The package inspector is now fully async (#1941).
* [Client] Added a dedicated exception when the client is not connected (#1954, thanks to @marcpiulachs).
* [Client] Exposed the certificate selection event handler in client options (#1984).
* [Server] The server will no longer send _NoMatchingSubscribers_ when the actual subscription was non success (#1965, BREAKING CHANGE!).
* [Server] The server will no longer send _NoMatchingSubscribers_ when the actual subscription was non success (#1965, BREAKING CHANGE!).
* [Server] Fixed broken support for _null_ in _AddServer_ method in ASP.NET integration (#1981).
* [ManagedClient] Added a new event (SubscriptionsChangedAsync) which is fired when a subscription or unsubscription was made (#1894, thanks to @pdufrene).
* [ManagedClient] Fixed race condition when server shuts down while subscribing (#1987, thanks to @marve).
* [TopicTemplate] Added new extension which provides a template engine for topics (#1932, thanks to @simonthum).
70 changes: 42 additions & 28 deletions Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace MQTTnet.Extensions.ManagedClient
{
public sealed class ManagedMqttClient : Disposable, IManagedMqttClient
{
readonly MqttNetSourceLogger _logger;

readonly AsyncEvent<InterceptingPublishMessageEventArgs> _interceptingPublishMessageEvent = new AsyncEvent<InterceptingPublishMessageEventArgs>();
readonly AsyncEvent<ApplicationMessageProcessedEventArgs> _applicationMessageProcessedEvent = new AsyncEvent<ApplicationMessageProcessedEventArgs>();
readonly AsyncEvent<ApplicationMessageSkippedEventArgs> _applicationMessageSkippedEvent = new AsyncEvent<ApplicationMessageSkippedEventArgs>();
Expand All @@ -27,9 +29,7 @@ public sealed class ManagedMqttClient : Disposable, IManagedMqttClient
readonly AsyncEvent<ManagedProcessFailedEventArgs> _synchronizingSubscriptionsFailedEvent = new AsyncEvent<ManagedProcessFailedEventArgs>();
readonly AsyncEvent<SubscriptionsChangedEventArgs> _subscriptionsChangedEvent = new AsyncEvent<SubscriptionsChangedEventArgs>();

readonly MqttNetSourceLogger _logger;
readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

readonly AsyncLock _messageQueueLock = new AsyncLock();

/// <summary>
Expand Down Expand Up @@ -83,21 +83,19 @@ public event Func<ApplicationMessageSkippedEventArgs, Task> ApplicationMessageSk
add => _applicationMessageSkippedEvent.AddHandler(value);
remove => _applicationMessageSkippedEvent.RemoveHandler(value);
}

public event Func<ApplicationMessageProcessedEventArgs, Task> ApplicationMessageProcessedAsync
{
add => _applicationMessageProcessedEvent.AddHandler(value);
remove => _applicationMessageProcessedEvent.RemoveHandler(value);
}


public event Func<InterceptingPublishMessageEventArgs, Task> InterceptPublishMessageAsync
{
add => _interceptingPublishMessageEvent.AddHandler(value);
remove => _interceptingPublishMessageEvent.RemoveHandler(value);
}


public event Func<MqttApplicationMessageReceivedEventArgs, Task> ApplicationMessageReceivedAsync
{
add => InternalClient.ApplicationMessageReceivedAsync += value;
Expand Down Expand Up @@ -149,7 +147,7 @@ public event Func<SubscriptionsChangedEventArgs, Task> SubscriptionsChangedAsync
public ManagedMqttClientOptions Options { get; private set; }

public int PendingApplicationMessagesCount => _messageQueue.Count;

public async Task EnqueueAsync(MqttApplicationMessage applicationMessage)
{
ThrowIfDisposed();
Expand Down Expand Up @@ -277,7 +275,7 @@ public async Task StopAsync(bool cleanDisconnect = true)
ThrowIfDisposed();

_isCleanDisconnect = cleanDisconnect;

StopPublishing();
StopMaintainingConnection();

Expand Down Expand Up @@ -369,6 +367,13 @@ static TimeSpan GetRemainingTime(DateTime endTime)
return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
}

CancellationTokenSource NewTimeoutToken(CancellationToken linkedToken)
{
var newTimeoutToken = CancellationTokenSource.CreateLinkedTokenSource(linkedToken);
newTimeoutToken.CancelAfter(Options.ClientOptions.Timeout);
return newTimeoutToken;
}

async Task HandleSubscriptionExceptionAsync(Exception exception, List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions)
{
_logger.Warning(exception, "Synchronizing subscriptions failed.");
Expand Down Expand Up @@ -411,7 +416,7 @@ async Task MaintainConnectionAsync(CancellationToken cancellationToken)
{
if (_isCleanDisconnect)
{
using (var disconnectTimeout = new CancellationTokenSource(Options.ClientOptions.Timeout))
using (var disconnectTimeout = NewTimeoutToken(CancellationToken.None))
{
await InternalClient.DisconnectAsync(new MqttClientDisconnectOptions(), disconnectTimeout.Token).ConfigureAwait(false);
}
Expand Down Expand Up @@ -461,7 +466,7 @@ async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)

cancellationToken.ThrowIfCancellationRequested();

await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
await TryPublishQueuedMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
Expand All @@ -477,7 +482,7 @@ async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
}
}

async Task PublishReconnectSubscriptionsAsync()
async Task PublishReconnectSubscriptionsAsync(CancellationToken cancellationToken)
{
_logger.Info("Publishing subscriptions at reconnect");

Expand All @@ -489,20 +494,20 @@ async Task PublishReconnectSubscriptionsAsync()
{
topicFilters = new List<MqttTopicFilter>();
SendSubscribeUnsubscribeResult subscribeUnsubscribeResult;

foreach (var sub in _reconnectSubscriptions)
{
topicFilters.Add(sub.Value);

if (topicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null, cancellationToken).ConfigureAwait(false);
topicFilters.Clear();
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}

subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null, cancellationToken).ConfigureAwait(false);
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -555,13 +560,13 @@ async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancell

if (addedTopicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null, cancellationToken).ConfigureAwait(false);
addedTopicFilters.Clear();
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null).ConfigureAwait(false);

subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null, cancellationToken).ConfigureAwait(false);
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);

var removedTopicFilters = new List<string>();
Expand All @@ -571,13 +576,13 @@ async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancell

if (removedTopicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters, cancellationToken).ConfigureAwait(false);
removedTopicFilters.Clear();
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}

subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters, cancellationToken).ConfigureAwait(false);
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}
Expand All @@ -592,7 +597,7 @@ async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancel
MqttClientConnectResult connectResult = null;
try
{
using (var connectTimeout = new CancellationTokenSource(Options.ClientOptions.Timeout))
using (var connectTimeout = NewTimeoutToken(cancellationToken))
{
connectResult = await InternalClient.ConnectAsync(Options.ClientOptions, connectTimeout.Token).ConfigureAwait(false);
}
Expand All @@ -611,7 +616,7 @@ async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancel
}
}

async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions)
async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions, CancellationToken cancellationToken)
{
var subscribeResults = new List<MqttClientSubscribeResult>();
var unsubscribeResults = new List<MqttClientUnsubscribeResult>();
Expand All @@ -626,8 +631,11 @@ async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTop
unsubscribeOptionsBuilder.WithTopicFilter(removedSubscription);
}

var unsubscribeResult = await InternalClient.UnsubscribeAsync(unsubscribeOptionsBuilder.Build()).ConfigureAwait(false);
unsubscribeResults.Add(unsubscribeResult);
using (var unsubscribeTimeout = NewTimeoutToken(cancellationToken))
{
var unsubscribeResult = await InternalClient.UnsubscribeAsync(unsubscribeOptionsBuilder.Build(), unsubscribeTimeout.Token).ConfigureAwait(false);
unsubscribeResults.Add(unsubscribeResult);
}

//clear because these worked, maybe the subscribe below will fail, only report those
removedSubscriptions.Clear();
Expand All @@ -642,8 +650,11 @@ async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTop
subscribeOptionsBuilder.WithTopicFilter(addedSubscription);
}

var subscribeResult = await InternalClient.SubscribeAsync(subscribeOptionsBuilder.Build()).ConfigureAwait(false);
subscribeResults.Add(subscribeResult);
using (var subscribeTimeout = NewTimeoutToken(cancellationToken))
{
var subscribeResult = await InternalClient.SubscribeAsync(subscribeOptionsBuilder.Build(), subscribeTimeout.Token).ConfigureAwait(false);
subscribeResults.Add(subscribeResult);
}
}
}
catch (Exception exception)
Expand Down Expand Up @@ -697,15 +708,15 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
{
var oldConnectionState = InternalClient.IsConnected;
var connectionState = await ReconnectIfRequiredAsync(cancellationToken).ConfigureAwait(false);

if (connectionState == ReconnectionResult.NotConnected)
{
StopPublishing();
await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
}
else if (connectionState == ReconnectionResult.Reconnected)
{
await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
await PublishReconnectSubscriptionsAsync(cancellationToken).ConfigureAwait(false);
StartPublishing();
}
else if (connectionState == ReconnectionResult.Recovered)
Expand Down Expand Up @@ -735,7 +746,7 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
}
}

async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message, CancellationToken cancellationToken)
{
Exception transmitException = null;
bool acceptPublish = true;
Expand All @@ -750,7 +761,10 @@ async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)

if (acceptPublish)
{
await InternalClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
using (var publishTimeout = NewTimeoutToken(cancellationToken))
{
await InternalClient.PublishAsync(message.ApplicationMessage, publishTimeout.Token).ConfigureAwait(false);
}
}

using (await _messageQueueLock.EnterAsync().ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
Expand Down
Loading
Loading