Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Fixed multiple message delivery on tree based topology.
Browse files Browse the repository at this point in the history
Closes #8.
  • Loading branch information
mcollina committed Feb 20, 2013
1 parent 37b29b7 commit 296b1be
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 16 deletions.
44 changes: 32 additions & 12 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,13 @@ Server.prototype.serve = function (client) {
client.timer = setTimeout(function() {
debug("keepalive timemout for " + client.id);
that.closeConn(client);
}, client.keepalive * 1000 * 5/4);
}, client.keepalive * 1001 * 5/4);
};

client.subscriptions = {};

var forward = function(topic, payload) {
debug("delivering message on " + topic + " to " + client.id);
client.publish({ topic: topic, payload: payload });
};

Expand All @@ -152,14 +155,19 @@ Server.prototype.serve = function (client) {
});

client.on("subscribe", function(packet) {
var granted = packet.subscriptions.map(function(e) {
var granted = packet.subscriptions.map(function (e) {
return 0;
});

async.parallel(packet.subscriptions.map(function(s) {
var subs = packet.subscriptions.filter(function (s) {
return !client.subscriptions[s.topic];
});

async.parallel(subs.map(function(s) {
return function(cb) {
that.ascoltatore.subscribe(s.topic.replace("#", "*"), forward, function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = true;
cb();
});
};
Expand All @@ -174,26 +182,37 @@ Server.prototype.serve = function (client) {
that.emit("published", packet, client);
});

client.on("unsubscribe", function(packet) {
async.parallel(packet.unsubscriptions.map(function(topic) {
return function(cb) {
var unsubscribeMapTo = function(topic) {
return function(cb) {
that.ascoltatore.unsubscribe(topic.replace("#", "*"), forward, function() {
debug("unsubscribed " + client.id + " from " + topic);
that.ascoltatore.unsubscribe(topic.replace("#", "*"), forward, cb);
};
}), function() {
delete client.subscriptions[topic];
cb();
});
};
};

client.on("unsubscribe", function(packet) {
async.parallel(packet.unsubscriptions.map(unsubscribeMapTo), function() {
client.unsuback({ messageId: packet.messageId });
});
});


var unsubAndClose = function () {
async.parallel(Object.keys(client.subscriptions).map(unsubscribeMapTo), function() {
that.closeConn(client);
});
};

client.on("disconnect", function() {
debug("disconnected client " + client.id);
that.closeConn(client);
unsubAndClose();
});

client.on("error", function(err) {
debug("error for client " + client.id);
debug(err);
that.closeConn(client);
unsubAndClose();
});
};

Expand All @@ -210,6 +229,7 @@ Server.prototype.closeConn = function(client, callback) {
clearTimeout(client.timer);
delete this.clients[client.id];
}

client.stream.end();
client.removeAllListeners();
next(callback);
Expand Down
76 changes: 72 additions & 4 deletions test/server_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var ascoltatori = require("ascoltatori");
describe(mosca.Server, function() {

var instance;
var secondInstance = null;
var settings;

beforeEach(function(done) {
Expand All @@ -14,7 +15,17 @@ describe(mosca.Server, function() {
});

afterEach(function(done) {
instance.close(done);
var instances = [instance];

if (secondInstance) {
instances.push(secondInstance);
}

async.parallel(instances.map(function (i) {
return function (cb) {
i.close(cb);
};
}), done);
});

function donner(count, done) {
Expand All @@ -28,9 +39,9 @@ describe(mosca.Server, function() {

function buildClient(done, callback) {
mqtt.createClient(settings.port, settings.host, function(err, client) {
if(err)
done(err)
else {
if(err) {
done(err);
} else {
client.on('close', function() {
done();
});
Expand Down Expand Up @@ -342,4 +353,61 @@ describe(mosca.Server, function() {
client1.subscribe({ topic: "hello/#" });
});
});

it("should support subscribing correctly to wildcards in a tree-based topology", function(done) {
var d = donner(3, done);

async.waterfall([
function (cb) {
settings.backend = {
port: settings.port,
type: "mqtt"
};
settings.port = settings.port + 1000;
secondInstance = new mosca.Server(settings, cb);
},
function (cb) {
buildAndConnect(d, function (client1) {
cb(null, client1);
});
},
function (client1, cb) {
var called = false;
client1.on("publish", function(packet) {
expect(called).to.be.eql(false);
called = true;
setTimeout(function () {
client1.disconnect();
});
});

client1.subscribe({ topic: "hello/#" });
client1.on("suback", function () {
cb(null);
});
},
function (cb) {
buildAndConnect(d, function (client3) {
cb(null, client3);
});
},
function (client3, cb) {
client3.subscribe({ topic: "hello/#" });
client3.on("suback", function () {
// we need to simulate a "stuck" subscription
client3.stream.end();
cb(null);
});
},
function (cb) {
buildAndConnect(d, function(client2) {
cb(null, client2);
});
},
function (client2, cb) {
client2.publish({ topic: "hello/world", payload: "some data" });
client2.disconnect();
}
]);
});
});

0 comments on commit 296b1be

Please sign in to comment.