diff --git a/lib/client.js b/lib/client.js index 28ff071..64d5f37 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,6 +1,7 @@ "use strict"; var async = require("async"); + var REGEXP = /(([^/])\/+$)|(([^/]))|(\/+(\/))/g; var rewriteTopic = function(topic) { return topic.replace(REGEXP, "$2$4$6"); @@ -69,7 +70,7 @@ Client.prototype._setup = function() { async.parallel(packet.unsubscriptions.map(that.unsubscribeMapTo.bind(that)), function(err) { if (err) { that.logger.warn(err); - that.unsubAndClose(); + that.close(); return; } client.unsuback({ @@ -80,12 +81,12 @@ Client.prototype._setup = function() { client.on("disconnect", function() { that.logger.debug("disconnect requested"); - that.unsubAndClose(); + that.close(); }); client.on("error", function(err) { that.logger.warn(err); - that.unsubAndClose(); + that.close(); }); client.on("close", function() { @@ -127,7 +128,7 @@ Client.prototype.actualSend = function(packet, retry) { var packetToLog = { packet: packet, retry: retry }; if (that._closed) { - this.logger.warn(packetToLog, "tryint to send a packet to a disconnected client"); + this.logger.warn(packetToLog, "trying to send a packet to a disconnected client"); } else if (retry === 10) { this.logger.info(packetToLog, "could not deliver the message"); this.connection.emit("error", new Error("client not responding to acks")); @@ -208,19 +209,6 @@ Client.prototype.unsubscribeMapTo = function(topic) { }; }; -/** - * Unsubscribes from everything and closing down. - * - * @api private - */ -Client.prototype.unsubAndClose = function(cb) { - var that = this; - this._closing = true; - async.parallel(Object.keys(that.subscriptions).map(that.unsubscribeMapTo.bind(that)), function() { - that.close(cb); - }); -}; - /** * Handle a connect packet, doing authentication. * @@ -266,15 +254,26 @@ Client.prototype.handleConnect = function(packet) { } that.clean = packet.clean; - - logger.info("client connected"); - - that.setUpTimer(); - client.connack({ - returnCode: 0 - }); - that.server.restoreClient(that); - that.server.emit("clientConnected", that); + + var completeConnection = function(){ + logger.info("client connected"); + + that.setUpTimer(); + + client.connack({ + returnCode: 0 + }); + + that.server.restoreClient(that); + that.server.emit("clientConnected", that); + }; + + if (that.id in that.server.clients){ + that.server.clients[that.id].close(completeConnection.bind(that)); + }else{ + completeConnection(); + } + }); }; @@ -374,7 +373,7 @@ Client.prototype.handleSubscribe = function(packet) { }; }), function(err) { if (err) { - that.unsubAndClose(); + that.close(); return; } @@ -401,7 +400,7 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { var that = this; if (err || !success) { - that.unsubAndClose(); + that.close(); return; } @@ -443,7 +442,7 @@ Client.prototype.onClose = function() { this.server.emit("clientDisconnecting", that); - this.unsubAndClose(function() { + this.close(function() { if (that.will) { logger.info({ willTopic: that.will.topic }, "delivering last will"); that.server.ascoltatore.publish( @@ -488,13 +487,20 @@ Client.prototype.close = function(callback) { } }; - if (this._closed) { - cleanup(); - } else { - this.server.persistClient(this); - this.connection.stream.on("end", cleanup); - this.connection.stream.end(); - } + + that._closing = true; + async.parallel(Object.keys(that.subscriptions).map(that.unsubscribeMapTo.bind(that)), function() { + if (that._closed) { + cleanup(); + } else { + that.server.persistClient(that); + that.connection.stream.on("end", cleanup); + that.connection.stream.end(); + } + }); + + + }; module.exports = Client; diff --git a/test/abstract_server.js b/test/abstract_server.js index f364f12..61bc5c9 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -118,6 +118,29 @@ module.exports = function(moscaSettings, createConnection) { }); }); }); + + it("should close the first client if a second client with the same clientId connects", function(done){ + var d = donner(2, done); + var opts = buildOpts(), clientId = "123456789"; + opts.clientId = clientId; + async.waterfall([ + function(cb){ + buildAndConnect(d, opts, function(client1){ + cb(null, client1); + }); + }, + function(client1, cb){ + buildAndConnect(d, opts, function(client2){ + if(settings.secure === undefined){ + expect(client1.stream.destroyed).to.eql(true); + }else{ + expect(client1.stream._destroyed).to.eql(true); + } + client2.disconnect(); + }); + } + ]); + }); it("should close the connection after the keepalive interval", function(done) { buildClient(done, function(client) {