Skip to content

Commit

Permalink
Merge pull request #2 from uber/leave
Browse files Browse the repository at this point in the history
Implement admin leave
  • Loading branch information
jwolski2 committed Jan 23, 2015
2 parents 468ad82 + f33c52a commit 28cf922
Show file tree
Hide file tree
Showing 10 changed files with 666 additions and 132 deletions.
207 changes: 110 additions & 97 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var TypedError = require('error/typed');
var AdminJoiner = require('./lib/swim').AdminJoiner;
var createRingPopTChannel = require('./lib/tchannel.js').createRingPopTChannel;
var Dissemination = require('./lib/members').Dissemination;
var Gossip = require('./lib/swim.js').Gossip;
var HashRing = require('./lib/ring');
var Membership = require('./lib/members').Membership;
var MemberIterator = require('./lib/members').MemberIterator;
Expand All @@ -36,10 +37,26 @@ var PingReqSender = require('./lib/swim').PingReqSender;
var PingSender = require('./lib/swim').PingSender;
var safeParse = require('./lib/util').safeParse;
var RequestProxy = require('./lib/request-proxy');
var Suspicion = require('./lib/swim.js').Suspicion;

var HOST_PORT_PATTERN = /^(\d+.\d+.\d+.\d+):\d+$/;
var MAX_JOIN_DURATION = 300000;

var AppRequiredError = TypedError({
type: 'ringpop.options-app.required',
message: 'Expected `options.app` to be a non-empty string.\n' +
'Must specify an app for ringpop to work.\n'
});

var HostPortRequiredError = TypedError({
type: 'ringpop.options-host-port.required',
message: 'Expected `options.hostPort` to be valid.\n' +
'Got {hostPort} which is not {reason}.\n' +
'Must specify a HOST:PORT string.\n',
hostPort: null,
reason: null
});

