Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #57 from chriswiggins/master
Browse files Browse the repository at this point in the history
Fix closing issues and clientID specification
  • Loading branch information
mcollina committed Sep 18, 2013
2 parents f726ad1 + 7098389 commit a0a9c3d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 36 deletions.
78 changes: 42 additions & 36 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

var async = require("async");

var REGEXP = /(([^/])\/+$)|(([^/]))|(\/+(\/))/g;
var rewriteTopic = function(topic) {
return topic.replace(REGEXP, "$2$4$6");
Expand Down Expand Up @@ -69,7 +70,7 @@ Client.prototype._setup = function() {
async.parallel(packet.unsubscriptions.map(that.unsubscribeMapTo.bind(that)), function(err) {
if (err) {
that.logger.warn(err);
that.unsubAndClose();
that.close();
return;
}
client.unsuback({
Expand All @@ -80,12 +81,12 @@ Client.prototype._setup = function() {

client.on("disconnect", function() {
that.logger.debug("disconnect requested");
that.unsubAndClose();
that.close();
});

client.on("error", function(err) {
that.logger.warn(err);
that.unsubAndClose();
that.close();
});

client.on("close", function() {
Expand Down Expand Up @@ -127,7 +128,7 @@ Client.prototype.actualSend = function(packet, retry) {
var packetToLog = { packet: packet, retry: retry };

if (that._closed) {
this.logger.warn(packetToLog, "tryint to send a packet to a disconnected client");
this.logger.warn(packetToLog, "trying to send a packet to a disconnected client");
} else if (retry === 10) {
this.logger.info(packetToLog, "could not deliver the message");
this.connection.emit("error", new Error("client not responding to acks"));
Expand Down Expand Up @@ -208,19 +209,6 @@ Client.prototype.unsubscribeMapTo = function(topic) {
};
};

/**
* Unsubscribes from everything and closing down.
*
* @api private
*/
Client.prototype.unsubAndClose = function(cb) {
var that = this;
this._closing = true;
async.parallel(Object.keys(that.subscriptions).map(that.unsubscribeMapTo.bind(that)), function() {
that.close(cb);
});
};

/**
* Handle a connect packet, doing authentication.
*
Expand Down Expand Up @@ -266,15 +254,26 @@ Client.prototype.handleConnect = function(packet) {
}

that.clean = packet.clean;

logger.info("client connected");

that.setUpTimer();
client.connack({
returnCode: 0
});
that.server.restoreClient(that);
that.server.emit("clientConnected", that);

var completeConnection = function(){
logger.info("client connected");

that.setUpTimer();

client.connack({
returnCode: 0
});

that.server.restoreClient(that);
that.server.emit("clientConnected", that);
};

if (that.id in that.server.clients){
that.server.clients[that.id].close(completeConnection.bind(that));
}else{
completeConnection();
}

});
};

Expand Down Expand Up @@ -374,7 +373,7 @@ Client.prototype.handleSubscribe = function(packet) {
};
}), function(err) {
if (err) {
that.unsubAndClose();
that.close();
return;
}

Expand All @@ -401,7 +400,7 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) {
var that = this;

if (err || !success) {
that.unsubAndClose();
that.close();
return;
}

Expand Down Expand Up @@ -443,7 +442,7 @@ Client.prototype.onClose = function() {

this.server.emit("clientDisconnecting", that);

this.unsubAndClose(function() {
this.close(function() {
if (that.will) {
logger.info({ willTopic: that.will.topic }, "delivering last will");
that.server.ascoltatore.publish(
Expand Down Expand Up @@ -488,13 +487,20 @@ Client.prototype.close = function(callback) {
}
};

if (this._closed) {
cleanup();
} else {
this.server.persistClient(this);
this.connection.stream.on("end", cleanup);
this.connection.stream.end();
}

that._closing = true;
async.parallel(Object.keys(that.subscriptions).map(that.unsubscribeMapTo.bind(that)), function() {
if (that._closed) {
cleanup();
} else {
that.server.persistClient(that);
that.connection.stream.on("end", cleanup);
that.connection.stream.end();
}
});



};

module.exports = Client;
23 changes: 23 additions & 0 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,29 @@ module.exports = function(moscaSettings, createConnection) {
});
});
});

it("should close the first client if a second client with the same clientId connects", function(done){
var d = donner(2, done);
var opts = buildOpts(), clientId = "123456789";
opts.clientId = clientId;
async.waterfall([
function(cb){
buildAndConnect(d, opts, function(client1){
cb(null, client1);
});
},
function(client1, cb){
buildAndConnect(d, opts, function(client2){
if(settings.secure === undefined){
expect(client1.stream.destroyed).to.eql(true);
}else{
expect(client1.stream._destroyed).to.eql(true);
}
client2.disconnect();
});
}
]);
});

it("should close the connection after the keepalive interval", function(done) {
buildClient(done, function(client) {
Expand Down

0 comments on commit a0a9c3d

Please sign in to comment.