
Introduce asynchronous message processing #891

Merged
2 commits merged on Apr 4, 2020

Conversation

PSanetra
Contributor

@PSanetra PSanetra commented Apr 3, 2020

This PR fixes #648, #587 and #829 by introducing asynchronous message processing.

The current code calls the ApplicationMessageReceivedHandler synchronously in the packet receiver loop. This blocks KeepAlive and Ack packets from being processed while a message is still being processed.

This PR also fixes the test MQTTnet.Tests/Server_Tests/Same_Client_Id_Connect_Disconnect_Event_Order as it failed on my machine due to a race condition in the MqttClientSessionsManager.

Fixes #648
Fixes #587
Fixes #829

@SeppPenner SeppPenner requested review from chkr1011 and JanEggers April 3, 2020 19:53
@SeppPenner SeppPenner added the feature-request New feature or request label Apr 3, 2020
@chkr1011
Collaborator

chkr1011 commented Apr 3, 2020

I also tried to fix the unit test at the same time, so my change has already been there for a couple of minutes 😄 Please merge master into your branch. I am not sure which fix is better, but I think your strategy might be the better one. I added a comment at the SpinWait which describes the problem.

Regarding async processing: there is already an open pull request with that feature. In the comments I proposed adding a parameter to the message handler context, like other SDKs do in Azure (AcknowledgeMode = Auto|Manual etc.).
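
For illustration, a minimal sketch of what such a handler-context parameter could look like. The AcknowledgeMode enum and context type below are hypothetical, modeled on the Azure SDK idea mentioned above, and are not an existing MQTTnet API:

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical types for illustration only - not part of MQTTnet.
public enum AcknowledgeMode
{
    Auto,   // the client sends PUBACK/PUBREC automatically after the handler returns
    Manual  // the handler must acknowledge explicitly via the context
}

public class ApplicationMessageHandlerContext
{
    public AcknowledgeMode AcknowledgeMode { get; set; } = AcknowledgeMode.Auto;

    // Only meaningful when AcknowledgeMode is Manual; placeholder body in this sketch.
    public Task AcknowledgeAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```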

@PSanetra
Contributor Author

PSanetra commented Apr 4, 2020

@chkr1011 Interesting approach 😄

I think the strategy without the SpinWait is cleaner. You might also need to declare the field behind the IsFinalized property as volatile, because it is accessed from different threads in parallel. (Not sure how often this would cause issues in the real world.)
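
To illustrate the volatile point, here is a minimal sketch; only the IsFinalized name comes from the discussion, the surrounding class is made up:

```csharp
class SessionPlaceholder
{
    // volatile makes a write from one thread promptly visible to reads on other
    // threads, which matters when IsFinalized is polled (e.g. in a SpinWait)
    // while another thread sets it.
    private volatile bool _isFinalized;

    public bool IsFinalized => _isFinalized;

    public void MarkFinalized() => _isFinalized = true;
}
```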

Regarding PR #875: I think that PR is related but does not fully solve the issue I am trying to solve here. PR #875 tries to implement concurrent processing, which can violate the order of message processing. That might be OK in some use cases, but I think the default should be to maintain the order of message processing.

This PR maintains the order of message processing and just starts one additional Task which dequeues messages from an AsyncQueue. Inside the packet receiver loop the publish packets are simply enqueued and the loop continues. This solves the issues I mentioned.
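
The general pattern looks roughly like the sketch below. It uses System.Threading.Channels instead of MQTTnet's internal AsyncQueue and a placeholder packet type, so it is an illustration of the idea rather than the actual MqttClient code:

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

class MqttPublishPacket { } // stand-in for MQTTnet's packet type

class PublishPacketPump
{
    private readonly Channel<MqttPublishPacket> _queue = Channel.CreateUnbounded<MqttPublishPacket>();

    // Called from the packet receiver loop: enqueue and return immediately,
    // so KeepAlive and Ack packets can still be processed.
    public void OnPublishPacketReceived(MqttPublishPacket packet) => _queue.Writer.TryWrite(packet);

    // Runs as a single dedicated background task, so message order is preserved.
    public async Task ProcessQueuedPacketsAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var packet = await _queue.Reader.ReadAsync(cancellationToken);
            await HandleApplicationMessageAsync(packet); // the (possibly slow) user handler
        }
    }

    private Task HandleApplicationMessageAsync(MqttPublishPacket packet) => Task.CompletedTask; // placeholder
}
```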

I will rebase this PR onto the master and keep the strategy of this PR for the unit test fix.

@PSanetra PSanetra force-pushed the async-message-processing branch from a14b1a6 to f372a8f on April 4, 2020 08:32
@chkr1011
Collaborator

chkr1011 commented Apr 4, 2020

Ah OK, so only the processing of incoming application messages is moved to a dedicated thread, right? That is OK for me because, as you already said, the order of messages is important.

So we have to update the other pull request as discussed there. But this will not affect this PR.

I have never used the volatile keyword, so I have to read some docs first 😄 But for that special case I do not think it is required, because the unit test runs successfully and the SpinWait may only run a little longer than necessary.

@chkr1011
Collaborator

chkr1011 commented Apr 4, 2020

@PSanetra Please let me know if this pull request is fully updated. Then I will start my review.

…t_Order

This failing test was caused by a race condition in the MqttClientSessionsManager.
The test failed because the value of `flow` in the assertion `Assert.AreEqual("cdc", flow);` was "ccd".
@PSanetra PSanetra force-pushed the async-message-processing branch from f372a8f to eeb06fa on April 4, 2020 13:09
@PSanetra
Contributor Author

PSanetra commented Apr 4, 2020

@chkr1011 yes, I have rebased it again onto the current master. It should now be ready for review.

Collaborator

@chkr1011 chkr1011 left a comment


I have a general question. If we move the publish packet processing to a new thread now, what happens if the client takes too much time to process the packets? I remember that our decision NOT to do this was that a message might fail or the client might crash; in such cases all messages in the queue are lost. Another problem is that the queue grows bigger and bigger if processing is slower than the rate at which new messages arrive. This leads to unbounded memory growth which might crash the client, and again all queued messages are lost. The broker is not able to detect this.

@@ -34,6 +34,9 @@ public class MqttClient : Disposable, IMqttClient
private CancellationTokenSource _backgroundCancellationTokenSource;
private Task _packetReceiverTask;
private Task _keepAlivePacketsSenderTask;
private Task _publishPacketReceiverTask;

private AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;
Collaborator


I assume this must be disposed as soon as the client gets disposed.

Contributor Author


Done
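
For completeness, a rough sketch of what disposing the queue together with the client means. This is a generic pattern only; the field name is taken from the diff above and the rest is simplified:

```csharp
using System;

class ClientSketch : IDisposable
{
    // Stand-in for the AsyncQueue<MqttPublishPacket> field added in this PR.
    private IDisposable _publishPacketReceiverQueue;

    public void Dispose()
    {
        // Release the queue (and its internal synchronization primitives)
        // together with the owning client.
        _publishPacketReceiverQueue?.Dispose();
        _publishPacketReceiverQueue = null;
    }
}
```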

@PSanetra
Contributor Author

PSanetra commented Apr 4, 2020

@chkr1011 the messages in the queue are not lost if they are received via a QoS 1 or QoS 2 subscription, as they would be resent after a client restart. QoS 0 messages are semantically allowed to get lost.

Regarding the unbounded queue and the possible memory issue due to too many messages:
I think in MQTT 3.1.1 there is no mechanism in the protocol which limits the maximum number of in-flight messages a broker should send to the client. But I guess there should always be a way to configure this in the broker, even if it is not part of the MQTT 3.1.1 specification. (E.g. there is such an option in VerneMQ for QoS 1 and 2 messages, which defaults to 20 in-flight messages.)

With MQTT 5 it is possible to configure a receive maximum for the connection; the default is 65535. Sadly, this too is only specified for QoS 1 and QoS 2 publications.

Maybe it would be an option to keep processing QoS 0 messages synchronously and only process QoS 1 and 2 messages asynchronously?! But this would mean that issues #648 and #829 still apply to the processing of QoS 0 messages. Another question: isn't the broker sending the packets anyway, even if the client is not handling them? I guess that would mean the packets are queued by the OS and could maybe result in such a client crash, too?

Another option would be to just drop new messages if the queue gets too big. This would be semantically OK for QoS 0 messages. QoS 1 and QoS 2 messages would be resent by the broker, as the client would not send PUBACK and PUBREC packets for the dropped messages back to the broker. I think this option is my favorite one.
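
A rough sketch of that drop option, again using a bounded channel rather than MQTTnet's internal AsyncQueue; this is not what the PR implements, only an illustration of the idea:

```csharp
using System.Threading.Channels;

static class ReceiveQueueFactory
{
    // Drop newly received packets once `capacity` unprocessed items are queued.
    // Dropping is semantically fine for QoS 0; QoS 1/2 packets are redelivered
    // by the broker because no PUBACK/PUBREC is sent for the dropped ones.
    public static Channel<TPacket> CreateBounded<TPacket>(int capacity) =>
        Channel.CreateBounded<TPacket>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,                        // one dedicated dequeue task
            FullMode = BoundedChannelFullMode.DropWrite // silently drop when full
        });
}
```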

@chkr1011
Collaborator

chkr1011 commented Apr 4, 2020

You are right. I was only thinking about QoS 0, so QoS 1 and 2 are covered. Regarding the flooding of the in-flight queue: in my opinion receiving will stop once the receive buffer is full. If I am not wrong this is handled automatically by TCP, so sending will stop because the buffer on the other side is full. So there is already a chance of packet loss even with the current implementation.

I agree that we have to add some limit at the broker level: either a general broker setting (MQTT <= 3.1.1) or an individual value per connection/session (MQTT >= 5.0.0).

TL;DR: I think we can proceed with your changes...

@chkr1011
Collaborator

chkr1011 commented Apr 4, 2020

One last thing: please add a one-liner to the nuspec describing the change, like the already existing entries.

This commit fixes issues which were caused by processing messages synchronously in the packet receiver loop. This blocked KeepAlive and Ack packets from being processed while a message was being processed.

Fixes dotnet#648
Fixes dotnet#587
Fixes dotnet#829
@PSanetra PSanetra force-pushed the async-message-processing branch from eeb06fa to fcd6b7f on April 4, 2020 20:41
@PSanetra
Contributor Author

PSanetra commented Apr 4, 2020

@chkr1011 I have added release notes to the nuspec file.

Should I start implementing a packet drop mechanism for the _publishPacketReceiverQueue, or should we just rely on the broker not to send more in-flight messages than the client can handle?

@chkr1011
Collaborator

chkr1011 commented Apr 4, 2020

@PSanetra In my opinion we need such a mechanism at both client and server. Then the users have full control.

@chkr1011 chkr1011 merged commit 9fd7c7b into dotnet:master Apr 4, 2020
@chkr1011
Collaborator

chkr1011 commented Apr 4, 2020

@PSanetra Please check the master branch. After merging this PR I get errors in some unit tests like:

X Send_Reply_In_Message_Handler [131ms]
Error Message:
Test method MQTTnet.Tests.Client_Tests.Send_Reply_In_Message_Handler threw exception:
System.Exception: Client(s) had 1 errors ($[2020-04-04T21:39:36.9102228Z] [client] [12] [MqttClient] [Error]: Error while handling application message.
System.NullReferenceException: Object reference not set to an instance of an object.
at MQTTnet.Client.MqttClient.PublishAtLeastOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) in C:\projects\mqttnet\Source\MQTTnet\Client\MqttClient.cs:line 677
at MQTTnet.Tests.Client_Tests.<>c__DisplayClass5_0.<<Send_Reply_In_Message_Handler>b__1>d.MoveNext() in C:\projects\mqttnet\Tests\MQTTnet.Core.Tests\MqttClient_Tests.cs:line 82
--- End of stack trace from previous location where exception was thrown ---
at MQTTnet.Client.MqttClient.HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) in C:\projects\mqttnet\Source\MQTTnet\Client\MqttClient.cs:line 706
at MQTTnet.Client.MqttClient.ProcessReceivedPublishPackets(CancellationToken cancellationToken) in C:\projects\mqttnet\Source\MQTTnet\Client\MqttClient.cs:line 628).
Stack Trace:
at MQTTnet.Tests.Mockups.TestEnvironment.ThrowIfLogErrors() in C:\projects\mqttnet\Tests\MQTTnet.Core.Tests\Mockups\TestEnvironment.cs:line 138
at MQTTnet.Tests.Mockups.TestEnvironment.Dispose(Boolean disposing) in C:\projects\mqttnet\Tests\MQTTnet.Core.Tests\Mockups\TestEnvironment.cs:line 156
at MQTTnet.Internal.Disposable.Dispose() in C:\projects\mqttnet\Source\MQTTnet\Internal\Disposable.cs:line 41
at MQTTnet.Tests.Client_Tests.Send_Reply_In_Message_Handler() in C:\projects\mqttnet\Tests\MQTTnet.Core.Tests\MqttClient_Tests.cs:line 89
at Microsoft.VisualStudio.TestPlatform.MSTestAdapter.PlatformServices.ThreadOperations.ExecuteWithAbortSafety(Action action)
X Publish_Multiple_Clients [865ms]
Error Message:

@PSanetra
Contributor Author

PSanetra commented Apr 6, 2020

@chkr1011 I cannot reproduce the failing test locally, and it seems like the test does not fail anymore in the current build. Did you find the cause?

@chkr1011
Collaborator

chkr1011 commented Apr 6, 2020

I ended up not setting the adapter to null when disconnecting. It might happen that a running task (especially in strange unit test scenarios) finishes only after disconnect and dispose. That task would then access the adapter for result conversion although it had already been set to null.

But there are still other errors in the build log. They do not happen on my machine but often on the build server.
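
As an illustration of the kind of guard that avoids this race, here is a generic pattern with hypothetical names (IPacketAdapter, ConvertResultAsync); it is not the exact change made on master:

```csharp
using System.Threading.Tasks;

// IPacketAdapter and ConvertResultAsync are made-up names for illustration.
interface IPacketAdapter
{
    Task<string> ConvertResultAsync();
}

class AdapterRaceGuard
{
    private volatile IPacketAdapter _adapter; // may be cleared on disconnect/dispose

    public async Task<string> TryConvertAsync()
    {
        var adapter = _adapter; // capture once; a concurrent disconnect cannot null this local copy
        if (adapter == null)
        {
            return null; // already disconnected: bail out instead of throwing a NullReferenceException
        }

        return await adapter.ConvertResultAsync();
    }
}
```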
