Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Upgraded LevelUp and associates.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Sep 18, 2013
1 parent ccf0e06 commit 95feb36
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
28 changes: 23 additions & 5 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ var levelup = require("levelup");
var sublevel = require("level-sublevel");
var AbstractPersistence = require("./abstract");
var util = require("util");
var range = require('level-range');
var ttl = require('level-ttl');
var Qlobber = require("qlobber").Qlobber;
var async = require("async");
Expand Down Expand Up @@ -135,6 +134,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) {
if (subscriptions && client.clean) {
that._clientSubscriptions.del(client.id, function() {
that.streamOfflinePackets(client, nop, function() {

Object.keys(subscriptions).forEach(function(key) {
var levelKey = util.format("%s:%s", key, client.id);
that._subLobber.remove(levelKey);
Expand Down Expand Up @@ -175,22 +175,40 @@ LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) {
LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) {

var that = this;
var stream = range(that._offlinePackets, '%s:', client.id);
var prefix = util.format('%s:', client.id);
var stream = that._offlinePackets.createReadStream({
start : prefix,
end : prefix + '~'
});
var count = 0;
var ended = false;

stream.on("data", function(data) {
var key = util.format('%s:%s', client.id, data.key);
that._offlinePackets.del(key, function() {
count++;
that._offlinePackets.del(data.key, function() {
count--;
if (!client.clean) {
cb(null, data.value);
}
if (ended && count === 0 && done) {
done();
}
});
});

if (cb) {
stream.on("error", cb);
}

stream.on("end", function() {
ended = true;
if (count === 0 && done) {
done();
}
});

if (done) {
stream.on("end", done);
stream.on("error", done);
}
};

Expand Down
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,16 @@
"minimatch": "~0.2.11",
"bunyan": "~0.21.3",
"memdown": "~0.2.0",
"levelup": "~0.12.0",
"level-sublevel": "~4.8.1",
"level-range": "0.0.0",
"levelup": "~0.16.0",
"level-sublevel": "~5.1.1",
"level-ttl": "~0.4.0",
"qlobber": "~0.3.0",
"lru-cache": "~2.3.0",
"node-uuid": "~1.4.0",
"extend": "~1.1.3"
},
"optionalDependencies": {
"leveldown": "~0.6.2",
"leveldown": "~0.8.0",
"zmq": "~2.4.0",
"amqp": "~0.1.4",
"redis": "~0.8.2",
Expand Down

0 comments on commit 95feb36

Please sign in to comment.