Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merged upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
samirnaik committed Jul 16, 2013
2 parents f3f3bfb + ae2f0d0 commit 9e72258
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 95 deletions.
3 changes: 2 additions & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"mosca": false,
"expect": false,
"before": false,
"moscaSettings": false
"moscaSettings": false,
"globalLogger": false
}
}
17 changes: 17 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
History
=======

## 0.9.0

* Bumped Ascoltatori to 0.11.0.
* Improved logging for each client.
* Fixed multiple topic naming, mainly 'test/topic' and 'test/topic/'
should be the same
[#46](https://github.com/mcollina/mosca/pull/46).
* Better handling of defaults
[#39](https://github.com/mcollina/mosca/pull/39).
* Enforcing client identifier length
[#33](https://github.com/mcollina/mosca/pull/33).

## 0.8.2

* Passing the correct Client object to `authorizePublish`,
[#43](https://github.com/mcollina/mosca/pull/43).

## 0.8.1

* Refactored the Server-Persistence wiring interface
Expand Down
32 changes: 23 additions & 9 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

var async = require("async");
var async = require("async");
var REGEXP = /(([^/])\/+$)|(^\/+([^/]))|(\/+(\/))/g;

/**
* The Client is just the object modelling a server representation
Expand Down Expand Up @@ -28,7 +29,7 @@ function Client(conn, server) {
* @api private
*/
Client.prototype._setup = function() {
var that = this, logger = this.logger, client = that.connection;
var that = this, client = that.connection;

this._buildForward();

Expand All @@ -41,7 +42,7 @@ Client.prototype._setup = function() {
});

client.on("pingreq", function() {
logger.debug("pingreq");
that.logger.debug("pingreq");
that.setUpTimer();
that.connection.pingresp();
});
Expand All @@ -53,17 +54,18 @@ Client.prototype._setup = function() {

client.on("publish", function(packet) {
that.setUpTimer();
that.server.authorizePublish(client, packet.topic, packet.payload, function(err, success) {
packet.topic = packet.topic.replace(REGEXP, "$2$4$6");
that.server.authorizePublish(that, packet.topic, packet.payload, function(err, success) {
that.handleAuthorizePublish(err, success, packet);
});
});

client.on("unsubscribe", function(packet) {
that.setUpTimer();
logger.info({ packet: packet }, "unsubscribe received");
that.logger.info({ packet: packet }, "unsubscribe received");
async.parallel(packet.unsubscriptions.map(that.unsubscribeMapTo.bind(that)), function(err) {
if (err) {
logger.warn(err);
that.logger.warn(err);
that.unsubAndClose();
return;
}
Expand All @@ -74,17 +76,17 @@ Client.prototype._setup = function() {
});

client.on("disconnect", function() {
logger.debug("disconnect requested");
that.logger.debug("disconnect requested");
that.unsubAndClose();
});

client.on("error", function(err) {
logger.warn(err);
that.logger.warn(err);
that.unsubAndClose();
});

client.on("close", function() {
logger.info("disconnected");
that.logger.info("disconnected");
that._closed = true;
that.onClose();
});
Expand Down Expand Up @@ -223,6 +225,14 @@ Client.prototype.handleConnect = function(packet) {
this.id = packet.clientId;
logger = that.logger.child({ client: this });

if (this.id.length > 23) {
client.connack({
returnCode: 2
});
client.stream.end();
return;
}

that.server.authenticate(this, packet.username, packet.password,
function(err, verdict) {

Expand All @@ -244,6 +254,9 @@ Client.prototype.handleConnect = function(packet) {

that.keepalive = packet.keepalive;
that.will = packet.will;
if (that.will) {
that.will.topic = that.will.topic.replace(REGEXP, "$2$4$6");
}

that.clean = packet.clean;

Expand Down Expand Up @@ -342,6 +355,7 @@ Client.prototype.handleSubscribe = function(packet) {

var granted = calculateGranted(this, packet);
var subs = packet.subscriptions.filter(function(s) {
s.topic = s.topic.replace(REGEXP, "$2$4$6");
return that.subscriptions[s.topic] === undefined;
});

Expand Down
29 changes: 16 additions & 13 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ var range = require('level-range');
var ttl = require('level-ttl');
var Qlobber = require("qlobber").Qlobber;
var async = require("async");
var extend = require("extend");
var defaults = {
ttl: {
// TTL for subscriptions is 1 hour
subscriptions: 60 * 60 * 1000,

// TTL for packets is 1 hour
packets: 60 * 60 * 1000,

// the checkFrequency is 1 minute
checkFrequency: 60 * 1000
}
};

/**
* A LevelUp-based persistance.
Expand All @@ -32,21 +45,11 @@ function LevelUpPersistence(options, callback) {
if (!(this instanceof LevelUpPersistence)) {
return new LevelUpPersistence(options, callback);
}
options = options || {};
options.valueEncoding = "json";
options.ttl = options.ttl || {};

// TTL for subscriptions is 1 hour
options.ttl.subscriptions = options.ttl.subscriptions || 60 * 60 * 1000;

// TTL for packets is 1 hour
options.ttl.packets = options.ttl.packets || 60 * 60 * 1000;

// the checkFrequency is 1 minute
options.ttl.checkFrequency = options.ttl.checkFrequency || 60 * 1000;
this.options = extend(true, {}, defaults, options);
this.options.valueEncoding = "json";

this.options = options;
this.db = ttl(levelup(options.path, options), options.ttl);
this.db = ttl(levelup(this.options.path, this.options), options.ttl);
this._retained = this.db.sublevel("retained");
this._clientSubscriptions = this.db.sublevel("clientSubscriptions");
this._subscriptions = this.db.sublevel("subscriptions");
Expand Down
31 changes: 16 additions & 15 deletions lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ var util = require("util");
var async = require("async");
var Qlobber = require("qlobber").Qlobber;
var topicPatterns = require("./utils").topicPatterns;
var extend = require("extend");
var defaults = {
ttl: {
// TTL for subscriptions is 1 hour
subscriptions: 60 * 60 * 1000,

// TTL for packets is 1 hour
packets: 60 * 60 * 1000,
},
mongo: {}
};

/**
* A persistance based on MongoDB.
Expand All @@ -31,19 +42,9 @@ function MongoPersistence(options, done) {
return new MongoPersistence(options);
}

options.ttl = options.ttl || {};

// TTL for subscriptions is 1 hour
options.ttl.subscriptions = options.ttl.subscriptions || 60 * 60 * 1000;

// TTL for packets is 1 hour
options.ttl.packets = options.ttl.packets || 60 * 60 * 1000;

options.mongo = options.mongo || {};

options.mongo.safe = true;

this.options = options;
this.options = extend(true, {}, defaults, options);
this.options.mongo.safe = true;

var that = this;

Expand All @@ -63,7 +64,7 @@ function MongoPersistence(options, done) {
that._subscriptions = coll;
async.parallel([
that._subscriptions.ensureIndex.bind(that._subscriptions, "client"),
that._subscriptions.ensureIndex.bind(that._subscriptions, { "added": 1 }, { expireAfterSeconds: Math.round(options.ttl.subscriptions / 1000 )} )
that._subscriptions.ensureIndex.bind(that._subscriptions, { "added": 1 }, { expireAfterSeconds: Math.round(that.options.ttl.subscriptions / 1000 )} )
], cb);
});
},
Expand All @@ -88,9 +89,9 @@ function MongoPersistence(options, done) {

// Connect to the db
if (options.connection) {
connected(null, options.connection);
connected(null, this.options.connection);
} else {
MongoClient.connect(options.url, options.mongo, connected);
MongoClient.connect(this.options.url, this.options.mongo, connected);
}
}

Expand Down
28 changes: 16 additions & 12 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ var redis = require("redis");
var util = require("util");
var Qlobber = require("qlobber").Qlobber;
var async = require("async");
var extend = require("extend");
var defaults = {
channel: "moscaSync",
ttl: {
// TTL for subscriptions is 1 hour
subscriptions: 60 * 60 * 1000,

// TTL for packets is 1 hour
packets: 60 * 60 * 1000,

// the checkFrequency is 1 minute
checkFrequency: 60 * 1000
}
};

/**
* A Redis-based persistance.
Expand All @@ -31,17 +45,7 @@ function RedisPersistence(options, callback) {
return new RedisPersistence(options, callback);
}

options.ttl = options.ttl || {};

// TTL for subscriptions is 1 hour
options.ttl.subscriptions = options.ttl.subscriptions || 60 * 60 * 1000;

// TTL for packets is 1 hour
options.ttl.packets = options.ttl.packets || 60 * 60 * 1000;

options.channel = options.channel || "moscaSync";

this.options = options;
this.options = extend(true, {}, defaults, options);

this._subLobber = new Qlobber({ separator: "/" });

Expand Down Expand Up @@ -81,7 +85,7 @@ function RedisPersistence(options, callback) {

var that = this;

this._pubSubClient.subscribe(options.channel);
this._pubSubClient.subscribe(this.options.channel);
this._pubSubClient.on("message", function(channel, message) {
var parsed = JSON.parse(message);
if (parsed.process !== that._uuid) {
Expand Down
32 changes: 17 additions & 15 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,23 @@ var async = require("async");
var ascoltatori = require("ascoltatori");
var EventEmitter = require("events").EventEmitter;
var bunyan = require("bunyan");
var extend = require("extend");
var Client = require("./client");
var defaults = {
port: 1883,
backend: {
json: false
},
baseRetryTimeout: 1000,
logger: {
name: "mosca",
level: 40,
serializers: {
client: clientSerializer,
packet: packetSerializer
}
}
};

/**
* The Mosca Server is a very simple MQTT server that
Expand Down Expand Up @@ -44,17 +60,7 @@ var Client = require("./client");
function Server(opts, callback) {
EventEmitter.call(this);

this.opts = opts || {};
this.opts.port = this.opts.port || 1883;
this.opts.backend = this.opts.backend || {};
this.opts.baseRetryTimeout = this.opts.baseRetryTimeout || 1000;
this.opts.logger = this.opts.logger || {};
this.opts.logger.name = this.opts.logger.name || "mosca";
this.opts.logger.level = this.opts.logger.level || 40;
this.opts.logger.serializers = {
client: clientSerializer,
packet: packetSerializer
};
this.opts = extend(true, {}, defaults, opts);

if (this.opts.persistence && this.opts.persistence.factory) {
this.opts.persistence.factory(this.opts.persistence).wire(this);
Expand All @@ -73,10 +79,6 @@ function Server(opts, callback) {
new Client(connection, that);
};

if (this.opts.backend.json === undefined) {
this.opts.backend.json = false;
}

this.ascoltatore = ascoltatori.build(this.opts.backend);
this.ascoltatore.on("error", this.emit.bind(this));

Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mosca",
"version": "0.8.1",
"version": "0.9.0",
"description": "The multi-transport MQTT broker for node.js. It supports AMQP, Redis, ZeroMQ, MongoDB or just MQTT.",
"main": "index.js",
"bin": {
Expand Down Expand Up @@ -47,7 +47,7 @@
"dependencies": {
"mqtt": "~0.2.10",
"async": "~0.2.4",
"ascoltatori": "~0.8.0",
"ascoltatori": "~0.11.0",
"debug": "~0.7.2",
"commander": "~1.1.1",
"minimatch": "~0.2.11",
Expand All @@ -60,7 +60,8 @@
"level-ttl": "~0.2.0",
"qlobber": "~0.3.0",
"lru-cache": "~2.3.0",
"node-uuid": "~1.4.0"
"node-uuid": "~1.4.0",
"extend": "~1.1.3"
},
"optionalDependencies": {
"leveldown": "~0.6.1",
Expand Down
3 changes: 2 additions & 1 deletion test/cli_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ describe("mosca.cli", function() {
args.push("--config");
args.push("test/sample_config.js");
startServer(done, function(server) {
expect(server.opts).to.eql(require("./sample_config"));
expect(server.opts).to.have.property("port", 2883);
expect(server.opts).to.have.deep.property("backend.port", 3833);
});
});

Expand Down
Loading

0 comments on commit 9e72258

Please sign in to comment.