Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Using extend for default options.
Browse files Browse the repository at this point in the history
Closes #39.
  • Loading branch information
mcollina committed Jul 16, 2013
1 parent 628581d commit 2c5efdd
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 79 deletions.
3 changes: 2 additions & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"mosca": false,
"expect": false,
"before": false,
"moscaSettings": false
"moscaSettings": false,
"globalLogger": false
}
}
29 changes: 16 additions & 13 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ var range = require('level-range');
var ttl = require('level-ttl');
var Qlobber = require("qlobber").Qlobber;
var async = require("async");
var extend = require("extend");
var defaults = {
ttl: {
// TTL for subscriptions is 1 hour
subscriptions: 60 * 60 * 1000,

// TTL for packets is 1 hour
packets: 60 * 60 * 1000,

// the checkFrequency is 1 minute
checkFrequency: 60 * 1000
}
};

/**
* A LevelUp-based persistance.
Expand All @@ -32,21 +45,11 @@ function LevelUpPersistence(options, callback) {
if (!(this instanceof LevelUpPersistence)) {
return new LevelUpPersistence(options, callback);
}
options = options || {};
options.valueEncoding = "json";
options.ttl = options.ttl || {};

// TTL for subscriptions is 1 hour
options.ttl.subscriptions = options.ttl.subscriptions || 60 * 60 * 1000;

// TTL for packets is 1 hour
options.ttl.packets = options.ttl.packets || 60 * 60 * 1000;

// the checkFrequency is 1 minute
options.ttl.checkFrequency = options.ttl.checkFrequency || 60 * 1000;
this.options = extend(true, {}, defaults, options);
this.options.valueEncoding = "json";

this.options = options;
this.db = ttl(levelup(options.path, options), options.ttl);
this.db = ttl(levelup(this.options.path, this.options), options.ttl);
this._retained = this.db.sublevel("retained");
this._clientSubscriptions = this.db.sublevel("clientSubscriptions");
this._subscriptions = this.db.sublevel("subscriptions");
Expand Down
31 changes: 16 additions & 15 deletions lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ var util = require("util");
var async = require("async");
var Qlobber = require("qlobber").Qlobber;
var topicPatterns = require("./utils").topicPatterns;
var extend = require("extend");
var defaults = {
ttl: {
// TTL for subscriptions is 1 hour
subscriptions: 60 * 60 * 1000,

// TTL for packets is 1 hour
packets: 60 * 60 * 1000,
},
mongo: {}
};

/**
* A persistance based on MongoDB.
Expand All @@ -31,19 +42,9 @@ function MongoPersistence(options, done) {
return new MongoPersistence(options);
}

options.ttl = options.ttl || {};

// TTL for subscriptions is 1 hour
options.ttl.subscriptions = options.ttl.subscriptions || 60 * 60 * 1000;

// TTL for packets is 1 hour
options.ttl.packets = options.ttl.packets || 60 * 60 * 1000;

options.mongo = options.mongo || {};

options.mongo.safe = true;

this.options = options;
this.options = extend(true, {}, defaults, options);
this.options.mongo.safe = true;

var that = this;

Expand All @@ -63,7 +64,7 @@ function MongoPersistence(options, done) {
that._subscriptions = coll;
async.parallel([
that._subscriptions.ensureIndex.bind(that._subscriptions, "client"),
that._subscriptions.ensureIndex.bind(that._subscriptions, { "added": 1 }, { expireAfterSeconds: Math.round(options.ttl.subscriptions / 1000 )} )
that._subscriptions.ensureIndex.bind(that._subscriptions, { "added": 1 }, { expireAfterSeconds: Math.round(that.options.ttl.subscriptions / 1000 )} )
], cb);
});
},
Expand All @@ -88,9 +89,9 @@ function MongoPersistence(options, done) {

// Connect to the db
if (options.connection) {
connected(null, options.connection);
connected(null, this.options.connection);
} else {
MongoClient.connect(options.url, options.mongo, connected);
MongoClient.connect(this.options.url, this.options.mongo, connected);
}
}

Expand Down
28 changes: 16 additions & 12 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ var redis = require("redis");
var util = require("util");
var Qlobber = require("qlobber").Qlobber;
var async = require("async");
var extend = require("extend");
var defaults = {
channel: "moscaSync",
ttl: {
// TTL for subscriptions is 1 hour
subscriptions: 60 * 60 * 1000,

// TTL for packets is 1 hour
packets: 60 * 60 * 1000,

// the checkFrequency is 1 minute
checkFrequency: 60 * 1000
}
};

/**
* A Redis-based persistance.
Expand All @@ -31,17 +45,7 @@ function RedisPersistence(options, callback) {
return new RedisPersistence(options, callback);
}

options.ttl = options.ttl || {};

// TTL for subscriptions is 1 hour
options.ttl.subscriptions = options.ttl.subscriptions || 60 * 60 * 1000;

// TTL for packets is 1 hour
options.ttl.packets = options.ttl.packets || 60 * 60 * 1000;

options.channel = options.channel || "moscaSync";

this.options = options;
this.options = extend(true, {}, defaults, options);

this._subLobber = new Qlobber({ separator: "/" });

Expand Down Expand Up @@ -81,7 +85,7 @@ function RedisPersistence(options, callback) {

var that = this;

this._pubSubClient.subscribe(options.channel);
this._pubSubClient.subscribe(this.options.channel);
this._pubSubClient.on("message", function(channel, message) {
var parsed = JSON.parse(message);
if (parsed.process !== that._uuid) {
Expand Down
32 changes: 17 additions & 15 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,23 @@ var async = require("async");
var ascoltatori = require("ascoltatori");
var EventEmitter = require("events").EventEmitter;
var bunyan = require("bunyan");
var extend = require("extend");
var Client = require("./client");
var defaults = {
port: 1883,
backend: {
json: false
},
baseRetryTimeout: 1000,
logger: {
name: "mosca",
level: 40,
serializers: {
client: clientSerializer,
packet: packetSerializer
}
}
};

/**
* The Mosca Server is a very simple MQTT server that
Expand Down Expand Up @@ -44,17 +60,7 @@ var Client = require("./client");
function Server(opts, callback) {
EventEmitter.call(this);

this.opts = opts || {};
this.opts.port = this.opts.port || 1883;
this.opts.backend = this.opts.backend || {};
this.opts.baseRetryTimeout = this.opts.baseRetryTimeout || 1000;
this.opts.logger = this.opts.logger || {};
this.opts.logger.name = this.opts.logger.name || "mosca";
this.opts.logger.level = this.opts.logger.level || 40;
this.opts.logger.serializers = {
client: clientSerializer,
packet: packetSerializer
};
this.opts = extend(true, {}, defaults, opts);

if (this.opts.persistence && this.opts.persistence.factory) {
this.opts.persistence.factory(this.opts.persistence).wire(this);
Expand All @@ -73,10 +79,6 @@ function Server(opts, callback) {
new Client(connection, that);
};

if (this.opts.backend.json === undefined) {
this.opts.backend.json = false;
}

this.ascoltatore = ascoltatori.build(this.opts.backend);
this.ascoltatore.on("error", this.emit.bind(this));

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
"level-ttl": "~0.2.0",
"qlobber": "~0.3.0",
"lru-cache": "~2.3.0",
"node-uuid": "~1.4.0"
"node-uuid": "~1.4.0",
"extend": "~1.1.3"
},
"optionalDependencies": {
"leveldown": "~0.6.1",
Expand Down
3 changes: 2 additions & 1 deletion test/cli_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ describe("mosca.cli", function() {
args.push("--config");
args.push("test/sample_config.js");
startServer(done, function(server) {
expect(server.opts).to.eql(require("./sample_config"));
expect(server.opts).to.have.property("port", 2883);
expect(server.opts).to.have.deep.property("backend.port", 3833);
});
});

Expand Down
15 changes: 9 additions & 6 deletions test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ global.rabbitSettings = function() {
};

var bunyan = require("bunyan");
global.moscaSettings = function() {
var logger = bunyan.createLogger({
name: "moscaTests"
});

logger.level(60);
global.globalLogger = bunyan.createLogger({
name: "moscaTests",
level: 60
});

global.moscaSettings = function() {
return {
port: nextPort(),
logger: logger
logger: {
name: "moscaTests",
level: 60
}
};
};

Expand Down
30 changes: 15 additions & 15 deletions test/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ module.exports = function(create) {
};

var client = {
logger: moscaSettings().logger,
logger: globalLogger,
forward: function(topic, payload, options, pattern) {
expect(topic).to.eql(packet1.topic);
expect(payload).to.eql(packet1.payload);
Expand All @@ -161,7 +161,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -175,7 +175,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -193,7 +193,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -214,7 +214,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: true,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -236,7 +236,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -258,7 +258,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -285,7 +285,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand Down Expand Up @@ -313,7 +313,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -336,7 +336,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand All @@ -359,7 +359,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 0
Expand All @@ -380,7 +380,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand Down Expand Up @@ -462,7 +462,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: 1
}
Expand Down Expand Up @@ -541,7 +541,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
"hello/#": {
qos: 1
Expand Down Expand Up @@ -581,7 +581,7 @@ module.exports = function(create) {
var client = {
id: "my client id - 42",
clean: false,
logger: moscaSettings().logger,
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
Expand Down
Loading

0 comments on commit 2c5efdd

Please sign in to comment.