diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index d60c636..c98ff2e 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -4,7 +4,6 @@ var levelup = require("levelup"); var sublevel = require("level-sublevel"); var AbstractPersistence = require("./abstract"); var util = require("util"); -var range = require('level-range'); var ttl = require('level-ttl'); var Qlobber = require("qlobber").Qlobber; var async = require("async"); @@ -135,6 +134,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { if (subscriptions && client.clean) { that._clientSubscriptions.del(client.id, function() { that.streamOfflinePackets(client, nop, function() { + Object.keys(subscriptions).forEach(function(key) { var levelKey = util.format("%s:%s", key, client.id); that._subLobber.remove(levelKey); @@ -175,22 +175,40 @@ LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) { LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) { var that = this; - var stream = range(that._offlinePackets, '%s:', client.id); + var prefix = util.format('%s:', client.id); + var stream = that._offlinePackets.createReadStream({ + start : prefix, + end : prefix + '~' + }); + var count = 0; + var ended = false; + stream.on("data", function(data) { - var key = util.format('%s:%s', client.id, data.key); - that._offlinePackets.del(key, function() { + count++; + that._offlinePackets.del(data.key, function() { + count--; if (!client.clean) { cb(null, data.value); } + if (ended && count === 0 && done) { + done(); + } }); }); if (cb) { stream.on("error", cb); } + + stream.on("end", function() { + ended = true; + if (count === 0 && done) { + done(); + } + }); if (done) { - stream.on("end", done); + stream.on("error", done); } }; diff --git a/package.json b/package.json index e91eae6..e697339 100644 --- a/package.json +++ b/package.json @@ -52,9 +52,8 @@ "minimatch": "~0.2.11", "bunyan": "~0.21.3", "memdown": "~0.2.0", - "levelup": "~0.12.0", - "level-sublevel": "~4.8.1", - "level-range": "0.0.0", + "levelup": "~0.16.0", + "level-sublevel": "~5.1.1", "level-ttl": "~0.4.0", "qlobber": "~0.3.0", "lru-cache": "~2.3.0", @@ -62,7 +61,7 @@ "extend": "~1.1.3" }, "optionalDependencies": { - "leveldown": "~0.6.2", + "leveldown": "~0.8.0", "zmq": "~2.4.0", "amqp": "~0.1.4", "redis": "~0.8.2",