From 44642dc802b9d70be1c1f4c1c55dbc0d06cf4876 Mon Sep 17 00:00:00 2001 From: Behrad Date: Tue, 12 Apr 2016 15:57:37 +0430 Subject: [PATCH 1/9] use QlobberDedup to prevent matching before adding subs --- lib/persistence/levelup.js | 9 +++------ lib/persistence/matcher.js | 2 +- lib/persistence/mongo.js | 2 +- lib/persistence/redis.js | 12 ++++-------- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index 4a1ce03..768cf93 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -149,7 +149,7 @@ LevelUpPersistence.prototype.lookupRetained = function(pattern, cb) { }); stream.on("data", function(data) { - if (matcher.match(data.key).length > 0) { + if (matcher.match(data.key).size > 0) { matched.push(data.value); } }); @@ -176,9 +176,7 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) { qos: subscriptions[key].qos }; var levelKey = util.format("%s:%s", key, client.id); - if (that._subMatcher.match(key).indexOf(levelKey) < 0) { - that._subMatcher.add(key, levelKey); - } + that._subMatcher.add(key, levelKey); that._subscriptions.put(levelKey, sub); }); } else if (done) { @@ -232,8 +230,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var subs = this._subMatcher.match(packet.topic); - - async.each(subs, function(key, cb) { + async.each(Array.from(subs), function(key, cb) { that._subscriptions.get(key, function(err, sub) { if (err) { return cb(err); diff --git a/lib/persistence/matcher.js b/lib/persistence/matcher.js index 51af830..c1da680 100644 --- a/lib/persistence/matcher.js +++ b/lib/persistence/matcher.js @@ -23,7 +23,7 @@ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -var Qlobber = require("qlobber").Qlobber; +var Qlobber = require("qlobber").QlobberDedup; var util = require("util"); function Matcher() { diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js index 5b31310..fcc8a0e 100644 --- a/lib/persistence/mongo.js +++ b/lib/persistence/mongo.js @@ -279,7 +279,7 @@ MongoPersistence.prototype.lookupRetained = function(pattern, cb) { }); stream.on("data", function(data) { - if (matcher.match(data.topic).length > 0) { + if (matcher.match(data.topic).size > 0) { data.payload = data.payload.buffer; matched.push(data); } diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index b460033..070604e 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -105,9 +105,7 @@ function RedisPersistence(options, callback) { } Object.keys(subs).forEach(function(sub) { - if (that._subMatcher.match(sub).indexOf(id) < 0) { - that._subMatcher.add(sub, id); - } + that._subMatcher.add(sub, id); }); if( unsubs ) { @@ -229,7 +227,7 @@ RedisPersistence.prototype.lookupRetained = function(pattern, done) { this._client.hkeys("retained", function(err, topics) { topics.sort(); topics = topics.filter(function(topic) { - return matcher.match(topic).length > 0; + return matcher.match(topic).size > 0; }); async.each(topics, match, function(err) { @@ -286,9 +284,7 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) { .pexpire(clientSubKey, this.options.ttl.subscriptions); Object.keys(subscriptions).forEach(function(e) { - if (that._subMatcher.match(e).indexOf(client.id) < 0) { - that._subMatcher.add(e, client.id); - } + that._subMatcher.add(e, client.id); }); op.exec(cb); @@ -353,7 +349,7 @@ RedisPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var matches = this._subMatcher.match(packet.topic); - async.each(matches, function(client, cb) { + async.each(Array.from(matches), function(client, cb) { that._storePacket(client, packet, cb); }, done); }; From 1aefcf891cbfd59e4a19f0a6c1f0b99a8ba1f086 Mon Sep 17 00:00:00 2001 From: Behrad Date: Tue, 12 Apr 2016 15:57:37 +0430 Subject: [PATCH 2/9] use QlobberDedup to prevent matching before adding subs --- lib/persistence/levelup.js | 9 +++------ lib/persistence/matcher.js | 2 +- lib/persistence/mongo.js | 2 +- lib/persistence/redis.js | 12 ++++-------- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index 4a1ce03..768cf93 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -149,7 +149,7 @@ LevelUpPersistence.prototype.lookupRetained = function(pattern, cb) { }); stream.on("data", function(data) { - if (matcher.match(data.key).length > 0) { + if (matcher.match(data.key).size > 0) { matched.push(data.value); } }); @@ -176,9 +176,7 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) { qos: subscriptions[key].qos }; var levelKey = util.format("%s:%s", key, client.id); - if (that._subMatcher.match(key).indexOf(levelKey) < 0) { - that._subMatcher.add(key, levelKey); - } + that._subMatcher.add(key, levelKey); that._subscriptions.put(levelKey, sub); }); } else if (done) { @@ -232,8 +230,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var subs = this._subMatcher.match(packet.topic); - - async.each(subs, function(key, cb) { + async.each(Array.from(subs), function(key, cb) { that._subscriptions.get(key, function(err, sub) { if (err) { return cb(err); diff --git a/lib/persistence/matcher.js b/lib/persistence/matcher.js index 51af830..c1da680 100644 --- a/lib/persistence/matcher.js +++ b/lib/persistence/matcher.js @@ -23,7 +23,7 @@ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -var Qlobber = require("qlobber").Qlobber; +var Qlobber = require("qlobber").QlobberDedup; var util = require("util"); function Matcher() { diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js index 5b31310..fcc8a0e 100644 --- a/lib/persistence/mongo.js +++ b/lib/persistence/mongo.js @@ -279,7 +279,7 @@ MongoPersistence.prototype.lookupRetained = function(pattern, cb) { }); stream.on("data", function(data) { - if (matcher.match(data.topic).length > 0) { + if (matcher.match(data.topic).size > 0) { data.payload = data.payload.buffer; matched.push(data); } diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index b460033..070604e 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -105,9 +105,7 @@ function RedisPersistence(options, callback) { } Object.keys(subs).forEach(function(sub) { - if (that._subMatcher.match(sub).indexOf(id) < 0) { - that._subMatcher.add(sub, id); - } + that._subMatcher.add(sub, id); }); if( unsubs ) { @@ -229,7 +227,7 @@ RedisPersistence.prototype.lookupRetained = function(pattern, done) { this._client.hkeys("retained", function(err, topics) { topics.sort(); topics = topics.filter(function(topic) { - return matcher.match(topic).length > 0; + return matcher.match(topic).size > 0; }); async.each(topics, match, function(err) { @@ -286,9 +284,7 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) { .pexpire(clientSubKey, this.options.ttl.subscriptions); Object.keys(subscriptions).forEach(function(e) { - if (that._subMatcher.match(e).indexOf(client.id) < 0) { - that._subMatcher.add(e, client.id); - } + that._subMatcher.add(e, client.id); }); op.exec(cb); @@ -353,7 +349,7 @@ RedisPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var matches = this._subMatcher.match(packet.topic); - async.each(matches, function(client, cb) { + async.each(Array.from(matches), function(client, cb) { that._storePacket(client, packet, cb); }, done); }; From 312153213a0a75e62482db975162c8061c469287 Mon Sep 17 00:00:00 2001 From: Behrad Date: Sat, 16 Apr 2016 11:40:09 +0430 Subject: [PATCH 3/9] add Array.from polyfill --- lib/persistence/abstract.js | 16 ++++++++++++++++ lib/persistence/levelup.js | 2 +- lib/persistence/redis.js | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lib/persistence/abstract.js b/lib/persistence/abstract.js index c0c0060..b833773 100644 --- a/lib/persistence/abstract.js +++ b/lib/persistence/abstract.js @@ -138,6 +138,22 @@ AbstractPersistence.prototype.wire = function(server) { }; }; +function arrayFrom(object) { + return [].slice.call(object); +} + +/** + * Array.from polyfill + * @param aSet the set to changed to an array + */ +AbstractPersistence.prototype.arrayFrom = function(aSet) { + if (Array.from) { + return Array.from(aSet); + } else { + return arrayFrom(aSet); + } +}; + /** * Close the persistance. * diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index 768cf93..d557b18 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -230,7 +230,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var subs = this._subMatcher.match(packet.topic); - async.each(Array.from(subs), function(key, cb) { + async.each(this.arrayFrom(subs), function(key, cb) { that._subscriptions.get(key, function(err, sub) { if (err) { return cb(err); diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 070604e..9e0e010 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -349,7 +349,7 @@ RedisPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var matches = this._subMatcher.match(packet.topic); - async.each(Array.from(matches), function(client, cb) { + async.each(this.arrayFrom(matches), function(client, cb) { that._storePacket(client, packet, cb); }, done); }; From 8f4e0f15f61ff15451990f2c49eae3ac77684471 Mon Sep 17 00:00:00 2001 From: Behrad Date: Sat, 16 Apr 2016 18:39:34 +0430 Subject: [PATCH 4/9] take removing unSubs out of multi callback & also reduce extra .publish --- lib/persistence/redis.js | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 9e0e010..08c6f9d 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -258,36 +258,30 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) { } }); - var op = this._client.multi() - .get(clientSubKey, function(err, currentSubs){ - if( err || !currentSubs ) { - return; - } - currentSubs = JSON.parse( currentSubs ); - var unsubs = Object.keys(currentSubs).filter(function(topic){ + this._client.get(clientSubKey, function(err, currentSubs){ + if( !err && currentSubs ) { + currentSubs = JSON.parse(currentSubs); + var unsubs = Object.keys(currentSubs).filter(function (topic) { return !subscriptions[topic]; }); - unsubs.forEach(function(topic) { + unsubs.forEach(function (topic) { that._subMatcher.remove(topic, client.id); }); - that._client.publish(that.options.channel, JSON.stringify({ + } + var op = that._client.multi() + .set(clientSubKey, JSON.stringify(subscriptions)) + .publish(that.options.channel, JSON.stringify({ key: clientSubKey, - unsubs: unsubs, process: that._id - })); - }) - .set(clientSubKey, JSON.stringify(subscriptions)) - .publish(this.options.channel, JSON.stringify({ - key: clientSubKey, - process: this._id - })) - .pexpire(clientSubKey, this.options.ttl.subscriptions); - - Object.keys(subscriptions).forEach(function(e) { - that._subMatcher.add(e, client.id); - }); + })) + .pexpire(clientSubKey, that.options.ttl.subscriptions); - op.exec(cb); + Object.keys(subscriptions).forEach(function(e) { + that._subMatcher.add(e, client.id); + }); + + op.exec(cb); + }); }; RedisPersistence.prototype._cleanClient = function(client, done) { From c8c50be35ecf204327a25172d64e471ab95f44ca Mon Sep 17 00:00:00 2001 From: Behrad Date: Thu, 14 Apr 2016 19:25:15 +0430 Subject: [PATCH 5/9] Introduce publishSubscriptions option, so that subscription $SYS topics can be switched off, default is on. --- lib/options.js | 8 ++++++-- lib/server.js | 33 +++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/lib/options.js b/lib/options.js index 41d67c1..9ee6069 100755 --- a/lib/options.js +++ b/lib/options.js @@ -61,7 +61,8 @@ function modernize(legacy) { "maxInflightMessages", "stats", "publishNewClient", - "publishClientDisconnect" + "publishClientDisconnect", + "publishSubscriptions" ]; // copy all conserved options @@ -250,7 +251,8 @@ function validate(opts, validationOptions) { 'maxInflightMessages': { type: 'integer' }, 'stats': { type: 'boolean' }, 'publishNewClient': { type: 'boolean' }, - 'publishClientDisconnect': { type: 'boolean' } + 'publishClientDisconnect': { type: 'boolean' }, + 'publishSubscriptions': { type: 'boolean' } } }); @@ -333,6 +335,7 @@ function defaultsLegacy() { stats: false, publishNewClient: true, publishClientDisconnect: true, + publishSubscriptions: true, maxInflightMessages: 1024, logger: { name: "mosca", @@ -368,6 +371,7 @@ function defaultsModern() { stats: false, publishNewClient: true, publishClientDisconnect: true, + publishSubscriptions: true, maxInflightMessages: 1024, logger: { name: "mosca", diff --git a/lib/server.js b/lib/server.js index c7c7913..fcae99e 100755 --- a/lib/server.js +++ b/lib/server.js @@ -67,6 +67,7 @@ var nop = function() {}; * - `stats`, publish the stats every 10s (default false). * - `publishNewClient`, publish message to topic "$SYS/{broker-id}/new/clients" when new client connects. * - `publishClientDisconnect`, publish message to topic "$SYS/{broker-id}/disconnect/clients" when a client disconnects. + * - `publishSubscriptions`, publish message to topic "$SYS/{broker-id}/new/(un)subscribes" when a client subscribes/unsubscribes. * * Interface may contain following properties: * - `type`, name of a build-in type or a custom type factory @@ -262,23 +263,27 @@ function Server(opts, callback) { }); that.on("subscribed", function(topic, client) { - that.publish({ - topic: "$SYS/" + that.id + "/new/subscribes", - payload: JSON.stringify({ - clientId: client.id, - topic: topic - }) - }); + if(that.modernOpts.publishSubscriptions) { + that.publish({ + topic: "$SYS/" + that.id + "/new/subscribes", + payload: JSON.stringify({ + clientId: client.id, + topic: topic + }) + }); + } }); that.on("unsubscribed", function(topic, client) { - that.publish({ - topic: "$SYS/" + that.id + "/new/unsubscribes", - payload: JSON.stringify({ - clientId: client.id, - topic: topic - }) - }); + if(that.modernOpts.publishSubscriptions) { + that.publish({ + topic: "$SYS/" + that.id + "/new/unsubscribes", + payload: JSON.stringify({ + clientId: client.id, + topic: topic + }) + }); + } }); that.on("clientDisconnected", function(client) { From c2120047de32794f57078dba9ad65aeefe0e3074 Mon Sep 17 00:00:00 2001 From: Behrad Date: Sat, 16 Apr 2016 11:17:22 +0430 Subject: [PATCH 6/9] move publishSubscriptions check out of callback --- lib/server.js | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/server.js b/lib/server.js index fcae99e..8ea4b36 100755 --- a/lib/server.js +++ b/lib/server.js @@ -262,8 +262,8 @@ function Server(opts, callback) { ); }); - that.on("subscribed", function(topic, client) { - if(that.modernOpts.publishSubscriptions) { + if(that.modernOpts.publishSubscriptions) { + that.on("subscribed", function(topic, client) { that.publish({ topic: "$SYS/" + that.id + "/new/subscribes", payload: JSON.stringify({ @@ -271,11 +271,9 @@ function Server(opts, callback) { topic: topic }) }); - } - }); + }); - that.on("unsubscribed", function(topic, client) { - if(that.modernOpts.publishSubscriptions) { + that.on("unsubscribed", function(topic, client) { that.publish({ topic: "$SYS/" + that.id + "/new/unsubscribes", payload: JSON.stringify({ @@ -283,8 +281,8 @@ function Server(opts, callback) { topic: topic }) }); - } - }); + }); + } that.on("clientDisconnected", function(client) { if(that.modernOpts.publishClientDisconnect) { From e5da73b651f1b42ba2609ab4539d7d1a66f1d861 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 22 Apr 2016 13:21:06 +0430 Subject: [PATCH 7/9] update async to support ES2015 Set iteration, remove arrayFrom --- lib/persistence/abstract.js | 16 ---------------- lib/persistence/levelup.js | 2 +- lib/persistence/redis.js | 2 +- package.json | 2 +- 4 files changed, 3 insertions(+), 19 deletions(-) diff --git a/lib/persistence/abstract.js b/lib/persistence/abstract.js index b833773..c0c0060 100644 --- a/lib/persistence/abstract.js +++ b/lib/persistence/abstract.js @@ -138,22 +138,6 @@ AbstractPersistence.prototype.wire = function(server) { }; }; -function arrayFrom(object) { - return [].slice.call(object); -} - -/** - * Array.from polyfill - * @param aSet the set to changed to an array - */ -AbstractPersistence.prototype.arrayFrom = function(aSet) { - if (Array.from) { - return Array.from(aSet); - } else { - return arrayFrom(aSet); - } -}; - /** * Close the persistance. * diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index d557b18..2456efd 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -230,7 +230,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var subs = this._subMatcher.match(packet.topic); - async.each(this.arrayFrom(subs), function(key, cb) { + async.each(subs, function(key, cb) { that._subscriptions.get(key, function(err, sub) { if (err) { return cb(err); diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 08c6f9d..83e339c 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -343,7 +343,7 @@ RedisPersistence.prototype.storeOfflinePacket = function(packet, done) { var that = this; var matches = this._subMatcher.match(packet.topic); - async.each(this.arrayFrom(matches), function(client, cb) { + async.each(matches, function(client, cb) { that._storePacket(client, packet, cb); }, done); }; diff --git a/package.json b/package.json index f5af5d0..666fe5f 100644 --- a/package.json +++ b/package.json @@ -71,7 +71,7 @@ }, "dependencies": { "ascoltatori": "^2.0.0", - "async": "~1.5.2", + "async": "^2.0.0-rc.3", "brfs": "~1.4.2", "bunyan": "^1.5.1", "clone": "^1.0.2", From 18fedc675f8d40f31b7d265f5396f9d8ca7b9444 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 22 Apr 2016 13:22:49 +0430 Subject: [PATCH 8/9] drop node 0.10 support --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 6442f00..1eec2eb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,6 @@ addons: packages: - g++-4.8 node_js: - - 0.10 - 0.12 - 4 - 5 From 598b6570507743bd64346238bbcd012242fb1e22 Mon Sep 17 00:00:00 2001 From: Behrad Date: Thu, 12 May 2016 01:48:48 +0430 Subject: [PATCH 9/9] upgrade to qlobber 0.7.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 666fe5f..456a8ed 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,7 @@ "mqtt-connection": "^2.1.1", "msgpack5": "^3.3.0", "pbkdf2-password": "^1.1.0", - "qlobber": "~0.6.0", + "qlobber": "^0.7.0", "retimer": "^1.0.1", "shortid": "^2.2.4", "st": "~1.1.0",