Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Switching to Qlobber.Dedup #449

Closed
wants to merge 12 commits into from
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ addons:
packages:
- g++-4.8
node_js:
- 0.10
- 0.12
- 4
- 5
Expand Down
8 changes: 6 additions & 2 deletions lib/options.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ function modernize(legacy) {
"maxInflightMessages",
"stats",
"publishNewClient",
"publishClientDisconnect"
"publishClientDisconnect",
"publishSubscriptions"
];

// copy all conserved options
Expand Down Expand Up @@ -250,7 +251,8 @@ function validate(opts, validationOptions) {
'maxInflightMessages': { type: 'integer' },
'stats': { type: 'boolean' },
'publishNewClient': { type: 'boolean' },
'publishClientDisconnect': { type: 'boolean' }
'publishClientDisconnect': { type: 'boolean' },
'publishSubscriptions': { type: 'boolean' }
}
});

Expand Down Expand Up @@ -333,6 +335,7 @@ function defaultsLegacy() {
stats: false,
publishNewClient: true,
publishClientDisconnect: true,
publishSubscriptions: true,
maxInflightMessages: 1024,
logger: {
name: "mosca",
Expand Down Expand Up @@ -368,6 +371,7 @@ function defaultsModern() {
stats: false,
publishNewClient: true,
publishClientDisconnect: true,
publishSubscriptions: true,
maxInflightMessages: 1024,
logger: {
name: "mosca",
Expand Down
7 changes: 2 additions & 5 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ LevelUpPersistence.prototype.lookupRetained = function(pattern, cb) {
});

stream.on("data", function(data) {
if (matcher.match(data.key).length > 0) {
if (matcher.match(data.key).size > 0) {
matched.push(data.value);
}
});
Expand All @@ -176,9 +176,7 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) {
qos: subscriptions[key].qos
};
var levelKey = util.format("%s:%s", key, client.id);
if (that._subMatcher.match(key).indexOf(levelKey) < 0) {
that._subMatcher.add(key, levelKey);
}
that._subMatcher.add(key, levelKey);
that._subscriptions.put(levelKey, sub);
});
} else if (done) {
Expand Down Expand Up @@ -232,7 +230,6 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) {
LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) {
var that = this;
var subs = this._subMatcher.match(packet.topic);

async.each(subs, function(key, cb) {
that._subscriptions.get(key, function(err, sub) {
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion lib/persistence/matcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
*/

var Qlobber = require("qlobber").Qlobber;
var Qlobber = require("qlobber").QlobberDedup;
var util = require("util");

function Matcher() {
Expand Down
2 changes: 1 addition & 1 deletion lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ MongoPersistence.prototype.lookupRetained = function(pattern, cb) {
});

stream.on("data", function(data) {
if (matcher.match(data.topic).length > 0) {
if (matcher.match(data.topic).size > 0) {
data.payload = data.payload.buffer;
matched.push(data);
}
Expand Down
46 changes: 18 additions & 28 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ function RedisPersistence(options, callback) {
}

Object.keys(subs).forEach(function(sub) {
if (that._subMatcher.match(sub).indexOf(id) < 0) {
that._subMatcher.add(sub, id);
}
that._subMatcher.add(sub, id);
});

if( unsubs ) {
Expand Down Expand Up @@ -229,7 +227,7 @@ RedisPersistence.prototype.lookupRetained = function(pattern, done) {
this._client.hkeys("retained", function(err, topics) {
topics.sort();
topics = topics.filter(function(topic) {
return matcher.match(topic).length > 0;
return matcher.match(topic).size > 0;
});

async.each(topics, match, function(err) {
Expand Down Expand Up @@ -260,38 +258,30 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
}
});

var op = this._client.multi()
.get(clientSubKey, function(err, currentSubs){
if( err || !currentSubs ) {
return;
}
currentSubs = JSON.parse( currentSubs );
var unsubs = Object.keys(currentSubs).filter(function(topic){
this._client.get(clientSubKey, function(err, currentSubs){
if( !err && currentSubs ) {
currentSubs = JSON.parse(currentSubs);
var unsubs = Object.keys(currentSubs).filter(function (topic) {
return !subscriptions[topic];
});
unsubs.forEach(function(topic) {
unsubs.forEach(function (topic) {
that._subMatcher.remove(topic, client.id);
});
that._client.publish(that.options.channel, JSON.stringify({
}
var op = that._client.multi()
.set(clientSubKey, JSON.stringify(subscriptions))
.publish(that.options.channel, JSON.stringify({
key: clientSubKey,
unsubs: unsubs,
process: that._id
}));
})
.set(clientSubKey, JSON.stringify(subscriptions))
.publish(this.options.channel, JSON.stringify({
key: clientSubKey,
process: this._id
}))
.pexpire(clientSubKey, this.options.ttl.subscriptions);

Object.keys(subscriptions).forEach(function(e) {
if (that._subMatcher.match(e).indexOf(client.id) < 0) {
}))
.pexpire(clientSubKey, that.options.ttl.subscriptions);

Object.keys(subscriptions).forEach(function(e) {
that._subMatcher.add(e, client.id);
}
});
});

op.exec(cb);
op.exec(cb);
});
};

RedisPersistence.prototype._cleanClient = function(client, done) {
Expand Down
35 changes: 19 additions & 16 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var nop = function() {};
* - `stats`, publish the stats every 10s (default false).
* - `publishNewClient`, publish message to topic "$SYS/{broker-id}/new/clients" when new client connects.
* - `publishClientDisconnect`, publish message to topic "$SYS/{broker-id}/disconnect/clients" when a client disconnects.
* - `publishSubscriptions`, publish message to topic "$SYS/{broker-id}/new/(un)subscribes" when a client subscribes/unsubscribes.
*
* Interface may contain following properties:
* - `type`, name of a build-in type or a custom type factory
Expand Down Expand Up @@ -261,25 +262,27 @@ function Server(opts, callback) {
);
});

that.on("subscribed", function(topic, client) {
that.publish({
topic: "$SYS/" + that.id + "/new/subscribes",
payload: JSON.stringify({
clientId: client.id,
topic: topic
})
if(that.modernOpts.publishSubscriptions) {
that.on("subscribed", function(topic, client) {
that.publish({
topic: "$SYS/" + that.id + "/new/subscribes",
payload: JSON.stringify({
clientId: client.id,
topic: topic
})
});
});
});

that.on("unsubscribed", function(topic, client) {
that.publish({
topic: "$SYS/" + that.id + "/new/unsubscribes",
payload: JSON.stringify({
clientId: client.id,
topic: topic
})
that.on("unsubscribed", function(topic, client) {
that.publish({
topic: "$SYS/" + that.id + "/new/unsubscribes",
payload: JSON.stringify({
clientId: client.id,
topic: topic
})
});
});
});
}

that.on("clientDisconnected", function(client) {
if(that.modernOpts.publishClientDisconnect) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
},
"dependencies": {
"ascoltatori": "^2.0.0",
"async": "~1.5.2",
"async": "^2.0.0-rc.3",
"brfs": "~1.4.2",
"bunyan": "^1.5.1",
"clone": "^1.0.2",
Expand All @@ -91,7 +91,7 @@
"mqtt-connection": "^2.1.1",
"msgpack5": "^3.3.0",
"pbkdf2-password": "^1.1.0",
"qlobber": "~0.6.0",
"qlobber": "^0.7.0",
"retimer": "^1.0.1",
"shortid": "^2.2.4",
"st": "~1.1.0",
Expand Down