Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
No more clientConnected < 0.
Browse files Browse the repository at this point in the history
Closes #135.
  • Loading branch information
mcollina committed Jun 6, 2014
1 parent a13c0cc commit 94dec7b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 59 deletions.
120 changes: 61 additions & 59 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,61 +68,78 @@ Client.prototype._setup = function() {

this._buildForward();

client.on("connect", function(packet) {
that.handleConnect(packet);
});
client.on("error", nop);

client.on("puback", function(packet) {
function completeConnection() {
that.setUpTimer();
that.handlePuback(packet);
});

client.on("pingreq", function() {
that.logger.debug("pingreq");
that.setUpTimer();
that.connection.pingresp();
});
that.server.restoreClientSubscriptions(that, function() {
client.connack({
returnCode: 0
});

client.on("subscribe", function(packet) {
that.setUpTimer();
that.handleSubscribe(packet);
});
that.logger.info("client connected");
that.server.emit("clientConnected", that);
that.server.forwardOfflinePackets(that);
});

client.on("publish", function(packet) {
that.setUpTimer();
packet.topic = rewriteTopic(packet.topic);
that.server.authorizePublish(that, packet.topic, packet.payload, function(err, success) {
that.handleAuthorizePublish(err, success, packet);
client.on("puback", function(packet) {
that.setUpTimer();
that.handlePuback(packet);
});
});

client.on("unsubscribe", function(packet) {
that.setUpTimer();
that.logger.info({ packet: packet }, "unsubscribe received");
async.parallel(packet.unsubscriptions.map(that.unsubscribeMapTo.bind(that)), function(err) {
if (err) {
that.logger.warn(err);
that.close();
return;
}
client.unsuback({
messageId: packet.messageId
client.on("pingreq", function() {
that.logger.debug("pingreq");
that.setUpTimer();
that.connection.pingresp();
});

client.on("subscribe", function(packet) {
that.setUpTimer();
that.handleSubscribe(packet);
});

client.on("publish", function(packet) {
that.setUpTimer();
packet.topic = rewriteTopic(packet.topic);
that.server.authorizePublish(that, packet.topic, packet.payload, function(err, success) {
that.handleAuthorizePublish(err, success, packet);
});
});
});

client.on("disconnect", function() {
that.logger.debug("disconnect requested");
that.close();
});
client.on("unsubscribe", function(packet) {
that.setUpTimer();
that.logger.info({ packet: packet }, "unsubscribe received");
async.parallel(packet.unsubscriptions.map(that.unsubscribeMapTo.bind(that)), function(err) {
if (err) {
that.logger.warn(err);
that.close();
return;
}
client.unsuback({
messageId: packet.messageId
});
});
});

client.on("error", function(err) {
that.logger.warn(err);
that.onNonDisconnectClose();
});
client.on("disconnect", function() {
that.logger.debug("disconnect requested");
that.close();
});

client.on("close", function() {
that.onNonDisconnectClose();
client.removeListener("error", nop);
client.on("error", function(err) {
that.logger.warn(err);
that.onNonDisconnectClose();
});

client.on("close", function() {
that.onNonDisconnectClose();
});
}

client.once("connect", function(packet) {
that.handleConnect(packet, completeConnection);
});
};

Expand Down Expand Up @@ -266,7 +283,7 @@ Client.prototype.unsubscribeMapTo = function(topic) {
*
* @api private
*/
Client.prototype.handleConnect = function(packet) {
Client.prototype.handleConnect = function(packet, completeConnection) {
var that = this, logger, client = this.connection;

this.id = packet.clientId;
Expand All @@ -278,7 +295,6 @@ Client.prototype.handleConnect = function(packet) {
if (err) {
logger.info({ username: packet.username }, "authentication error");
client.stream.end();
that.connection.emit("error", err);
return;
}

Expand All @@ -299,20 +315,6 @@ Client.prototype.handleConnect = function(packet) {

that.clean = packet.clean;

var completeConnection = function(){
that.setUpTimer();

that.server.restoreClientSubscriptions(that, function() {
client.connack({
returnCode: 0
});

logger.info("client connected");
that.server.emit("clientConnected", that);
that.server.forwardOfflinePackets(that);
});
};

if (that.id in that.server.clients){
that.server.clients[that.id].close(completeConnection);
} else {
Expand Down
12 changes: 12 additions & 0 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var mqtt = require("mqtt");
var async = require("async");
var ascoltatori = require("ascoltatori");
var abstractServerTests = require("./abstract_server");
var net = require("net");

var moscaSettings = function() {
return {
Expand Down Expand Up @@ -55,6 +56,17 @@ describe("mosca.Server", function() {
this.instance.close(done);
});

it("should not emit 'clientDisconnected' for a non-mqtt client", function(done) {
var stream = net.connect({ port: this.settings.port });

this.instance.on("clientDisconnected", done);

stream.on("connect", function() {
stream.end();
done();
});
});

it("should pass mosca options to backend when publishing", function(done) {
var instance = this.instance;
buildClient(instance, done, function(client) {
Expand Down

0 comments on commit 94dec7b

Please sign in to comment.