Skip to content

Commit

Permalink
Simplifies handleOrProxyAll interface by making it async instead of sync
Browse files Browse the repository at this point in the history
  • Loading branch information
markyen committed Feb 12, 2015
1 parent d2dea77 commit fcd0ff8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 37 deletions.
42 changes: 27 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -843,26 +843,35 @@ RingPop.prototype.handleOrProxy =
};

RingPop.prototype.handleOrProxyAll =
function handleOrProxyAll(opts) {
function handleOrProxyAll(opts, cb) {
var self = this;
var keys = opts.keys;
var req = opts.req;
var localHandler = opts.localHandler;

var whoami = this.whoami();
var keysByDest = _.groupBy(keys, this.lookup.bind(this));
var dests = Object.keys(keysByDest);
var pending = dests.length;
var responses = {};

var dests = mapUniq(keys, this.lookup.bind(this));
return dests.reduce(function(responses, dest) {
dests.forEach(function(dest) {
var res = hammock.Response();
res.on('response', function(err, data) {
onResponse(err, data, dest);
});
if (whoami === dest) {
self.logger.trace('handleOrProxyAll was handled', {
keys: keys,
url: req && req.url
url: req && req.url,
dest: dest
});
process.nextTick(localHandler.bind(null, req, res));
} else {
self.logger.trace('handleOrProxyAll was proxied', {
keys: keys,
url: req && req.url
url: req && req.url,
dest: dest
});
process.nextTick(self.proxyReq.bind(self, {
keys: keys,
Expand All @@ -871,9 +880,19 @@ RingPop.prototype.handleOrProxyAll =
dest: dest
}));
}
responses[dest] = res;
return responses;
}, {});
});

function onResponse(err, data, dest) {
if (data) {
data.dest = dest;
data.keys = keysByDest[dest];
}
responses[dest] = data;
if ((--pending === 0 || err) && cb) {
cb(err, responses);
cb = null;
}
}
};

RingPop.prototype.validateProps = function validateProps(opts, props) {
Expand All @@ -886,11 +905,4 @@ RingPop.prototype.validateProps = function validateProps(opts, props) {
}
};

function mapUniq(list, iteratee) {
return Object.keys(list.reduce(function(acc, val) {
acc[iteratee(val)] = null;
return acc;
}, {}));
}

module.exports = RingPop;
35 changes: 15 additions & 20 deletions test/integration/handle_or_proxy_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,39 +66,34 @@ test('handleOrProxyAll() proxies and calls local handler', function t(assert) {
var k = cluster.keys;
var keys = [k.one, k.two, k.two, k.three]

var reses = cluster.requestAll({
cluster.requestAll({
keys: keys,
host: 'one',
localHandler: createServerHandler('one'),
json: { hello: true }
});
}, onResponses);
});

var hosts = Object.keys(reses);
function onResponses(err, responses) {
assert.ifError(err);

var hosts = Object.keys(responses);
assert.equal(hosts.length, 3);
var onLastResponse = _.after(hosts.length, _onLastResponse);
hosts.forEach(function(host) {
var res = reses[host];
res.on('response', onResponse);
res.on('response', onLastResponse);
});

function onResponse(err, data) {
assert.ifError(err);
var data = responses[host];
assert.equal(data.statusCode, 200);
tryIt(function parse() {
data.body = JSON.parse(data.body);
assert.equal(data.body.payload.hello, true);
}, assert.ifError);
}
function _onLastResponse() {
assert.equal(handlerCallCounts.one, 1);
assert.equal(handlerCallCounts.two, 1);
assert.equal(handlerCallCounts.three, 1);
});
assert.equal(handlerCallCounts.one, 1);
assert.equal(handlerCallCounts.two, 1);
assert.equal(handlerCallCounts.three, 1);

cluster.destroy();
assert.end();
}
});
cluster.destroy();
assert.end();
}

function createServerHandler(name) {
return function serverHandle(req, res) {
Expand Down
4 changes: 2 additions & 2 deletions test/lib/alloc-cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ function allocCluster(options, onReady) {
return handle;
}

function requestAll(opts) {
function requestAll(opts, cb) {
var host = opts.host;
opts.req = allocRequest(opts);
return cluster[host].handleOrProxyAll(opts);
cluster[host].handleOrProxyAll(opts, cb);
}
}

Expand Down

0 comments on commit fcd0ff8

Please sign in to comment.