diff --git a/.travis.yml b/.travis.yml index 68c6475..f518c13 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,24 +1,32 @@ language: node_js -# currently needed for some native builds on iojs 3 -addons: - apt: - sources: - - ubuntu-toolchain-r-test - packages: - - g++-4.8 +sudo: false node_js: - 0.12 - 4 - - 5 - 6 services: - redis-server - - mongodb script: - npm run coverage -# currently needed for some native builds on iojs 3 -before_install: - - sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-4.8 90 - - g++ -v after_success: - npm run publish-coverage +env: + global: + - CC=gcc-4.8 + - CXX=g++-4.8 + - MONGODB_VERSION="3.2.5" +addons: + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - gcc-4.8 + - g++-4.8 + - libzmq3-dev +before_install: + - wget http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-$MONGODB_VERSION.tgz + - tar xfz mongodb-linux-x86_64-$MONGODB_VERSION.tgz + - export PATH=`pwd`/mongodb-linux-x86_64-$MONGODB_VERSION/bin:$PATH + - mkdir -p data/db + - mongod --dbpath=data/db > /dev/null & + - sleep 5 diff --git a/README.md b/README.md index f0c214c..e9d9378 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Mosca   [![Build Status](https://travis-ci.org/mcollina/mosca.png)](https://travis-ci.org/mcollina/mosca)  [![Coverage Status](https://coveralls.io/repos/mcollina/mosca/badge.png)](https://coveralls.io/r/mcollina/mosca) +Mosca   [![Build Status](https://travis-ci.org/mcollina/mosca.svg)](https://travis-ci.org/mcollina/mosca)  [![Coverage Status](https://coveralls.io/repos/mcollina/mosca/badge.svg)](https://coveralls.io/r/mcollina/mosca) ==================== [![MOSCA](http://cloud.dynamatik.com/image/3I3I0q1M1x0E/mosca_small.png)](https://github.com/mcollina/mosca) diff --git a/lib/persistence/abstract.js b/lib/persistence/abstract.js index 4b03b95..14ab724 100644 --- a/lib/persistence/abstract.js +++ b/lib/persistence/abstract.js @@ -86,7 +86,7 @@ AbstractPersistence.prototype.wire = function(server) { server.forwardRetained = function(pattern, client, done) { that.lookupRetained(pattern, function(err, matches) { if (err) { - client.emit("error", err); + client.connection.emit("error", err); return; } steed.each(matches, function(match, cb) { @@ -102,7 +102,7 @@ AbstractPersistence.prototype.wire = function(server) { server.restoreClientSubscriptions = function restoreClientSubscriptions(client, done) { that.lookupSubscriptions(client, function(err, subscriptions) { if (err) { - client.emit("error", err); + client.connection.emit("error", err); return; } diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index b0905fd..607e4b3 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -146,6 +146,8 @@ function RedisPersistence(options, callback) { }, function(err) { if (callback) { callback(err, that); + callback = null; + return; } }); }); diff --git a/lib/server.js b/lib/server.js index fdc7897..96afbbc 100755 --- a/lib/server.js +++ b/lib/server.js @@ -308,6 +308,18 @@ Server.prototype.toString = function() { return 'mosca.Server'; }; +/** + * Subscribes to a topic on the MQTT broker. + * + * @api public + * @param {String} topic The MQTT topic + * @param {Function} callback The callback with (topic, payload) arguments + * @param {Function} done The subscription result + */ +Server.prototype.subscribe = function subscribe(topic, callback, done) { + this.ascoltatore.subscribe(topic, callback, done); +}; + /** * Publishes a packet on the MQTT broker. * diff --git a/package.json b/package.json index a57da63..e818a8b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mosca", - "version": "1.3.0", + "version": "1.4.1", "description": "MQTT broker as a module", "main": "index.js", "bin": { diff --git a/test/server.js b/test/server.js index 9c25681..365a458 100644 --- a/test/server.js +++ b/test/server.js @@ -186,6 +186,26 @@ describe("mosca.Server", function() { }); }); + it("should support subscribing via server.subscribe", function(done) { + var that = this; + buildAndConnect(done, this.instance, buildOpts(), function(client) { + + that.instance.subscribe('a/+', function(topic, payload){ + expect(topic).to.be.equal('a/b'); + expect(payload.toString()).to.be.equal('some data'); + client.disconnect(); + }, function(){ + var messageId = Math.floor(65535 * Math.random()); + client.publish({ + topic: "a/b", + payload: "some data", + messageId: messageId, + qos: 1 + }); + }); + }); + }); + describe("timers", function() { var clock;