From fe294f1ffe6484ecd2d39c0a8cdff1b99c6be719 Mon Sep 17 00:00:00 2001 From: Simon Hailes Date: Fri, 19 May 2017 15:12:03 +0100 Subject: [PATCH 1/5] Add option 'qos2Puback' - if set to true, will modify published messages to QOS-1, resulting in a puback message. Prevents mqtt.js keeping qos-2 messages forever if incorrectly configured. --- .../Server_With_All_Interfaces-Settings.js | 1 + lib/client.js | 13 ++- lib/options.js | 8 +- lib/server.js | 3 + test/abstract_server.js | 84 +++++++++++++++++++ 5 files changed, 105 insertions(+), 4 deletions(-) diff --git a/examples/Server_With_All_Interfaces-Settings.js b/examples/Server_With_All_Interfaces-Settings.js index 4ab7680..cfecadc 100644 --- a/examples/Server_With_All_Interfaces-Settings.js +++ b/examples/Server_With_All_Interfaces-Settings.js @@ -19,6 +19,7 @@ var moscaSetting = { { type: "https", port: 3001, bundle: true, credentials: { keyPath: SECURE_KEY, certPath: SECURE_CERT } } ], stats: false, + qos2Puback: false, // can set to true if using a client which will eat puback for QOS 2; e.g. mqtt.js logger: { name: 'MoscaServer', level: 'debug' }, diff --git a/lib/client.js b/lib/client.js index 6f6dbb7..34a61cf 100644 --- a/lib/client.js +++ b/lib/client.js @@ -536,14 +536,22 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { } var dopuback = function() { + // if qos2Puback, then if qos 2, don't just ignore the message, puback it + // by converting internally to qos 1. + // this fools mqtt.js into not holding all messages forever + if (that.server.qos2Puback === true){ + if (packet.qos === 2){ + packet.qos = 1; + } + } + if (packet.qos === 1 && !(that._closed || that._closing)) { that.connection.puback({ messageId: packet.messageId }); } }; - - + // if success is passed as 'ignore', ack but don't publish. if (success !== 'ignore'){ // publish message @@ -552,6 +560,7 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { // ignore but acknowledge message dopuback(); } + }; /** diff --git a/lib/options.js b/lib/options.js index 1737484..ee9b2f5 100755 --- a/lib/options.js +++ b/lib/options.js @@ -62,7 +62,8 @@ function modernize(legacy) { "stats", "publishNewClient", "publishClientDisconnect", - "publishSubscriptions" + "publishSubscriptions", + "qos2Puback" ]; // copy all conserved options @@ -252,7 +253,8 @@ function validate(opts, validationOptions) { 'stats': { type: 'boolean' }, 'publishNewClient': { type: 'boolean' }, 'publishClientDisconnect': { type: 'boolean' }, - 'publishSubscriptions': { type: 'boolean' } + 'publishSubscriptions': { type: 'boolean' }, + 'qos2Puback': { type: 'boolean' }, } }); @@ -330,6 +332,7 @@ function defaultsLegacy() { publishClientDisconnect: true, publishSubscriptions: true, maxInflightMessages: 1024, + qos2Puback: false, logger: { name: "mosca", level: "warn", @@ -366,6 +369,7 @@ function defaultsModern() { publishClientDisconnect: true, publishSubscriptions: true, maxInflightMessages: 1024, + qos2Puback: false, logger: { name: "mosca", level: "warn", diff --git a/lib/server.js b/lib/server.js index 4edee03..02615a3 100755 --- a/lib/server.js +++ b/lib/server.js @@ -158,6 +158,9 @@ function Server(opts, callback) { var that = this; + // put QOS-2 spoofing as a variable direct on server + this.qos2Puback = this.modernOpts.qos2Puback; + // each Server has a dummy id for logging purposes this.id = this.modernOpts.id || shortid.generate(); diff --git a/test/abstract_server.js b/test/abstract_server.js index f32ad2c..311f37d 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -956,6 +956,90 @@ module.exports = function(moscaSettings, createConnection) { }); }); + it("should by default not puback client publish to QOS 2", function(done) { + var onPublishedCalled = false; + var clientId; + var count = 0; + var timer; + + instance.published = function(packet, serverClient, callback) { + onPublishedCalled = true; + expect(packet.topic).to.be.equal("testQOS2"); + callback(); + }; + + buildAndConnect(done, function(client) { + clientId = client.opts.clientId; + + client.publish({ + messageId: 42, + topic: "testQOS2", + payload: "publish expected", + qos: 2 + }); + + // allow 1 second to hear puback + timer = setTimeout(function(){ + client.disconnect(); + }, 1000); + + // default QOS 2 should NOT puback + client.on("puback", function() { + count++; + //expect(count).to.eql(1); + client.disconnect(); + }); + client.on("close", function() { + expect(count).to.eql(0); + client.disconnect(); + clearTimeout(timer); + }); + }); + }); + + + it("should optionally (.qos2Puback) puback client publish to QOS 2", function(done) { + var onPublishedCalled = false; + var clientId; + var count = 0; + var timer; + + instance.qos2Puback = true; + instance.published = function(packet, serverClient, callback) { + onPublishedCalled = true; + expect(packet.topic).to.be.equal("testQOS2"); + callback(); + }; + + buildAndConnect(done, function(client) { + clientId = client.opts.clientId; + + client.publish({ + messageId: 42, + topic: "testQOS2", + payload: "publish expected", + qos: 2 + }); + + // allow 1 second to hear puback + timer = setTimeout(function(){ + client.disconnect(); + }, 1000); + + // with maxqos=1, QOS 2 should puback + client.on("puback", function() { + count++; + expect(count).to.eql(1); + client.disconnect(); + }); + client.on("close", function() { + expect(count).to.eql(1); + client.disconnect(); + clearTimeout(timer); + }); + }); + }); + it("should emit an event when a new client is connected", function(done) { buildClient(done, function(client) { From 9a7e3ecca91cfb7774683ff8e2d501ae5fbb3edc Mon Sep 17 00:00:00 2001 From: Simon Hailes Date: Fri, 19 May 2017 15:54:57 +0100 Subject: [PATCH 2/5] change qos2puback option to a string called onQoS2publish, which is initially 'noack', and can optionally be 'disconnect' or 'dropToQoS1' 'disconnect' will cause a client disconnect on receipt of a QoS2 message. 'dropToQoS1' will puback the message, allowing mqtt.js to work; just not at QoS2 service level. --- .../Server_With_All_Interfaces-Settings.js | 2 +- lib/client.js | 14 ++++- lib/options.js | 8 +-- lib/server.js | 2 +- test/abstract_server.js | 53 ++++++++++++++++++- 5 files changed, 69 insertions(+), 10 deletions(-) diff --git a/examples/Server_With_All_Interfaces-Settings.js b/examples/Server_With_All_Interfaces-Settings.js index cfecadc..f6f5334 100644 --- a/examples/Server_With_All_Interfaces-Settings.js +++ b/examples/Server_With_All_Interfaces-Settings.js @@ -19,7 +19,7 @@ var moscaSetting = { { type: "https", port: 3001, bundle: true, credentials: { keyPath: SECURE_KEY, certPath: SECURE_CERT } } ], stats: false, - qos2Puback: false, // can set to true if using a client which will eat puback for QOS 2; e.g. mqtt.js + onQoS2publish: 'noack', // can set to 'disconnect', or to 'dropToQoS1' if using a client which will eat puback for QOS 2; e.g. mqtt.js logger: { name: 'MoscaServer', level: 'debug' }, diff --git a/lib/client.js b/lib/client.js index 34a61cf..a68bdbb 100644 --- a/lib/client.js +++ b/lib/client.js @@ -539,9 +539,19 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { // if qos2Puback, then if qos 2, don't just ignore the message, puback it // by converting internally to qos 1. // this fools mqtt.js into not holding all messages forever - if (that.server.qos2Puback === true){ - if (packet.qos === 2){ + if (packet.qos === 2){ + switch(that.server.onQoS2publish){ + case 'dropToQoS1': packet.qos = 1; + break; + case 'disconnect': + if (!this._closed && !this._closing) { + that.close(null, "qos2 caused disconnect"); + } + return; + break; + default: + break; } } diff --git a/lib/options.js b/lib/options.js index ee9b2f5..56c7a6e 100755 --- a/lib/options.js +++ b/lib/options.js @@ -63,7 +63,7 @@ function modernize(legacy) { "publishNewClient", "publishClientDisconnect", "publishSubscriptions", - "qos2Puback" + "onQoS2publish" ]; // copy all conserved options @@ -254,7 +254,7 @@ function validate(opts, validationOptions) { 'publishNewClient': { type: 'boolean' }, 'publishClientDisconnect': { type: 'boolean' }, 'publishSubscriptions': { type: 'boolean' }, - 'qos2Puback': { type: 'boolean' }, + 'onQoS2publish': { type: 'string' }, } }); @@ -332,7 +332,7 @@ function defaultsLegacy() { publishClientDisconnect: true, publishSubscriptions: true, maxInflightMessages: 1024, - qos2Puback: false, + onQoS2publish: 'noack', logger: { name: "mosca", level: "warn", @@ -369,7 +369,7 @@ function defaultsModern() { publishClientDisconnect: true, publishSubscriptions: true, maxInflightMessages: 1024, - qos2Puback: false, + onQoS2publish: 'noack', logger: { name: "mosca", level: "warn", diff --git a/lib/server.js b/lib/server.js index 02615a3..abc17a5 100755 --- a/lib/server.js +++ b/lib/server.js @@ -159,7 +159,7 @@ function Server(opts, callback) { var that = this; // put QOS-2 spoofing as a variable direct on server - this.qos2Puback = this.modernOpts.qos2Puback; + this.onQoS2publish = this.modernOpts.onQoS2publish; // each Server has a dummy id for logging purposes this.id = this.modernOpts.id || shortid.generate(); diff --git a/test/abstract_server.js b/test/abstract_server.js index 311f37d..53e98b7 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -998,13 +998,13 @@ module.exports = function(moscaSettings, createConnection) { }); - it("should optionally (.qos2Puback) puback client publish to QOS 2", function(done) { + it("should optionally (onQoS2publish='droptoQoS1') puback client publish to QOS 2", function(done) { var onPublishedCalled = false; var clientId; var count = 0; var timer; - instance.qos2Puback = true; + instance.onQoS2publish = 'droptoQoS1'; instance.published = function(packet, serverClient, callback) { onPublishedCalled = true; expect(packet.topic).to.be.equal("testQOS2"); @@ -1039,6 +1039,55 @@ module.exports = function(moscaSettings, createConnection) { }); }); }); + +it("should optionally (onQoS2publish='disconnect') disconnect client on publish of QOS2 message", function(done) { + var onPublishedCalled = false; + var clientId; + var count = 0; + var timer; + + instance.onQoS2publish = 'disconnect'; + instance.published = function(packet, serverClient, callback) { + onPublishedCalled = true; + expect(packet.topic).to.be.equal("should not have published"); + callback(); + }; + + buildAndConnect(done, function(client) { + clientId = client.opts.clientId; + + client.publish({ + messageId: 42, + topic: "QOS2Test", + payload: "some data to cause close", + qos: 1 + }); + + // if after 2 seconds, we've not closed + timer = setTimeout(function(){ + var test = false; + expect(count).to.eql(0); + expect(test).to.eql(true); + client.disconnect(); + }, 2000); + + // onQoS2publish = 'disconnect' should NOT puback + client.on("puback", function() { + expect(onPublishedCalled).to.eql(false); + count++; + expect(count).to.eql(0); + client.disconnect(); + }); + client.on("close", function() { + expect(onPublishedCalled).to.eql(false); + expect(count).to.eql(0); + client.disconnect(); + clearTimeout(timer); + }); + }); + }); + + it("should emit an event when a new client is connected", function(done) { buildClient(done, function(client) { From 79fb72013092e0ac0712fcf431715903ae0380a5 Mon Sep 17 00:00:00 2001 From: Simon Hailes Date: Fri, 19 May 2017 15:58:22 +0100 Subject: [PATCH 3/5] fix testing typo --- test/abstract_server.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/abstract_server.js b/test/abstract_server.js index 53e98b7..c094931 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -1004,7 +1004,7 @@ module.exports = function(moscaSettings, createConnection) { var count = 0; var timer; - instance.onQoS2publish = 'droptoQoS1'; + instance.onQoS2publish = 'dropToQoS1'; instance.published = function(packet, serverClient, callback) { onPublishedCalled = true; expect(packet.topic).to.be.equal("testQOS2"); From f06c6652fd151db812424be2af819701a6e6e195 Mon Sep 17 00:00:00 2001 From: Simon Hailes Date: Fri, 19 May 2017 16:11:39 +0100 Subject: [PATCH 4/5] fix QoS2 'disconnect' test --- test/abstract_server.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/abstract_server.js b/test/abstract_server.js index c094931..470f0b4 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -998,7 +998,7 @@ module.exports = function(moscaSettings, createConnection) { }); - it("should optionally (onQoS2publish='droptoQoS1') puback client publish to QOS 2", function(done) { + it("should optionally (onQoS2publish='dropToQoS1') puback client publish to QOS 2", function(done) { var onPublishedCalled = false; var clientId; var count = 0; @@ -1060,7 +1060,7 @@ it("should optionally (onQoS2publish='disconnect') disconnect client on publish messageId: 42, topic: "QOS2Test", payload: "some data to cause close", - qos: 1 + qos: 2 }); // if after 2 seconds, we've not closed From 32f071461a20b5ada646844397578010b40a4d27 Mon Sep 17 00:00:00 2001 From: Simon Hailes Date: Fri, 19 May 2017 16:31:41 +0100 Subject: [PATCH 5/5] fix onQos2publish code & update comments --- lib/client.js | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/lib/client.js b/lib/client.js index a68bdbb..39822e6 100644 --- a/lib/client.js +++ b/lib/client.js @@ -535,26 +535,28 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { packet.payload = success; } - var dopuback = function() { - // if qos2Puback, then if qos 2, don't just ignore the message, puback it - // by converting internally to qos 1. - // this fools mqtt.js into not holding all messages forever - if (packet.qos === 2){ - switch(that.server.onQoS2publish){ - case 'dropToQoS1': - packet.qos = 1; - break; - case 'disconnect': - if (!this._closed && !this._closing) { - that.close(null, "qos2 caused disconnect"); - } - return; - break; - default: - break; - } + // Mosca does not support QoS2 + // if onQoS2publish === 'dropToQoS1', don't just ignore QoS2 message, puback it + // by converting internally to qos 1. + // this fools mqtt.js into not holding all messages forever + // if onQoS2publish === 'disconnect', then break the client connection if QoS2 + if (packet.qos === 2){ + switch(that.server.onQoS2publish){ + case 'dropToQoS1': + packet.qos = 1; + break; + case 'disconnect': + if (!this._closed && !this._closing) { + that.close(null, "qos2 caused disconnect"); + } + return; + break; + default: + break; } + } + var dopuback = function() { if (packet.qos === 1 && !(that._closed || that._closing)) { that.connection.puback({ messageId: packet.messageId