Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue577 #580

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions example/mqtt_client_publish_qos1.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ Future<int> main() async {
client.published!.listen((MqttPublishMessage message) {
print(
'EXAMPLE::Published notification:: topic is ${message.variableHeader!.topicName}, with Qos ${message.header!.qos}');
if (message.variableHeader!.topicName == topic3) {
print('EXAMPLE:: Non subscribed topic received.');
}
});

final builder1 = MqttClientPayloadBuilder();
Expand Down
2 changes: 0 additions & 2 deletions lib/src/mqtt_client_publishing_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,10 @@ class PublishingManager implements IPublishingManager {
// QOS AtMostOnce 0 require no response.
// Send the message for processing to whoever is waiting.
_fireMessageReceived(topic, msg);
_notifyPublish(msg);
} else if (pubMsg.header!.qos == MqttQos.atLeastOnce) {
// QOS AtLeastOnce 1 requires an acknowledgement
// Send the message for processing to whoever is waiting.
_fireMessageReceived(topic, msg);
_notifyPublish(msg);
// If configured the client will send the acknowledgement, else the user must.
final messageIdentifier = pubMsg.variableHeader!.messageIdentifier;
if (!manuallyAcknowledgeQos1) {
Expand Down
59 changes: 51 additions & 8 deletions test/mqtt_client_publishing_manager_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void main() {
expect(pubMess.payload.toString(),
'Payload: {4 bytes={<116><101><115><116>');
});
test('Publish consecutive topics', () {
test('Publish consecutive topics', () async {
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -197,7 +197,8 @@ void main() {
pm.publish(PublicationTopic('A/rawTopic'), MqttQos.exactlyOnce, buff);
expect(msgId2, msgId1 + 1);
});
test('Publish at least once and ack', () {
test('Publish at least once and ack', () async {
var messageOnPublished = false;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -208,14 +209,20 @@ void main() {
buff[1] = 'e'.codeUnitAt(0);
buff[2] = 's'.codeUnitAt(0);
buff[3] = 't'.codeUnitAt(0);
pm.published.stream.listen((message) {
messageOnPublished = true;
});
final msgId =
pm.publish(PublicationTopic('A/rawTopic'), MqttQos.atLeastOnce, buff);
expect(msgId, 1);
pm.handlePublishAcknowledgement(
MqttPublishAckMessage().withMessageIdentifier(msgId));
expect(pm.publishedMessages.containsKey(1), isFalse);
await Future.delayed(Duration(seconds: 1));
expect(messageOnPublished, isTrue);
});
test('Publish exactly once, release and complete', () {
test('Publish exactly once, release and complete', () async {
var messageOnPublished = false;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -226,6 +233,9 @@ void main() {
buff[1] = 'e'.codeUnitAt(0);
buff[2] = 's'.codeUnitAt(0);
buff[3] = 't'.codeUnitAt(0);
pm.published.stream.listen((message) {
messageOnPublished = true;
});
final msgId =
pm.publish(PublicationTopic('A/rawTopic'), MqttQos.exactlyOnce, buff);
expect(msgId, 1);
Expand All @@ -237,8 +247,11 @@ void main() {
pm.handlePublishComplete(
MqttPublishCompleteMessage().withMessageIdentifier(msgId));
expect(pm.publishedMessages, isEmpty);
await Future.delayed(Duration(seconds: 1));
expect(messageOnPublished, isTrue);
});
test('Publish received at most once', () {
test('Publish received at most once', () async {
var nothingOnPublished = true;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -253,11 +266,17 @@ void main() {
.toTopic('A/rawTopic')
.withQos(MqttQos.atMostOnce)
.publishData(data);
pm.published.stream.listen((MqttPublishMessage message) {
nothingOnPublished = false;
});
pm.handlePublish(pubMess);
expect(pm.receivedMessages.containsKey(msgId), isFalse);
expect(testCHS.sentMessages.isEmpty, isTrue);
await Future.delayed(Duration(seconds: 1));
expect(nothingOnPublished, isTrue);
});
test('Publish received at least once', () {
test('Publish received at least once', () async {
var nothingOnPublished = true;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -272,12 +291,19 @@ void main() {
.toTopic('A/rawTopic')
.withQos(MqttQos.atLeastOnce)
.publishData(data);
pm.published.stream.listen((MqttPublishMessage message) {
nothingOnPublished = true;
});
pm.handlePublish(pubMess);
expect(pm.receivedMessages.containsKey(msgId), isFalse);
expect(testCHS.sentMessages[0]!.header!.messageType,
MqttMessageType.publishAck);
await Future.delayed(Duration(seconds: 1));
expect(nothingOnPublished, isTrue);
});
test('Publish received at least once - manual acknowledge in force', () {
test('Publish received at least once - manual acknowledge in force',
() async {
var nothingOnPublished = true;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -293,13 +319,19 @@ void main() {
.toTopic('A/rawTopic')
.withQos(MqttQos.atLeastOnce)
.publishData(data);
pm.published.stream.listen((MqttPublishMessage message) {
nothingOnPublished = true;
});
pm.handlePublish(pubMess);
expect(pm.receivedMessages.containsKey(msgId), isFalse);
expect(testCHS.sentMessages.length, 0);
expect(pm.awaitingManualAcknowledge.length, 1);
expect(pm.awaitingManualAcknowledge.keys.contains(msgId), isTrue);
await Future.delayed(Duration(seconds: 1));
expect(nothingOnPublished, isTrue);
});
test('Publish recieved exactly once', () {
test('Publish received exactly once', () async {
var nothingOnPublished = true;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -314,12 +346,18 @@ void main() {
.toTopic('A/rawTopic')
.withQos(MqttQos.exactlyOnce)
.publishData(data);
pm.published.stream.listen((MqttPublishMessage message) {
nothingOnPublished = false;
});
pm.handlePublish(pubMess);
expect(pm.receivedMessages.containsKey(msgId), isTrue);
expect(testCHS.sentMessages[0]!.header!.messageType,
MqttMessageType.publishReceived);
await Future.delayed(Duration(seconds: 1));
expect(nothingOnPublished, isTrue);
});
test('Release recieved exactly once', () {
test('Release received exactly once', () async {
var nothingOnPublished = true;
final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(clientEventBus,
socketOptions: socketOptions);
Expand All @@ -334,6 +372,9 @@ void main() {
.toTopic('A/rawTopic')
.withQos(MqttQos.exactlyOnce)
.publishData(data);
pm.published.stream.listen((MqttPublishMessage message) {
nothingOnPublished = false;
});
pm.handlePublish(pubMess);
expect(pm.receivedMessages.containsKey(msgId), isTrue);
expect(testCHS.sentMessages[0]!.header!.messageType,
Expand All @@ -343,6 +384,8 @@ void main() {
expect(pm.receivedMessages.containsKey(msgId), isFalse);
expect(testCHS.sentMessages[1]!.header!.messageType,
MqttMessageType.publishComplete);
await Future.delayed(Duration(seconds: 1));
expect(nothingOnPublished, isTrue);
});
test('Publish exactly once, interleaved scenario 1', () {
final clientEventBus = events.EventBus();
Expand Down
Loading