From a45a3f5d83a94d15b2757313c974740672fe62f3 Mon Sep 17 00:00:00 2001 From: Morgan Cheng Date: Thu, 7 Aug 2014 18:58:26 +0800 Subject: [PATCH 1/3] publish subscribes and connected-client --- lib/server.js | 35 ++++++++++++++ test/abstract_server.js | 105 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/lib/server.js b/lib/server.js index 95e02ac..64b3908 100644 --- a/lib/server.js +++ b/lib/server.js @@ -92,6 +92,7 @@ var nop = function() {}; * - `bundle`, serve the bundled mqtt client * - `static`, serve a directory * - `stats`, publish the stats every 10s (default false). + * - `publishNewClient`, publish message to topic "$SYS/broker-id/new/clients" when new client connects. * * Events: * - `clientConnected`, when a client is connected; @@ -241,10 +242,44 @@ function Server(opts, callback) { } ]); + var SysNewTopic = "$SYS/broker-id/new/clients"; that.on("clientConnected", function(client) { + if(that.opts.publishNewClient) { + that.publish({ + topic: SysNewTopic, + payload: JSON.stringify({'serverId': that.id, 'clientId': client.id}) + }); + } + this.clients[client.id] = client; }); + that.ascoltatore.subscribe( + SysNewTopic, + function(topic, payload) { + var serverId, clientId; + + try { + payload = JSON.parse(payload); + serverId = payload.serverId, + clientId = payload.clientId; + } catch(err) { + that.logger.info('json parse err:', err); + } + + if(that.clients[clientId] && serverId !== that.id) { + that.clients[clientId].close(); + } + } + ); + + that.on("subscribed", function(topic, client) { + that.publish({ + topic: "$SYS/broker-id/new/subscribes", + payload: JSON.stringify({'serverId': that.id, 'clientId': client.id}) + }); + }); + that.on("clientDisconnected", function(client) { delete this.clients[client.id]; }); diff --git a/test/abstract_server.js b/test/abstract_server.js index 223eccd..689d78a 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -58,6 +58,111 @@ module.exports = function(moscaSettings, createConnection) { }); } + it("should publish connected client to '$SYS/broker-id/new/clients'", function(done) { + settings = moscaSettings(); + settings.publishNewClient = true; + + secondInstance = new mosca.Server(settings, function(err, server) { + buildAndConnect(done, function(client) { + server.on("published", function(packet, serverClient) { + expect(packet.topic).to.be.equal("$SYS/broker-id/new/clients"); + client.disconnect(); + }); + }); + }); + }); + + it("should publish each subscribe to '$SYS/broker-id/new/subscribes'", function(done) { + var d = donner(2, done); + buildAndConnect(d, function(client) { + var messageId = Math.floor(65535 * Math.random()); + var subscriptions = [{ + topic: "hello", + qos: 1 + } + ]; + + client.on("suback", function(packet) { + client.disconnect(); + }); + + client.subscribe({ + subscriptions: subscriptions, + messageId: messageId + }); + }); + + instance.on("published", function(packet, client) { + expect(packet.topic).to.be.equal("$SYS/broker-id/new/subscribes"); + d(); + }); + }); + + describe("multi mosca servers", function() { + var serverOne = null, + serverTwo = null, + clientOpts = buildOpts(); + + afterEach(function(done) { + var instances = []; + instances.push(serverOne); + instances.push(serverTwo); + + async.each(instances, function(instance, cb) { + if (instance) { + instance.close(cb); + } + }, done); + }); + + it("should disconnect client connected to another broker", function(done) { + var settingsOne = moscaSettings(), + settingsTwo = moscaSettings(); + + clientOpts.clientId = '123456'; + clientOpts.keepalive = 0; + + settingsOne.publishNewClient = settingsTwo.publishNewClient = true; + settingsOne.backend = settingsTwo.backend = { + type: 'redis' + }; + + async.series([ + function(cb) { + serverOne = new mosca.Server(settingsOne, function(err, server) { + serverOne.on('clientDisconnected', function(serverClient) { + expect(serverClient).not.to.be.equal(undefined); + done(); + }); + cb(); + }); + }, + function(cb) { + serverTwo = new mosca.Server(settingsTwo, function(err, server) { + cb(); + }); + }, + function(cb) { + var clientOne = createConnection(settingsOne.port, settingsOne.host); + clientOne.connect(clientOpts); + + clientOne.on("connected", function() { + cb(); + }); + }, + function(cb) { + var clientTwo = createConnection(settingsTwo.port, settingsTwo.host); + clientTwo.connect(clientOpts); + + clientTwo.on("connected", function() { + cb(); + }); + } + ]); + }); + + }); + it("should pass itself in the callback", function(done) { secondInstance = new mosca.Server(moscaSettings(), function(err, server) { expect(server === secondInstance).to.be.true; From bb3fe9c890b2b9cebb9f402e5a0df4ec058ff770 Mon Sep 17 00:00:00 2001 From: Morgan Cheng Date: Fri, 8 Aug 2014 15:19:52 +0800 Subject: [PATCH 2/3] {broker-id} turns to be variable in topic name --- lib/server.js | 22 ++++++--------- test/abstract_server.js | 62 +++++++++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/lib/server.js b/lib/server.js index 64b3908..531a5f1 100644 --- a/lib/server.js +++ b/lib/server.js @@ -92,7 +92,7 @@ var nop = function() {}; * - `bundle`, serve the bundled mqtt client * - `static`, serve a directory * - `stats`, publish the stats every 10s (default false). - * - `publishNewClient`, publish message to topic "$SYS/broker-id/new/clients" when new client connects. + * - `publishNewClient`, publish message to topic "$SYS/{broker-id}/new/clients" when new client connects. * * Events: * - `clientConnected`, when a client is connected; @@ -242,12 +242,11 @@ function Server(opts, callback) { } ]); - var SysNewTopic = "$SYS/broker-id/new/clients"; that.on("clientConnected", function(client) { if(that.opts.publishNewClient) { that.publish({ - topic: SysNewTopic, - payload: JSON.stringify({'serverId': that.id, 'clientId': client.id}) + topic: "$SYS/" + that.id + "/new/clients", + payload: client.id }); } @@ -255,17 +254,12 @@ function Server(opts, callback) { }); that.ascoltatore.subscribe( - SysNewTopic, + "$SYS/+/new/clients", function(topic, payload) { var serverId, clientId; - try { - payload = JSON.parse(payload); - serverId = payload.serverId, - clientId = payload.clientId; - } catch(err) { - that.logger.info('json parse err:', err); - } + serverId = topic.split('/')[1]; + clientId = payload; if(that.clients[clientId] && serverId !== that.id) { that.clients[clientId].close(); @@ -275,8 +269,8 @@ function Server(opts, callback) { that.on("subscribed", function(topic, client) { that.publish({ - topic: "$SYS/broker-id/new/subscribes", - payload: JSON.stringify({'serverId': that.id, 'clientId': client.id}) + topic: "$SYS/" + that.id + "/new/subscribes", + payload: client.id }); }); diff --git a/test/abstract_server.js b/test/abstract_server.js index 689d78a..4839f95 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -58,22 +58,52 @@ module.exports = function(moscaSettings, createConnection) { }); } - it("should publish connected client to '$SYS/broker-id/new/clients'", function(done) { + it("should publish connected client to '$SYS/{broker-id}/new/clients'", function(done) { + var connectedClient = null, + publishedClientId = null; + settings = moscaSettings(); settings.publishNewClient = true; + function verify() { + if (connectedClient && publishedClientId) { + expect(publishedClientId).to.be.equal(connectedClient.opts.clientId); + connectedClient.disconnect(); + } + } + secondInstance = new mosca.Server(settings, function(err, server) { + server.on("published", function(packet, clientId) { + expect(packet.topic).to.be.equal("$SYS/" + secondInstance.id + "/new/clients"); + publishedClientId = packet.payload; + verify(); + }); + buildAndConnect(done, function(client) { - server.on("published", function(packet, serverClient) { - expect(packet.topic).to.be.equal("$SYS/broker-id/new/clients"); - client.disconnect(); - }); + connectedClient = client; + verify(); }); }); }); - it("should publish each subscribe to '$SYS/broker-id/new/subscribes'", function(done) { - var d = donner(2, done); + it("should publish each subscribe to '$SYS/{broker-id}/new/subscribes'", function(done) { + var d = donner(2, done), + connectedClient = null, + publishedClientId = null; + + function verify() { + if (connectedClient && publishedClientId) { + expect(publishedClientId).to.be.equal(connectedClient.opts.clientId); + d(); + } + } + + instance.on("published", function(packet) { + expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes"); + publishedClientId = packet.payload; + verify(); + }); + buildAndConnect(d, function(client) { var messageId = Math.floor(65535 * Math.random()); var subscriptions = [{ @@ -82,6 +112,8 @@ module.exports = function(moscaSettings, createConnection) { } ]; + connectedClient = client; + client.on("suback", function(packet) { client.disconnect(); }); @@ -92,10 +124,6 @@ module.exports = function(moscaSettings, createConnection) { }); }); - instance.on("published", function(packet, client) { - expect(packet.topic).to.be.equal("$SYS/broker-id/new/subscribes"); - d(); - }); }); describe("multi mosca servers", function() { @@ -111,6 +139,8 @@ module.exports = function(moscaSettings, createConnection) { async.each(instances, function(instance, cb) { if (instance) { instance.close(cb); + } else { + cb(); } }, done); }); @@ -119,13 +149,17 @@ module.exports = function(moscaSettings, createConnection) { var settingsOne = moscaSettings(), settingsTwo = moscaSettings(); + if (!settings.backend || !settings.backend.type) { + // only need to validate cases with backend + return done(); + } + clientOpts.clientId = '123456'; clientOpts.keepalive = 0; settingsOne.publishNewClient = settingsTwo.publishNewClient = true; - settingsOne.backend = settingsTwo.backend = { - type: 'redis' - }; + + settingsOne.backend = settingsTwo.backend = settings.backend; async.series([ function(cb) { From bf01dd4ca8b2177843a8dca9f7dd6c5658b0bfb5 Mon Sep 17 00:00:00 2001 From: Morgan Cheng Date: Fri, 8 Aug 2014 17:24:32 +0800 Subject: [PATCH 3/3] enable server.opts.publishNewClient by default --- lib/server.js | 1 + test/abstract_server.js | 1 + test/server.js | 1 + test/server_mongo.js | 1 + test/server_redis.js | 1 + 5 files changed, 5 insertions(+) diff --git a/lib/server.js b/lib/server.js index 531a5f1..969ecdd 100644 --- a/lib/server.js +++ b/lib/server.js @@ -49,6 +49,7 @@ var defaults = { wildcardSome: '#' }, stats: true, + publishNewClient: true, maxInflightMessages: 1024, logger: { name: "mosca", diff --git a/test/abstract_server.js b/test/abstract_server.js index 4839f95..3c450b6 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -9,6 +9,7 @@ module.exports = function(moscaSettings, createConnection) { beforeEach(function(done) { settings = moscaSettings(); + settings.publishNewClient = false; instance = new mosca.Server(settings, done); this.instance = instance; this.settings = settings; diff --git a/test/server.js b/test/server.js index 8c84388..2656d5a 100644 --- a/test/server.js +++ b/test/server.js @@ -8,6 +8,7 @@ var moscaSettings = function() { return { port: nextPort(), stats: false, + publishNewClient: false, persistence: { factory: mosca.persistence.Memory }, diff --git a/test/server_mongo.js b/test/server_mongo.js index 7569d9b..bd0805b 100644 --- a/test/server_mongo.js +++ b/test/server_mongo.js @@ -31,6 +31,7 @@ describe("mosca.Server with mongo persistence", function() { return { port: nextPort(), stats: false, + publishNewClient: false, logger: { childOf: globalLogger, level: 60 diff --git a/test/server_redis.js b/test/server_redis.js index b4d4c8a..cfb69c2 100644 --- a/test/server_redis.js +++ b/test/server_redis.js @@ -19,6 +19,7 @@ describe("mosca.Server with redis persistence", function() { return { port: nextPort(), stats: false, + publishNewClient: false, logger: { childOf: globalLogger, level: 60