Skip to content

Commit

Permalink
Merge pull request #943 from relu91/fix_942
Browse files Browse the repository at this point in the history
Fix subscribe unsubscribe and then subscribe again MQTT issue
  • Loading branch information
relu91 authored Mar 28, 2023
2 parents 85a1bb7 + 151f292 commit 31feb64
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
9 changes: 9 additions & 0 deletions packages/binding-mqtt/src/mqtt-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ export default class MqttClient implements ProtocolClient {
this.client = mqtt.connect(brokerUri, this.config);
}

if (this.client.connected) {
this.client.subscribe(topic);
resolve(
new Subscription(() => {
this.client.unsubscribe(topic);
})
);
}

this.client.on("connect", () => {
this.client.subscribe(topic);
resolve(
Expand Down
39 changes: 37 additions & 2 deletions packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,43 @@ describe("MQTT client implementation", () => {
done(err);
}
})
.then(() => mqttClient.invokeResource(form, new Content("", Readable.from(Buffer.from("test")))))
.then(() => mqttClient.stop())
.then(async (sub) => {
await mqttClient.invokeResource(form, new Content("", Readable.from(Buffer.from("test"))));
// Need to manually unsubscribe because stopping the client will not unsubscribe all subscriptions
sub.unsubscribe();
await mqttClient.stop();
})
.catch((err) => done(err));
}).timeout(10000);

it("should subscribe unsubscribe and subscribe again", (done: Mocha.Done) => {
const mqttClient = new MqttClient();
const form: MqttForm = {
href: brokerUri + "/" + property,
"mqtt:qos": MqttQoS.QoS0,
"mqtt:retain": false,
};

mqttClient
.subscribeResource(form, () => {
/** No-op */
})
.then(async (sub) => {
sub.unsubscribe();
const sub2 = await mqttClient.subscribeResource(form, async (value: Content) => {
try {
const data = await value.toBuffer();
expect(data.toString()).to.be.equal("test");
done();
} catch (err) {
done(err);
} finally {
await mqttClient.stop();
}
});
await mqttClient.invokeResource(form, new Content("", Readable.from(Buffer.from("test"))));
sub2.unsubscribe();
})
.catch((err) => done(err));
}).timeout(10000);
});
Expand Down

0 comments on commit 31feb64

Please sign in to comment.