var InvalidJoinAppError = TypedError({
type: 'ringpop.invalid-join.app',
message: 'A node tried joining a different app cluster. The expected app' +
Expand All @@ -55,10 +72,9 @@ var InvalidJoinSourceError = TypedError({
actual: null
});

var AppRequiredError = TypedError({
type: 'ringpop.options-app.required',
message: 'Expected `options.app` to be a non-empty string.\n' +
'Must specify an app for ringpop to work.\n'
var InvalidLocalMemberError = TypedError({
type: 'ringpop.invalid-local-member',
message: 'Operation could not be performed because local member has not been added to membership'
});

var OptionsRequiredError = TypedError({
Expand All @@ -67,13 +83,9 @@ var OptionsRequiredError = TypedError({
'Must specify options for `RingPop({ ... })`.\n'
});

var HostPortRequiredError = TypedError({
type: 'ringpop.options-host-port.required',
message: 'Expected `options.hostPort` to be valid.\n' +
'Got {hostPort} which is not {reason}.\n' +
'Must specify a HOST:PORT string.\n',
hostPort: null,
reason: null
var RedundantLeaveError = TypedError({
type: 'ringpop.invalid-leave.redundant',
message: 'A node cannot leave its cluster when it has already left.'
});

function RingPop(options) {
Expand Down Expand Up @@ -114,7 +126,6 @@ function RingPop(options) {
this.bootstrapFile = options.bootstrapFile;

this.isReady = false;
this.isRunning = false;

this.debugFlags = {};
this.joinSize = 3; // join fanout
Expand All @@ -126,7 +137,6 @@ function RingPop(options) {
this.lastProtocolPeriod = Date.now();
this.lastProtocolRate = 0;
this.protocolPeriods = 0;
this.suspectPeriod = 5000;
this.maxJoinDuration = options.maxJoinDuration || MAX_JOIN_DURATION;

this.requestProxy = new RequestProxy(this);
Expand All @@ -135,16 +145,16 @@ function RingPop(options) {
this.membership = new Membership(this);
this.membership.on('updated', this.onMembershipUpdated.bind(this));
this.memberIterator = new MemberIterator(this);
this.gossip = new Gossip(this);
this.suspicion = new Suspicion(this);

this.timing = new metrics.Histogram();
this.timing.update(this.minProtocolPeriod);
this.clientRate = new metrics.Meter();
this.serverRate = new metrics.Meter();
this.totalRate = new metrics.Meter();

this.gossipTimer = null;
this.protocolRateTimer = null;
this.suspectTimers = {};

this.statHostPort = this.hostPort.replace(':', '_');
this.statPrefix = 'ringpop.' + this.statHostPort;
Expand All @@ -157,7 +167,8 @@ require('util').inherits(RingPop, EventEmitter);

RingPop.prototype.destroy = function destroy() {
this.destroyed = true;
clearTimeout(this.gossipTimer);
this.gossip.stop();
this.suspicion.stopAll();
clearInterval(this.protocolRateTimer);

this.clientRate.m1Rate.stop();
Expand All @@ -174,11 +185,6 @@ RingPop.prototype.destroy = function destroy() {
this.joiner.destroy();
}

Object.keys(this.suspectTimers)
.forEach(function clearSuspect(timerKey) {
clearTimeout(this.suspectTimers[timerKey]);
}, this);

if (this.channel) {
this.channel.quit();
}
Expand All @@ -188,7 +194,28 @@ RingPop.prototype.setupChannel = function setupChannel() {
createRingPopTChannel(this, this.channel);
};

RingPop.prototype.addLocalMember = function addLocalMember(info) {
this.membership.addMember({
address: this.hostPort,
incarnationNumber: info && info.incarnationNumber
});
};

RingPop.prototype.adminJoin = function adminJoin(target, callback) {
if (!this.membership.localMember) {
process.nextTick(function() {
callback(InvalidLocalMemberError());
});
return;
}

if (this.membership.localMember.status === 'leave') {
this.rejoin(function() {
callback(null, null, 'rejoined');
});
return;
}

if (this.joiner) {
this.joiner.destroy();
this.joiner = null;
Expand All @@ -203,6 +230,31 @@ RingPop.prototype.adminJoin = function adminJoin(target, callback) {
this.joiner.sendJoin();
};

RingPop.prototype.adminLeave = function adminLeave(callback) {
if (!this.membership.localMember) {
process.nextTick(function() {
callback(InvalidLocalMemberError());
});
return;
}

if (this.membership.localMember.status === 'leave') {
process.nextTick(function() {
callback(RedundantLeaveError());
});
return;
}

// TODO Explicitly infect other members (like admin join)?
this.membership.makeLeave();
this.gossip.stop();
this.suspicion.stopAll();

process.nextTick(function() {
callback(null, null, 'ok');
});
};

RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) {
if (typeof bootstrapFile === 'function') {
callback = bootstrapFile;
Expand Down Expand Up @@ -235,8 +287,7 @@ RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) {
this.checkForMissingBootstrapHost();
this.checkForHostnameIpMismatch();

// Add local member
this.membership.addMember({ address: this.hostPort });
this.addLocalMember();

this.adminJoin(function(err) {
if (err) {
Expand Down Expand Up @@ -264,9 +315,8 @@ RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) {
memberCount: self.membership.getMemberCount()
});

self.startProtocolPeriod();
self.gossip.start();
self.startProtocolRateTimer();

self.isReady = true;
self.emit('ready');

Expand All @@ -289,7 +339,6 @@ RingPop.prototype.checkForMissingBootstrapHost = function checkForMissingBootstr
return true;
};


RingPop.prototype.checkForHostnameIpMismatch = function checkForHostnameIpMismatch() {
var self = this;

Expand Down Expand Up @@ -356,26 +405,6 @@ RingPop.prototype.getStats = function getStats() {
};
};

RingPop.prototype.gossip = function gossip() {
var self = this;
var start = new Date();

if (this.destroyed) {
return;
}

function callback() {
self.stat('timing', 'protocol.frequency', start);
self.gossip();
}

var protocolDelay = this.computeProtocolDelay();
this.stat('timing', 'protocol.delay', protocolDelay);
this.gossipTimer = setTimeout(function () {
self.pingMemberNow(callback);
}, protocolDelay);
};

RingPop.prototype.handleTick = function handleTick(cb) {
var self = this;
this.pingMemberNow(function () {
Expand Down Expand Up @@ -483,11 +512,6 @@ RingPop.prototype.whoami = function whoami() {
return this.hostPort;
};

RingPop.prototype.clearSuspectTimeout = function clearSuspectTimeout(member) {
this.logger.debug('canceled suspect period member=' + member.address);
clearTimeout(this.suspectTimers[member.address]);
};

RingPop.prototype.computeProtocolDelay = function computeProtocolDelay() {
if (this.protocolPeriods) {
var target = this.lastProtocolPeriod + this.lastProtocolRate;
Expand All @@ -506,14 +530,14 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) {
var self = this;

var updateHandlers = {
'alive': function onAliveMember(member) {
alive: function onAliveMember(member) {
/* jshint camelcase: false */
self.stat('increment', 'membership-update.alive');
self.logger.info('member is alive', {
local: self.membership.localMember.address,
alive: member.address
});
self.clearSuspectTimeout(member);
self.suspicion.stop(member);
self.ring.addServer(member.address);
self.dissemination.addChange({
address: member.address,
Expand All @@ -522,14 +546,30 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) {
piggybackCount: 0
});
},
'faulty': function onFaultyMember(member) {
faulty: function onFaultyMember(member) {
/* jshint camelcase: false */
self.stat('increment', 'membership-update.faulty');
self.logger.warn('member is faulty', {
local: self.membership.localMember.address,
faulty: member.address
});
self.clearSuspectTimeout(member);
self.suspicion.stop(member);
self.ring.removeServer(member.address);
self.dissemination.addChange({
address: member.address,
status: member.status,
incarnationNumber: member.incarnationNumber,
piggybackCount: 0
});
},
leave: function onLeaveMember(member) {
/* jshint camelcase: false */
self.stat('increment', 'membership-update.leave');
self.logger.warn('member has left', {
local: self.membership.localMember.address,
leave: member.address
});
self.suspicion.stop(member);
self.ring.removeServer(member.address);
self.dissemination.addChange({
address: member.address,
Expand All @@ -538,7 +578,7 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) {
piggybackCount: 0
});
},
'new': function onNewMember(member) {
new: function onNewMember(member) {
/* jshint camelcase: false */
self.stat('increment', 'membership-update.new');
self.ring.addServer(member.address);
Expand All @@ -549,13 +589,13 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) {
piggybackCount: 0
});
},
'suspect': function onSuspectMember(member) {
suspect: function onSuspectMember(member) {
self.stat('increment', 'membership-update.suspect');
self.logger.warn('member is suspect', {
local: self.membership.localMember.address,
suspect: member.address
});
self.startSuspectPeriod(member);
self.suspicion.start(member);
self.dissemination.addChange({
address: member.address,
status: member.status,
Expand Down Expand Up @@ -649,6 +689,18 @@ RingPop.prototype.readHostsFile = function readHostsFile(file) {
}
};

RingPop.prototype.rejoin = function rejoin(callback) {
this.membership.makeAlive();
this.gossip.start();
this.suspicion.reenable();

// TODO Rejoin may eventually necessitate fan-out thus
// the need for the asynchronous-style callback.
process.nextTick(function() {
callback();
});
};

RingPop.prototype.seedBootstrapHosts = function seedBootstrapHosts(file) {
if (Array.isArray(file)) {
this.bootstrapHosts = file;
Expand Down Expand Up @@ -717,51 +769,12 @@ RingPop.prototype.setLogger = function setLogger(logger) {
};
};

RingPop.prototype.startProtocolPeriod = function startProtocolPeriod() {
if (this.isRunning) {
this.logger.warn('ringpop is already gossiping and will not' +
' start another protocol period.', { address: this.hostPort });
return;
}

this.isRunning = true;
this.membership.shuffle();
this.gossip();
this.logger.info('ringpop has started gossiping', { address: this.hostPort });
};

RingPop.prototype.startProtocolRateTimer = function startProtocolRateTimer() {
this.protocolRateTimer = setInterval(function () {
this.lastProtocolRate = this.protocolRate();
}.bind(this), 1000);
};

RingPop.prototype.startSuspectPeriod = function startSuspectPeriod(member) {
if (this.destroyed) {
return;
}

this.logger.debug('starting suspect period member=' + member.address);

// An existing suspect could exist in the event that a previously suspected
// member is still suspected, but overriden by a higher incarnation number.
// In that case, this function effectively renews and reissues a suspect
// period.
if (this.suspectTimers[member.address]) {
this.logger.debug('canceling existing suspect period suspect=' + member.address);
clearTimeout(this.suspectTimers[member.address]);
}

this.suspectTimers[member.address] = setTimeout(function() {
this.membership.update([{
address: member.address,
incarnationNumber: member.incarnationNumber,
status: 'faulty'
}]);
delete this.suspectTimers[member.address];
}.bind(this), this.suspectPeriod);
};

RingPop.prototype.stat = function stat(type, key, value) {
if (!this.statKeys[key]) {
this.statKeys[key] = this.statPrefix + '.' + key;
Expand Down
Loading

0 comments on commit 28cf922

Please sign in to comment.