Skip to content

Commit

Permalink
Do the same with protocol and proxy/health handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
jwolski committed Aug 26, 2015
1 parent 0ccffcc commit 274c0ba
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 189 deletions.
11 changes: 5 additions & 6 deletions server/proxy-req-handler.js → server/health.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
// THE SOFTWARE.
'use strict';

module.exports = function handleProxyReq(opts, callback) {
var ringpop = opts.ringpop;
var header = opts.header;
var body = opts.body;

ringpop.requestProxy.handleRequest(header, body, callback);
module.exports = function createHealthHandler(/*ringpop*/) {
return function handleHealth(arg1, arg2, hostInfo, callback) {
callback(null, null, 'ok');
};
};

122 changes: 15 additions & 107 deletions server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,27 @@
// THE SOFTWARE.
'use strict';

var handleJoin = require('./join-handler.js');
var handlePing = require('./ping-handler.js');
var handlePingReq = require('./ping-req-handler.js');
var handleProxyReq = require('./proxy-req-handler.js');
var safeParse = require('../lib/util').safeParse;

var commands = {
'/health': 'health',
'/protocol/join': 'protocolJoin',
'/protocol/ping': 'protocolPing',
'/protocol/ping-req': 'protocolPingReq',
'/proxy/req': 'proxyReq'
};

function RingPopTChannel(ringpop, tchannel) {
this.ringpop = ringpop;
this.tchannel = tchannel;

var self = this;
registerEndpointHandlers(require('./admin'));
registerEndpointHandlers(require('./protocol'));

// Register stragglers ;)
var createProxyReqHandler = require('./proxy-req.js');
registerEndpoint('/proxy/req', createProxyReqHandler(this.ringpop));

// Register admin endpoint handlers
var endpointHandlers = require('./admin');
Object.keys(endpointHandlers).forEach(function each(key) {
var endpointHandler = endpointHandlers[key];
registerEndpoint(endpointHandler.endpoint,
endpointHandler.handler(ringpop));
});
var createHealthHandler = require('./health.js');
registerEndpoint('/health', createHealthHandler());

// Register protocol and proxy endpoint handlers
Object.keys(commands).forEach(function each(url) {
registerEndpoint(url, self[commands[url]].bind(self));
});
function registerEndpointHandlers(endpointHandlers) {
Object.keys(endpointHandlers).forEach(function each(key) {
var endpointHandler = endpointHandlers[key];
registerEndpoint(endpointHandler.endpoint,
endpointHandler.handler(ringpop));
});
}

// Wraps endpoint handler so that it doesn't have to
// know TChannel req/res API.
Expand All @@ -70,87 +59,6 @@ function RingPopTChannel(ringpop, tchannel) {
}
}

RingPopTChannel.prototype.health = function (arg1, arg2, hostInfo, cb) {
cb(null, null, 'ok');
};

RingPopTChannel.prototype.protocolJoin = function (arg1, arg2, hostInfo, cb) {
var body = safeParse(arg2.toString());
if (body === null) {
return cb(new Error('need JSON req body with source and incarnationNumber'));
}

var app = body.app;
var source = body.source;
var incarnationNumber = body.incarnationNumber;
if (app === undefined || source === undefined || incarnationNumber === undefined) {
return cb(new Error('need req body with app, source and incarnationNumber'));
}

handleJoin({
ringpop: this.ringpop,
app: app,
source: source,
incarnationNumber: incarnationNumber
}, function(err, res) {
cb(err, null, JSON.stringify(res));
});
};

RingPopTChannel.prototype.protocolPing = function (arg1, arg2, hostInfo, cb) {
var body = safeParse(arg2);

// NOTE sourceIncarnationNumber is an optional argument. It was not present
// until after the v9.8.12 release.
if (body === null || !body.source || !body.changes || !body.checksum) {
return cb(new Error('need req body with source, changes, and checksum'));
}

handlePing({
ringpop: this.ringpop,
source: body.source,
sourceIncarnationNumber: body.sourceIncarnationNumber,
changes: body.changes,
checksum: body.checksum
}, function(err, res) {
cb(err, null, JSON.stringify(res));
});
};

RingPopTChannel.prototype.protocolPingReq = function protocolPingReq(arg1, arg2, hostInfo, cb) {
var body = safeParse(arg2);

// NOTE sourceIncarnationNumber is an optional argument. It was not present
// until after the v9.8.12 release.
if (body === null || !body.source || !body.target || !body.changes || !body.checksum) {
return cb(new Error('need req body with source, target, changes, and checksum'));
}

handlePingReq({
ringpop: this.ringpop,
source: body.source,
sourceIncarnationNumber: body.sourceIncarnationNumber,
target: body.target,
changes: body.changes,
checksum: body.checksum
}, function onHandled(err, result) {
cb(err, null, JSON.stringify(result));
});
};

RingPopTChannel.prototype.proxyReq = function (arg1, arg2, hostInfo, cb) {
var header = safeParse(arg1);
if (header === null) {
return cb(new Error('need header to exist'));
}

handleProxyReq({
ringpop: this.ringpop,
header: header,
body: arg2
}, cb);
};

function createServer(ringpop, tchannel) {
return new RingPopTChannel(ringpop, tchannel);
}
Expand Down
35 changes: 35 additions & 0 deletions server/protocol/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

module.exports = {
join: {
endpoint: '/protocol/join',
handler: require('./join.js')
},
ping: {
endpoint: '/protocol/ping',
handler: require('./ping.js')
},
pingReq: {
endpoint: '/protocol/ping-req',
handler: require('./ping-req.js')
}
};
57 changes: 35 additions & 22 deletions server/join-handler.js → server/protocol/join.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// THE SOFTWARE.
'use strict';

var safeParse = require('../../lib/util').safeParse;
var TypedError = require('error/typed');

var DenyJoinError = TypedError({
Expand Down Expand Up @@ -73,26 +74,38 @@ function validateJoinerApp(ringpop, app, callback) {
return true;
}

module.exports = function handleJoin(opts, callback) {
var ringpop = opts.ringpop;

ringpop.stat('increment', 'join.recv');

if (!validateDenyingJoins(ringpop, callback) ||
!validateJoinerAddress(ringpop, opts.source, callback) ||
!validateJoinerApp(ringpop, opts.app, callback)) {
return;
}

ringpop.serverRate.mark();
ringpop.totalRate.mark();

ringpop.membership.makeAlive(opts.source, opts.incarnationNumber);

callback(null, {
app: ringpop.app,
coordinator: ringpop.whoami(),
membership: ringpop.dissemination.fullSync(),
membershipChecksum: ringpop.membership.checksum
});
module.exports = function createJoinHandler(ringpop) {
return function handleJoin(arg1, arg2, hostInfo, callback) {
var body = safeParse(arg2.toString());
if (body === null) {
return callback(new Error('need JSON req body with source and incarnationNumber'));
}

var app = body.app;
var source = body.source;
var incarnationNumber = body.incarnationNumber;
if (app === undefined || source === undefined || incarnationNumber === undefined) {
return callback(new Error('need req body with app, source and incarnationNumber'));
}

ringpop.stat('increment', 'join.recv');

if (!validateDenyingJoins(ringpop, callback) ||
!validateJoinerAddress(ringpop, source, callback) ||
!validateJoinerApp(ringpop, app, callback)) {
return;
}

ringpop.serverRate.mark();
ringpop.totalRate.mark();

ringpop.membership.makeAlive(source, incarnationNumber);

callback(null, null, JSON.stringify({
app: ringpop.app,
coordinator: ringpop.whoami(),
membership: ringpop.dissemination.fullSync(),
membershipChecksum: ringpop.membership.checksum
}));
};
};
69 changes: 69 additions & 0 deletions server/protocol/ping-req.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var safeParse = require('../../lib/util').safeParse;
var sendPing = require('../../lib/swim/ping-sender.js');

module.exports = function createPingReqHandler(ringpop) {
return function handlePingReq(arg1, arg2, hostInfo, callback) {
ringpop.stat('increment', 'ping-req.recv');

var body = safeParse(arg2);

// NOTE sourceIncarnationNumber is an optional argument. It was not present
// until after the v9.8.12 release.
if (body === null || !body.source || !body.target || !body.changes || !body.checksum) {
return callback(new Error('need req body with source, target, changes, and checksum'));
}

var source = body.source;
var sourceIncarnationNumber = body.sourceIncarnationNumber;
var target = body.target;
var changes = body.changes;
var checksum = body.checksum;

ringpop.serverRate.mark();
ringpop.totalRate.mark();
ringpop.membership.update(changes);

ringpop.debugLog('ping-req send ping source=' + source + ' target=' + target, 'p');

var start = new Date();
sendPing({
ringpop: ringpop,
target: target
}, function (isOk, body) {
ringpop.stat('timing', 'ping-req-ping', start);
ringpop.debugLog('ping-req recv ping source=' + source + ' target=' + target + ' isOk=' + isOk, 'p');

if (isOk) {
ringpop.membership.update(body.changes);
}

callback(null, null, JSON.stringify({
changes: ringpop.dissemination.issueAsReceiver(source,
sourceIncarnationNumber, checksum),
pingStatus: isOk,
target: target
}));
});
};
};
49 changes: 20 additions & 29 deletions server/ping-req-handler.js → server/protocol/ping.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,33 @@
// THE SOFTWARE.
'use strict';

var sendPing = require('../lib/swim/ping-sender.js');
var safeParse = require('../../lib/util').safeParse;

module.exports = function handlePingReq(opts, callback) {
var ringpop = opts.ringpop;
module.exports = function createPingHandler(ringpop) {
return function handlePing(arg1, arg2, hostInfo, callback) {
var body = safeParse(arg2);

ringpop.stat('increment', 'ping-req.recv');

var source = opts.source;
var sourceIncarnationNumber = opts.sourceIncarnationNumber;
var target = opts.target;
var changes = opts.changes;
var checksum = opts.checksum;
// NOTE sourceIncarnationNumber is an optional argument. It was not present
// until after the v9.8.12 release.
if (body === null || !body.source || !body.changes || !body.checksum) {
return callback(new Error('need req body with source, changes, and checksum'));
}

ringpop.serverRate.mark();
ringpop.totalRate.mark();
ringpop.membership.update(changes);
var source = body.source;
var sourceIncarnationNumber = body.sourceIncarnationNumber;
var changes = body.changes;
var checksum = body.checksum;

ringpop.debugLog('ping-req send ping source=' + source + ' target=' + target, 'p');
ringpop.stat('increment', 'ping.recv');

var start = new Date();
sendPing({
ringpop: ringpop,
target: target
}, function (isOk, body) {
ringpop.stat('timing', 'ping-req-ping', start);
ringpop.debugLog('ping-req recv ping source=' + source + ' target=' + target + ' isOk=' + isOk, 'p');
ringpop.serverRate.mark();
ringpop.totalRate.mark();

if (isOk) {
ringpop.membership.update(body.changes);
}
ringpop.membership.update(changes);

callback(null, {
callback(null, null, JSON.stringify({
changes: ringpop.dissemination.issueAsReceiver(source,
sourceIncarnationNumber, checksum),
pingStatus: isOk,
target: target
});
});
}));
};
};
Loading

0 comments on commit 274c0ba

Please sign in to comment.