Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #500 from adpdigital/localSubImprove
Browse files Browse the repository at this point in the history
Improve local subscription update in persistent
  • Loading branch information
mcollina authored Jun 16, 2016
2 parents 863329d + c6daf18 commit 6e48d3d
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ function RedisPersistence(options, callback) {
this._closing = false;
this._closed = false;

var newSub = function(key, unsubs, retried, cb) {
var fetchAndUpdateLocalSub = function(key, unsubs, retried, cb) {
that._client.get(key, function(err, result) {
if (err) {
if (cb) {
Expand All @@ -94,33 +94,37 @@ function RedisPersistence(options, callback) {
}
}

var xs = key.split(":");
var id = key.substr(xs[0].length + xs[1].length + 2);
var subs = JSON.parse(result);

if (!result || typeof subs !== 'object') {
if (!retried) {
setTimeout(newSub.bind(null, key, unsubs, true, cb), 500);
setTimeout(fetchAndUpdateLocalSub.bind(null, key, unsubs, true, cb), 500);
}
return;
}

Object.keys(subs).forEach(function(sub) {
that._subMatcher.add(sub, id);
});

if( unsubs ) {
unsubs.forEach(function(sub) {
that._subMatcher.remove(sub, id);
});
}
updateLocalSub(key, subs, unsubs);

if (cb) {
cb();
}
});
};

var updateLocalSub = function(key, subs, unsubs) {
var xs = key.split(":");
var id = key.substr(xs[0].length + xs[1].length + 2);

Object.keys(subs).forEach(function(sub) {
that._subMatcher.add(sub, id);
});

if( unsubs ) {
unsubs.forEach(function(unsub) {
that._subMatcher.remove(unsub, id);
});
}
};

var that = this;

this._pubSubClient.subscribe(this.options.channel, function(){
Expand All @@ -138,7 +142,7 @@ function RedisPersistence(options, callback) {
});
subsStream.on('end', function(){
steed.each(keys, function(k,next){
newSub(k,null,false,next);
fetchAndUpdateLocalSub(k,null,false,next);
}, function(err) {
if (callback) {
callback(err, that);
Expand All @@ -153,7 +157,7 @@ function RedisPersistence(options, callback) {
}
var parsed = JSON.parse(message);
if (parsed.process !== that._id) {
newSub(parsed.key, parsed.unsubs);
updateLocalSub(parsed.key, parsed.subs, parsed.unsubs);
}
});
}
Expand Down Expand Up @@ -258,9 +262,10 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
});

this._client.get(clientSubKey, function(err, currentSubs){
var unsubs;
if( !err && currentSubs ) {
currentSubs = JSON.parse(currentSubs);
var unsubs = Object.keys(currentSubs).filter(function (topic) {
unsubs = Object.keys(currentSubs).filter(function (topic) {
return !subscriptions[topic];
});
unsubs.forEach(function (topic) {
Expand All @@ -271,6 +276,8 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
.set(clientSubKey, JSON.stringify(subscriptions))
.publish(that.options.channel, JSON.stringify({
key: clientSubKey,
subs: subscriptions,
unsubs: unsubs,
process: that._id
}))
.pexpire(clientSubKey, that.options.ttl.subscriptions);
Expand Down

0 comments on commit 6e48d3d

Please sign in to comment.