diff --git a/lib/server.js b/lib/server.js index 95e02ac..969ecdd 100644 --- a/lib/server.js +++ b/lib/server.js @@ -49,6 +49,7 @@ var defaults = { wildcardSome: '#' }, stats: true, + publishNewClient: true, maxInflightMessages: 1024, logger: { name: "mosca", @@ -92,6 +93,7 @@ var nop = function() {}; * - `bundle`, serve the bundled mqtt client * - `static`, serve a directory * - `stats`, publish the stats every 10s (default false). + * - `publishNewClient`, publish message to topic "$SYS/{broker-id}/new/clients" when new client connects. * * Events: * - `clientConnected`, when a client is connected; @@ -242,9 +244,37 @@ function Server(opts, callback) { ]); that.on("clientConnected", function(client) { + if(that.opts.publishNewClient) { + that.publish({ + topic: "$SYS/" + that.id + "/new/clients", + payload: client.id + }); + } + this.clients[client.id] = client; }); + that.ascoltatore.subscribe( + "$SYS/+/new/clients", + function(topic, payload) { + var serverId, clientId; + + serverId = topic.split('/')[1]; + clientId = payload; + + if(that.clients[clientId] && serverId !== that.id) { + that.clients[clientId].close(); + } + } + ); + + that.on("subscribed", function(topic, client) { + that.publish({ + topic: "$SYS/" + that.id + "/new/subscribes", + payload: client.id + }); + }); + that.on("clientDisconnected", function(client) { delete this.clients[client.id]; }); diff --git a/test/abstract_server.js b/test/abstract_server.js index 223eccd..3c450b6 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -9,6 +9,7 @@ module.exports = function(moscaSettings, createConnection) { beforeEach(function(done) { settings = moscaSettings(); + settings.publishNewClient = false; instance = new mosca.Server(settings, done); this.instance = instance; this.settings = settings; @@ -58,6 +59,145 @@ module.exports = function(moscaSettings, createConnection) { }); } + it("should publish connected client to '$SYS/{broker-id}/new/clients'", function(done) { + var connectedClient = null, + publishedClientId = null; + + settings = moscaSettings(); + settings.publishNewClient = true; + + function verify() { + if (connectedClient && publishedClientId) { + expect(publishedClientId).to.be.equal(connectedClient.opts.clientId); + connectedClient.disconnect(); + } + } + + secondInstance = new mosca.Server(settings, function(err, server) { + server.on("published", function(packet, clientId) { + expect(packet.topic).to.be.equal("$SYS/" + secondInstance.id + "/new/clients"); + publishedClientId = packet.payload; + verify(); + }); + + buildAndConnect(done, function(client) { + connectedClient = client; + verify(); + }); + }); + }); + + it("should publish each subscribe to '$SYS/{broker-id}/new/subscribes'", function(done) { + var d = donner(2, done), + connectedClient = null, + publishedClientId = null; + + function verify() { + if (connectedClient && publishedClientId) { + expect(publishedClientId).to.be.equal(connectedClient.opts.clientId); + d(); + } + } + + instance.on("published", function(packet) { + expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes"); + publishedClientId = packet.payload; + verify(); + }); + + buildAndConnect(d, function(client) { + var messageId = Math.floor(65535 * Math.random()); + var subscriptions = [{ + topic: "hello", + qos: 1 + } + ]; + + connectedClient = client; + + client.on("suback", function(packet) { + client.disconnect(); + }); + + client.subscribe({ + subscriptions: subscriptions, + messageId: messageId + }); + }); + + }); + + describe("multi mosca servers", function() { + var serverOne = null, + serverTwo = null, + clientOpts = buildOpts(); + + afterEach(function(done) { + var instances = []; + instances.push(serverOne); + instances.push(serverTwo); + + async.each(instances, function(instance, cb) { + if (instance) { + instance.close(cb); + } else { + cb(); + } + }, done); + }); + + it("should disconnect client connected to another broker", function(done) { + var settingsOne = moscaSettings(), + settingsTwo = moscaSettings(); + + if (!settings.backend || !settings.backend.type) { + // only need to validate cases with backend + return done(); + } + + clientOpts.clientId = '123456'; + clientOpts.keepalive = 0; + + settingsOne.publishNewClient = settingsTwo.publishNewClient = true; + + settingsOne.backend = settingsTwo.backend = settings.backend; + + async.series([ + function(cb) { + serverOne = new mosca.Server(settingsOne, function(err, server) { + serverOne.on('clientDisconnected', function(serverClient) { + expect(serverClient).not.to.be.equal(undefined); + done(); + }); + cb(); + }); + }, + function(cb) { + serverTwo = new mosca.Server(settingsTwo, function(err, server) { + cb(); + }); + }, + function(cb) { + var clientOne = createConnection(settingsOne.port, settingsOne.host); + clientOne.connect(clientOpts); + + clientOne.on("connected", function() { + cb(); + }); + }, + function(cb) { + var clientTwo = createConnection(settingsTwo.port, settingsTwo.host); + clientTwo.connect(clientOpts); + + clientTwo.on("connected", function() { + cb(); + }); + } + ]); + }); + + }); + it("should pass itself in the callback", function(done) { secondInstance = new mosca.Server(moscaSettings(), function(err, server) { expect(server === secondInstance).to.be.true; diff --git a/test/server.js b/test/server.js index 8c84388..2656d5a 100644 --- a/test/server.js +++ b/test/server.js @@ -8,6 +8,7 @@ var moscaSettings = function() { return { port: nextPort(), stats: false, + publishNewClient: false, persistence: { factory: mosca.persistence.Memory }, diff --git a/test/server_mongo.js b/test/server_mongo.js index 7569d9b..bd0805b 100644 --- a/test/server_mongo.js +++ b/test/server_mongo.js @@ -31,6 +31,7 @@ describe("mosca.Server with mongo persistence", function() { return { port: nextPort(), stats: false, + publishNewClient: false, logger: { childOf: globalLogger, level: 60 diff --git a/test/server_redis.js b/test/server_redis.js index b4d4c8a..cfb69c2 100644 --- a/test/server_redis.js +++ b/test/server_redis.js @@ -19,6 +19,7 @@ describe("mosca.Server with redis persistence", function() { return { port: nextPort(), stats: false, + publishNewClient: false, logger: { childOf: globalLogger, level: 60