Skip to content

Commit

Permalink
Ping-req to use only verified results from selected ping-req members to
Browse files Browse the repository at this point in the history
start suspect subprotocol
  • Loading branch information
jwolski committed Mar 20, 2015
1 parent af48df6 commit da73bc9
Show file tree
Hide file tree
Showing 8 changed files with 543 additions and 121 deletions.
95 changes: 18 additions & 77 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var hammock = require('uber-hammock');
var metrics = require('metrics');

var Gossip = require('./lib/swim/gossip');
var PingReqSender = require('./lib/swim/ping-req-sender');
var PingSender = require('./lib/swim/ping-sender');
var sendPingReq = require('./lib/swim/ping-req-sender.js');
var Suspicion = require('./lib/swim/suspicion');

var createRingPopTChannel = require('./lib/tchannel.js').createRingPopTChannel;
Expand Down Expand Up @@ -248,7 +248,9 @@ RingPop.prototype.adminLeave = function adminLeave(callback) {
});
};

RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) {
RingPop.prototype.bootstrap = function bootstrap(opts, callback) {
var bootstrapFile = opts.bootstrapFile || opts;

if (typeof bootstrapFile === 'function') {
callback = bootstrapFile;
bootstrapFile = null;
Expand Down Expand Up @@ -469,35 +471,6 @@ RingPop.prototype.protocolPing = function protocolPing(options, callback) {
});
};

RingPop.prototype.protocolPingReq = function protocolPingReq(options, callback) {
this.stat('increment', 'ping-req.recv');

var source = options.source;
var target = options.target;
var changes = options.changes;
var checksum = options.checksum;

this.serverRate.mark();
this.totalRate.mark();
this.membership.update(changes);

var self = this;
this.debugLog('ping-req send ping source=' + source + ' target=' + target, 'p');
var start = new Date();
this.sendPing(target, function (isOk, body) {
self.stat('timing', 'ping-req-ping', start);
self.debugLog('ping-req recv ping source=' + source + ' target=' + target + ' isOk=' + isOk, 'p');
if (isOk) {
self.membership.update(body.changes);
}
callback(null, {
changes: self.issueMembershipChanges(checksum, source),
pingStatus: isOk,
target: target
});
});
};

RingPop.prototype.lookup = function lookup(key) {
this.stat('increment', 'lookup');
var dest = this.ring.lookup(key + '');
Expand Down Expand Up @@ -668,9 +641,20 @@ RingPop.prototype.pingMemberNow = function pingMemberNow(callback) {
return callback(new Error('destroyed whilst pinging'));
}

start = new Date();
self.sendPingReq(member, function() {
self.stat('timing', 'ping-req', start);
var pingReqStartTime = new Date();
// TODO The pinged member's status could have changed to
// faulty by the time we received and processed the ping
// response. There are no ill effects to membership state
// by sending a ping-req to the faulty member (and processing
// the response), though it does delay the protocol period
// unnecessarily. We may want to bypass the ping-req here
// if the member's status is faulty.
sendPingReq({
ringpop: self,
unreachableMember: member,
pingReqSize: self.pingReqSize
}, function onPingReq() {
self.stat('timing', 'ping-req', pingReqStartTime);
self.isPinging = false;

callback.apply(null, Array.prototype.splice.call(arguments, 0));
Expand Down Expand Up @@ -725,49 +709,6 @@ RingPop.prototype.sendPing = function sendPing(member, callback) {
return new PingSender(this, member, callback);
};

// TODO Exclude suspect memebers from ping-req as well?
RingPop.prototype.sendPingReq = function sendPingReq(unreachableMember, callback) {
this.stat('increment', 'ping-req.send');

var otherMembers = this.membership.getRandomPingableMembers(this.pingReqSize, [unreachableMember.address]);
var self = this;
var completed = 0;
var anySuccess = false;
function onComplete(err) {
anySuccess |= !err;

if (++completed === otherMembers.length) {
if (anySuccess) {
self.membership.makeAlive(unreachableMember.address);
self.logger.info('ringpop member knows member is alive', {
local: self.whoami(),
alive: unreachableMember.address
});
} else {
self.membership.makeSuspect(unreachableMember.address);
self.logger.info('ringpop member suspects member', {
local: self.whoami(),
suspect: unreachableMember.address
});
}

callback();
}
}

this.stat('timing', 'ping-req.other-members', otherMembers.length);

if (otherMembers.length > 0) {
otherMembers.forEach(function (member) {
self.debugLog('ping-req send peer=' + member.address +
' target=' + unreachableMember.address, 'p');
return new PingReqSender(self, member, unreachableMember, onComplete);
});
} else {
callback(new Error('No members to ping-req'));
}
};

RingPop.prototype.setDebugFlag = function setDebugFlag(flag) {
this.debugFlags[flag] = true;
};
Expand Down
53 changes: 53 additions & 0 deletions lib/swim/ping-req-recvr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

module.exports = function recvPingReq(opts, callback) {
var ringpop = opts.ringpop;

ringpop.stat('increment', 'ping-req.recv');

var source = opts.source;
var target = opts.target;
var changes = opts.changes;
var checksum = opts.checksum;

ringpop.serverRate.mark();
ringpop.totalRate.mark();
ringpop.membership.update(changes);

ringpop.debugLog('ping-req send ping source=' + source + ' target=' + target, 'p');

var start = new Date();
ringpop.sendPing(target, function (isOk, body) {
ringpop.stat('timing', 'ping-req-ping', start);
ringpop.debugLog('ping-req recv ping source=' + source + ' target=' + target + ' isOk=' + isOk, 'p');

if (isOk) {
ringpop.membership.update(body.changes);
}

callback(null, {
changes: ringpop.issueMembershipChanges(checksum, source),
pingStatus: isOk,
target: target
});
});
};
Loading

0 comments on commit da73bc9

Please sign in to comment.