From f33c52a58bbfeaf829902f865379e38dc85d08a5 Mon Sep 17 00:00:00 2001 From: Jeff Wolski Date: Mon, 19 Jan 2015 18:50:18 -0800 Subject: [PATCH] Implemented admin leave --- index.js | 207 +++++++++++++++++++++------------------- lib/members.js | 93 +++++++++++++----- lib/swim.js | 167 +++++++++++++++++++++++++++++++- lib/tchannel.js | 6 +- test/index_test.js | 98 +++++++++++++++++++ test/members_test.js | 30 ++++++ test/mock/index.js | 4 +- test/mock/membership.js | 10 ++ test/mock/noop.js | 20 ++++ test/swim_test.js | 163 ++++++++++++++++++++++++++++++- 10 files changed, 666 insertions(+), 132 deletions(-) create mode 100644 test/mock/noop.js diff --git a/index.js b/index.js index 816b881f..5887280d 100644 --- a/index.js +++ b/index.js @@ -28,6 +28,7 @@ var TypedError = require('error/typed'); var AdminJoiner = require('./lib/swim').AdminJoiner; var createRingPopTChannel = require('./lib/tchannel.js').createRingPopTChannel; var Dissemination = require('./lib/members').Dissemination; +var Gossip = require('./lib/swim.js').Gossip; var HashRing = require('./lib/ring'); var Membership = require('./lib/members').Membership; var MemberIterator = require('./lib/members').MemberIterator; @@ -36,10 +37,26 @@ var PingReqSender = require('./lib/swim').PingReqSender; var PingSender = require('./lib/swim').PingSender; var safeParse = require('./lib/util').safeParse; var RequestProxy = require('./lib/request-proxy'); +var Suspicion = require('./lib/swim.js').Suspicion; var HOST_PORT_PATTERN = /^(\d+.\d+.\d+.\d+):\d+$/; var MAX_JOIN_DURATION = 300000; +var AppRequiredError = TypedError({ + type: 'ringpop.options-app.required', + message: 'Expected `options.app` to be a non-empty string.\n' + + 'Must specify an app for ringpop to work.\n' +}); + +var HostPortRequiredError = TypedError({ + type: 'ringpop.options-host-port.required', + message: 'Expected `options.hostPort` to be valid.\n' + + 'Got {hostPort} which is not {reason}.\n' + + 'Must specify a HOST:PORT string.\n', + hostPort: null, + reason: null +}); + var InvalidJoinAppError = TypedError({ type: 'ringpop.invalid-join.app', message: 'A node tried joining a different app cluster. The expected app' + @@ -55,10 +72,9 @@ var InvalidJoinSourceError = TypedError({ actual: null }); -var AppRequiredError = TypedError({ - type: 'ringpop.options-app.required', - message: 'Expected `options.app` to be a non-empty string.\n' + - 'Must specify an app for ringpop to work.\n' +var InvalidLocalMemberError = TypedError({ + type: 'ringpop.invalid-local-member', + message: 'Operation could not be performed because local member has not been added to membership' }); var OptionsRequiredError = TypedError({ @@ -67,13 +83,9 @@ var OptionsRequiredError = TypedError({ 'Must specify options for `RingPop({ ... })`.\n' }); -var HostPortRequiredError = TypedError({ - type: 'ringpop.options-host-port.required', - message: 'Expected `options.hostPort` to be valid.\n' + - 'Got {hostPort} which is not {reason}.\n' + - 'Must specify a HOST:PORT string.\n', - hostPort: null, - reason: null +var RedundantLeaveError = TypedError({ + type: 'ringpop.invalid-leave.redundant', + message: 'A node cannot leave its cluster when it has already left.' }); function RingPop(options) { @@ -114,7 +126,6 @@ function RingPop(options) { this.bootstrapFile = options.bootstrapFile; this.isReady = false; - this.isRunning = false; this.debugFlags = {}; this.joinSize = 3; // join fanout @@ -126,7 +137,6 @@ function RingPop(options) { this.lastProtocolPeriod = Date.now(); this.lastProtocolRate = 0; this.protocolPeriods = 0; - this.suspectPeriod = 5000; this.maxJoinDuration = options.maxJoinDuration || MAX_JOIN_DURATION; this.requestProxy = new RequestProxy(this); @@ -135,6 +145,8 @@ function RingPop(options) { this.membership = new Membership(this); this.membership.on('updated', this.onMembershipUpdated.bind(this)); this.memberIterator = new MemberIterator(this); + this.gossip = new Gossip(this); + this.suspicion = new Suspicion(this); this.timing = new metrics.Histogram(); this.timing.update(this.minProtocolPeriod); @@ -142,9 +154,7 @@ function RingPop(options) { this.serverRate = new metrics.Meter(); this.totalRate = new metrics.Meter(); - this.gossipTimer = null; this.protocolRateTimer = null; - this.suspectTimers = {}; this.statHostPort = this.hostPort.replace(':', '_'); this.statPrefix = 'ringpop.' + this.statHostPort; @@ -157,7 +167,8 @@ require('util').inherits(RingPop, EventEmitter); RingPop.prototype.destroy = function destroy() { this.destroyed = true; - clearTimeout(this.gossipTimer); + this.gossip.stop(); + this.suspicion.stopAll(); clearInterval(this.protocolRateTimer); this.clientRate.m1Rate.stop(); @@ -174,11 +185,6 @@ RingPop.prototype.destroy = function destroy() { this.joiner.destroy(); } - Object.keys(this.suspectTimers) - .forEach(function clearSuspect(timerKey) { - clearTimeout(this.suspectTimers[timerKey]); - }, this); - if (this.channel) { this.channel.quit(); } @@ -188,7 +194,28 @@ RingPop.prototype.setupChannel = function setupChannel() { createRingPopTChannel(this, this.channel); }; +RingPop.prototype.addLocalMember = function addLocalMember(info) { + this.membership.addMember({ + address: this.hostPort, + incarnationNumber: info && info.incarnationNumber + }); +}; + RingPop.prototype.adminJoin = function adminJoin(target, callback) { + if (!this.membership.localMember) { + process.nextTick(function() { + callback(InvalidLocalMemberError()); + }); + return; + } + + if (this.membership.localMember.status === 'leave') { + this.rejoin(function() { + callback(null, null, 'rejoined'); + }); + return; + } + if (this.joiner) { this.joiner.destroy(); this.joiner = null; @@ -203,6 +230,31 @@ RingPop.prototype.adminJoin = function adminJoin(target, callback) { this.joiner.sendJoin(); }; +RingPop.prototype.adminLeave = function adminLeave(callback) { + if (!this.membership.localMember) { + process.nextTick(function() { + callback(InvalidLocalMemberError()); + }); + return; + } + + if (this.membership.localMember.status === 'leave') { + process.nextTick(function() { + callback(RedundantLeaveError()); + }); + return; + } + + // TODO Explicitly infect other members (like admin join)? + this.membership.makeLeave(); + this.gossip.stop(); + this.suspicion.stopAll(); + + process.nextTick(function() { + callback(null, null, 'ok'); + }); +}; + RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) { if (typeof bootstrapFile === 'function') { callback = bootstrapFile; @@ -235,8 +287,7 @@ RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) { this.checkForMissingBootstrapHost(); this.checkForHostnameIpMismatch(); - // Add local member - this.membership.addMember({ address: this.hostPort }); + this.addLocalMember(); this.adminJoin(function(err) { if (err) { @@ -264,9 +315,8 @@ RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) { memberCount: self.membership.getMemberCount() }); - self.startProtocolPeriod(); + self.gossip.start(); self.startProtocolRateTimer(); - self.isReady = true; self.emit('ready'); @@ -289,7 +339,6 @@ RingPop.prototype.checkForMissingBootstrapHost = function checkForMissingBootstr return true; }; - RingPop.prototype.checkForHostnameIpMismatch = function checkForHostnameIpMismatch() { var self = this; @@ -356,26 +405,6 @@ RingPop.prototype.getStats = function getStats() { }; }; -RingPop.prototype.gossip = function gossip() { - var self = this; - var start = new Date(); - - if (this.destroyed) { - return; - } - - function callback() { - self.stat('timing', 'protocol.frequency', start); - self.gossip(); - } - - var protocolDelay = this.computeProtocolDelay(); - this.stat('timing', 'protocol.delay', protocolDelay); - this.gossipTimer = setTimeout(function () { - self.pingMemberNow(callback); - }, protocolDelay); -}; - RingPop.prototype.handleTick = function handleTick(cb) { var self = this; this.pingMemberNow(function () { @@ -483,11 +512,6 @@ RingPop.prototype.whoami = function whoami() { return this.hostPort; }; -RingPop.prototype.clearSuspectTimeout = function clearSuspectTimeout(member) { - this.logger.debug('canceled suspect period member=' + member.address); - clearTimeout(this.suspectTimers[member.address]); -}; - RingPop.prototype.computeProtocolDelay = function computeProtocolDelay() { if (this.protocolPeriods) { var target = this.lastProtocolPeriod + this.lastProtocolRate; @@ -506,14 +530,14 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) { var self = this; var updateHandlers = { - 'alive': function onAliveMember(member) { + alive: function onAliveMember(member) { /* jshint camelcase: false */ self.stat('increment', 'membership-update.alive'); self.logger.info('member is alive', { local: self.membership.localMember.address, alive: member.address }); - self.clearSuspectTimeout(member); + self.suspicion.stop(member); self.ring.addServer(member.address); self.dissemination.addChange({ address: member.address, @@ -522,14 +546,30 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) { piggybackCount: 0 }); }, - 'faulty': function onFaultyMember(member) { + faulty: function onFaultyMember(member) { /* jshint camelcase: false */ self.stat('increment', 'membership-update.faulty'); self.logger.warn('member is faulty', { local: self.membership.localMember.address, faulty: member.address }); - self.clearSuspectTimeout(member); + self.suspicion.stop(member); + self.ring.removeServer(member.address); + self.dissemination.addChange({ + address: member.address, + status: member.status, + incarnationNumber: member.incarnationNumber, + piggybackCount: 0 + }); + }, + leave: function onLeaveMember(member) { + /* jshint camelcase: false */ + self.stat('increment', 'membership-update.leave'); + self.logger.warn('member has left', { + local: self.membership.localMember.address, + leave: member.address + }); + self.suspicion.stop(member); self.ring.removeServer(member.address); self.dissemination.addChange({ address: member.address, @@ -538,7 +578,7 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) { piggybackCount: 0 }); }, - 'new': function onNewMember(member) { + new: function onNewMember(member) { /* jshint camelcase: false */ self.stat('increment', 'membership-update.new'); self.ring.addServer(member.address); @@ -549,13 +589,13 @@ RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) { piggybackCount: 0 }); }, - 'suspect': function onSuspectMember(member) { + suspect: function onSuspectMember(member) { self.stat('increment', 'membership-update.suspect'); self.logger.warn('member is suspect', { local: self.membership.localMember.address, suspect: member.address }); - self.startSuspectPeriod(member); + self.suspicion.start(member); self.dissemination.addChange({ address: member.address, status: member.status, @@ -649,6 +689,18 @@ RingPop.prototype.readHostsFile = function readHostsFile(file) { } }; +RingPop.prototype.rejoin = function rejoin(callback) { + this.membership.makeAlive(); + this.gossip.start(); + this.suspicion.reenable(); + + // TODO Rejoin may eventually necessitate fan-out thus + // the need for the asynchronous-style callback. + process.nextTick(function() { + callback(); + }); +}; + RingPop.prototype.seedBootstrapHosts = function seedBootstrapHosts(file) { if (Array.isArray(file)) { this.bootstrapHosts = file; @@ -717,51 +769,12 @@ RingPop.prototype.setLogger = function setLogger(logger) { }; }; -RingPop.prototype.startProtocolPeriod = function startProtocolPeriod() { - if (this.isRunning) { - this.logger.warn('ringpop is already gossiping and will not' + - ' start another protocol period.', { address: this.hostPort }); - return; - } - - this.isRunning = true; - this.membership.shuffle(); - this.gossip(); - this.logger.info('ringpop has started gossiping', { address: this.hostPort }); -}; - RingPop.prototype.startProtocolRateTimer = function startProtocolRateTimer() { this.protocolRateTimer = setInterval(function () { this.lastProtocolRate = this.protocolRate(); }.bind(this), 1000); }; -RingPop.prototype.startSuspectPeriod = function startSuspectPeriod(member) { - if (this.destroyed) { - return; - } - - this.logger.debug('starting suspect period member=' + member.address); - - // An existing suspect could exist in the event that a previously suspected - // member is still suspected, but overriden by a higher incarnation number. - // In that case, this function effectively renews and reissues a suspect - // period. - if (this.suspectTimers[member.address]) { - this.logger.debug('canceling existing suspect period suspect=' + member.address); - clearTimeout(this.suspectTimers[member.address]); - } - - this.suspectTimers[member.address] = setTimeout(function() { - this.membership.update([{ - address: member.address, - incarnationNumber: member.incarnationNumber, - status: 'faulty' - }]); - delete this.suspectTimers[member.address]; - }.bind(this), this.suspectPeriod); -}; - RingPop.prototype.stat = function stat(type, key, value) { if (!this.statKeys[key]) { this.statKeys[key] = this.statPrefix + '.' + key; diff --git a/lib/members.js b/lib/members.js index e8e0230e..9d84f3cf 100644 --- a/lib/members.js +++ b/lib/members.js @@ -23,6 +23,12 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); var LOG_10 = Math.log(10); +var MEMBER_STATUSES = { + alive: true, + faulty: true, + leave: true, + suspect: true +}; function Dissemination(ringpop) { this.ringpop = ringpop; @@ -140,11 +146,36 @@ function Membership(ringpop) { util.inherits(Membership, EventEmitter); +Membership.evalOverride = function evalOverride(member, change) { + if (Membership.isLocalSuspectOverride(member, change) || Membership.isLocalFaultyOverride(member, change)) { + // Local node should never allow itself to become suspect or faulty. In response, + // it affirms its "aliveness" and bumps its incarnation number. + member.status = 'alive'; + member.incarnationNumber = +new Date(); + return _.extend(member, { type: 'alive' }); + } else if (Membership.isAliveOverride(member, change)) { + member.status = 'alive'; + member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; + return _.extend(member, { type: 'alive' }); + } else if (Membership.isSuspectOverride(member, change)) { + member.status = 'suspect'; + member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; + return _.extend(member, { type: 'suspect' }); + } else if (Membership.isFaultyOverride(member, change)) { + member.status = 'faulty'; + member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; + return _.extend(member, { type: 'faulty' }); + } else if (Membership.isLeaveOverride(member, change)) { + member.status = 'leave'; + member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; + return _.extend(member, { type: 'leave' }); + } +}; + Membership.isAliveOverride = function isAliveOverride(member, change) { return change.status === 'alive' && - ((member.status === 'suspect' && change.incarnationNumber > member.incarnationNumber) || - (member.status === 'faulty' && change.incarnationNumber > member.incarnationNumber) || - (member.status === 'alive' && change.incarnationNumber > member.incarnationNumber)); + MEMBER_STATUSES[member.status] && + change.incarnationNumber > member.incarnationNumber; }; Membership.isFaultyOverride = function isFaultyOverride(member, change) { @@ -154,6 +185,12 @@ Membership.isFaultyOverride = function isFaultyOverride(member, change) { (member.status === 'alive' && change.incarnationNumber > member.incarnationNumber)); }; +Membership.isLeaveOverride = function isLeaveOverride(member, change) { + return change.status === 'leave' && + MEMBER_STATUSES[member.status] && + change.incarnationNumber > member.incarnationNumber; +}; + Membership.isLocalFaultyOverride = function isLocalFaultyOverride(member, change) { return member.isLocal && change.status === 'faulty'; }; @@ -220,10 +257,6 @@ Membership.prototype.computeChecksum = function computeChecksum() { return this.checksum; }; -Membership.prototype.hasMember = function hasMember(member) { - return !!this.findMemberByAddress(member.address); -}; - Membership.prototype.findMemberByAddress = function findMemberByAddress(address) { return _.find(this.members, function(member) { return member.address === address; @@ -246,6 +279,10 @@ Membership.prototype.getJoinPosition = function getJoinPosition() { return Math.floor(Math.random() * (this.members.length - 0)) + 0; }; +Membership.prototype.getLocalMemberAddress = function getLocalMemberAddress() { + return this.localMember && this.localMember.address; +}; + Membership.prototype.getMemberAt = function getMemberAt(index) { return this.members[index]; }; @@ -283,6 +320,26 @@ Membership.prototype.getStats = function getStats() { }; }; +Membership.prototype.hasMember = function hasMember(member) { + return !!this.findMemberByAddress(member.address); +}; + +Membership.prototype.makeAlive = function makeAlive() { + this.update([{ + address: this.localMember.address, + status: 'alive', + incarnationNumber: +new Date() + }]); +}; + +Membership.prototype.makeLeave = function makeLeave() { + this.update([{ + address: this.localMember.address, + status: 'leave', + incarnationNumber: +new Date() + }]); +}; + Membership.prototype.shuffle = function shuffle() { this.members = _.shuffle(this.members); }; @@ -306,24 +363,10 @@ Membership.prototype.update = function update(changes) { var member = this.findMemberByAddress(change.address); if (member) { - if (Membership.isLocalSuspectOverride(member, change) || Membership.isLocalFaultyOverride(member, change)) { - // Local node should never allow itself to become suspect or faulty. In response, - // it affirms its "aliveness" and bumps its incarnation number. - member.status = 'alive'; - member.incarnationNumber = +new Date(); - updates.push(_.extend(member, { type: 'alive' })); - } else if (Membership.isAliveOverride(member, change)) { - member.status = 'alive'; - member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; - updates.push(_.extend(member, { type: 'alive' })); - } else if (Membership.isSuspectOverride(member, change)) { - member.status = 'suspect'; - member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; - updates.push(_.extend(member, { type: 'suspect' })); - } else if (Membership.isFaultyOverride(member, change)) { - member.status = 'faulty'; - member.incarnationNumber = change.incarnationNumber || member.incarnationNumber; - updates.push(_.extend(member, { type: 'faulty' })); + var update = Membership.evalOverride(member, change); + + if (update) { + updates.push(update); } } else { member = { diff --git a/lib/swim.js b/lib/swim.js index cea748fc..3beca9c9 100644 --- a/lib/swim.js +++ b/lib/swim.js @@ -17,6 +17,9 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +var clearTimeout = require('timers').clearTimeout; +var globalSetTimeout = require('timers').setTimeout; +var safeParse = require('./util').safeParse; var TypedError = require('error/typed'); var NoHostsError = TypedError({ @@ -26,8 +29,6 @@ var NoHostsError = TypedError({ bootstrapHosts: null }); -var safeParse = require('./util').safeParse; - function AdminJoiner(params) { this.ringpop = params.ringpop; this.target = params.target; @@ -171,7 +172,7 @@ AdminJoiner.prototype.rejoin = function rejoin() { AdminJoiner.prototype.destroy = function destroy() { clearTimeout(this.timer); -} +}; function PingReqSender(ring, member, target, callback) { this.ring = ring; @@ -264,8 +265,166 @@ PingSender.prototype.doCallback = function doCallback(isOk, bodyObj) { } }; +function Gossip(ringpop) { + this.ringpop = ringpop; + this.isStopped = true; + this.timer = null; +} + +Gossip.prototype.run = function run() { + var self = this; + + var protocolDelay = this.ringpop.computeProtocolDelay(); + this.ringpop.stat('timing', 'protocol.delay', protocolDelay); + + var startTime = new Date(); + this.timer = setTimeout(function() { + self.ringpop.pingMemberNow(function() { + self.ringpop.stat('timing', 'protocol.frequency', startTime); + + if (self.isStopped) { + self.ringpop.logger.debug('stopped recurring gossip loop', { + local: self.ringpop.membership.getLocalMemberAddress() + }); + return; + } + + self.run(); + }); + }, protocolDelay); +}; + +Gossip.prototype.start = function start() { + if (!this.isStopped) { + this.ringpop.logger.debug('gossip has already started', { + local: this.ringpop.membership.getLocalMemberAddress() + }); + return; + } + + this.ringpop.membership.shuffle(); + this.run(); + this.isStopped = false; + + this.ringpop.logger.debug('started gossip protocol', { + local: this.ringpop.membership.getLocalMemberAddress() + }); +}; + +Gossip.prototype.stop = function stop() { + if (this.isStopped) { + this.ringpop.logger.warn('gossip is already stopped', { + local: this.ringpop.membership.getLocalMemberAddress() + }); + return; + } + + clearTimeout(this.timer); + this.timer = null; + this.isStopped = true; + + this.ringpop.logger.debug('stopped gossip protocol', { + local: this.ringpop.membership.getLocalMemberAddress() + }); +}; + +function Suspicion(ringpop) { + this.ringpop = ringpop; + this.isStoppedAll = null; + this.timers = {}; + this.period = 5000; +} + +Suspicion.prototype.reenable = function reenable() { + if (this.isStoppedAll !== true) { + this.ringpop.logger.warn('cannot reenable suspicion protocol because it was never disabled', { + local: this.ringpop.membership.getLocalMemberAddress() + }); + return; + } + + this.isStoppedAll = null; + + this.ringpop.logger.debug('reenabled suspicion protocol', { + local: this.ringpop.membership.getLocalMemberAddress() + }); +}; + +Suspicion.prototype.setTimeout = function setTimeout(fn) { + return globalSetTimeout(fn, this.period); +}; + +Suspicion.prototype.start = function start(member) { + if (this.isStoppedAll === true) { + this.ringpop.logger.debug('cannot start a suspect period because suspicion has not been reenabled', { + local: this.ringpop.membership.getLocalMemberAddress() + }); + return; + } + + if (member.address === this.ringpop.membership.getLocalMemberAddress()) { + this.ringpop.logger.debug('cannot start a suspect period for the local member', { + local: this.ringpop.membership.getLocalMemberAddress(), + suspect: member.address + }); + return; + } + + if (this.timers[member.address]) { + this.stop(member); + } + + this.timers[member.address] = this.setTimeout(function() { + this.ringpop.membership.update([{ + address: member.address, + incarnationNumber: member.incarnationNumber, + status: 'faulty' + }]); + }.bind(this)); + + this.ringpop.logger.debug('started suspect period', { + local: this.ringpop.membership.getLocalMemberAddress(), + suspect: member.address + }); +}; + +Suspicion.prototype.stop = function stop(member) { + clearTimeout(this.timers[member.address]); + delete this.timers[member.address]; + + this.ringpop.logger.debug('stopped members suspect timer', { + local: this.ringpop.membership.getLocalMemberAddress(), + suspect: member.address + }); +}; + +Suspicion.prototype.stopAll = function stopAll() { + this.isStoppedAll = true; + + var timerKeys = Object.keys(this.timers); + + if (timerKeys.length === 0) { + this.ringpop.logger.debug('stopped no suspect timers', { + local: this.ringpop.membership.getLocalMemberAddress() + }); + return; + } + + timerKeys.forEach(function clearSuspect(timerKey) { + clearTimeout(this.timers[timerKey]); + delete this.timers[timerKey]; + }, this); + + this.ringpop.logger.debug('stopped all suspect timers', { + local: this.ringpop.membership.getLocalMemberAddress(), + numTimers: timerKeys.length + }); +}; + module.exports = { AdminJoiner: AdminJoiner, + Gossip: Gossip, PingReqSender: PingReqSender, - PingSender: PingSender + PingSender: PingSender, + Suspicion: Suspicion }; diff --git a/lib/tchannel.js b/lib/tchannel.js index 2f59c0a2..1d1f906f 100644 --- a/lib/tchannel.js +++ b/lib/tchannel.js @@ -79,16 +79,16 @@ RingPopTChannel.prototype.adminDebugClear = function (arg1, arg2, hostInfo, cb) }; RingPopTChannel.prototype.adminGossip = function (arg1, arg2, hostInfo, cb) { - this.ringPop.startProtocolPeriod(); + this.ringPop.gossip.start(); cb(null, null, 'ok'); }; RingPopTChannel.prototype.adminLeave = function (arg1, arg2, hostInfo, cb) { - cb(new Error('not implemented')); + this.ringPop.adminLeave(cb); }; RingPopTChannel.prototype.adminJoin = function (arg1, arg2, hostInfo, cb) { - var body = safeParse(arg2.toString()); + var body = safeParse(arg2.toString()) || {}; if (body) { this.ringPop.adminJoin(body.target, function (err, candidateHosts) { if (err) { diff --git a/test/index_test.js b/test/index_test.js index dd4a324f..bb08f4b2 100644 --- a/test/index_test.js +++ b/test/index_test.js @@ -53,6 +53,104 @@ test('key hashes to only server', function t(assert) { assert.end(); }); +test('admin join rejoins if member has previously left', function t(assert) { + assert.plan(3); + + var ringpop = new RingPop({ app: 'ringpop', hostPort: '127.0.0.1:3000' }); + ringpop.addLocalMember({ incarnationNumber: 1 }); + ringpop.adminLeave(function(err, res1, res2) { + assert.equals(res2, 'ok', 'node left cluster'); + + ringpop.membership.localMember.incarnationNumber = 2; + ringpop.adminJoin(null, function(err, res1, res2) { + assert.equals(res2, 'rejoined', 'node rejoined cluster'); + assert.equals(ringpop.membership.localMember.status, 'alive', 'local member is alive'); + + ringpop.destroy(); + assert.end(); + }); + }); +}); + +test('admin join cannot be performed before local member is added to membership', function t(assert) { + assert.plan(2); + + var ringpop = new RingPop({ app: 'ringpop', hostPort: '127.0.0.1:3000' }); + ringpop.adminJoin(null, function(err) { + assert.ok(err, 'an error occurred'); + assert.equals(err.type, 'ringpop.invalid-local-member', 'invalid local member error'); + assert.end(); + }); +}); + +test('admin leave prevents redundant leave', function t(assert) { + assert.plan(2); + + var ringpop = new RingPop({ app: 'ringpop', hostPort: '127.0.0.1:3000' }); + ringpop.addLocalMember({ incarnationNumber: 1 }); + ringpop.membership.makeLeave(); + ringpop.adminLeave(function(err) { + assert.ok(err, 'an error occurred'); + assert.equals(err.type, 'ringpop.invalid-leave.redundant', 'cannot leave cluster twice'); + assert.end(); + }); +}); + +test('admin leave makes local member leave', function t(assert) { + assert.plan(3); + + var ringpop = new RingPop({ app: 'ringpop', hostPort: '127.0.0.1:3000' }); + ringpop.addLocalMember({ incarnationNumber: 1 }); + ringpop.adminLeave(function(err, _, res2) { + assert.notok(err, 'an error did not occur'); + assert.ok('leave', ringpop.membership.localMember.status, 'local member has correct status'); + assert.equals('ok', res2, 'admin leave was successful'); + assert.end(); + }); +}); + +test('admin leave stops gossip', function t(assert) { + assert.plan(2); + + var ringpop = new RingPop({ app: 'ringpop', hostPort: '127.0.0.1:3000' }); + ringpop.addLocalMember({ incarnationNumber: 1 }); + ringpop.gossip.start(); + ringpop.adminLeave(function(err) { + assert.notok(err, 'an error did not occur'); + assert.equals(true, ringpop.gossip.isStopped, 'gossip is stopped'); + assert.end(); + }); +}); + +test('admin leave stops suspicion subprotocol', function t(assert) { + assert.plan(2); + + var local = '127.0.0.1:3000'; + var remote = { address: '127.0.0.2:3000' }; + var ringpop = new RingPop({ app: 'ringpop', hostPort: local }); + ringpop.addLocalMember({ incarnationNumber: 1 }); + ringpop.membership.addMember(remote); + ringpop.suspicion.start(remote); + + ringpop.adminLeave(function(err) { + assert.notok(err, 'an error did not occur'); + assert.equals(true, ringpop.suspicion.isStoppedAll, 'suspicion subprotocol is stopped'); + assert.end(); + }); +}); + +test('admin leave cannot be attempted before local member is added', function t(assert) { + assert.plan(2); + + var ringpop = new RingPop({ app: 'ringpop', hostPort: '127.0.0.1:3000' }); + + ringpop.adminLeave(function(err) { + assert.ok(err, 'an error occurred'); + assert.equals(err.type, 'ringpop.invalid-local-member', 'an invalid leave occurred'); + assert.end(); + }); +}); + test('protocol join disallows joining itself', function t(assert) { assert.plan(2); diff --git a/test/members_test.js b/test/members_test.js index acd45928..f6680f7e 100644 --- a/test/members_test.js +++ b/test/members_test.js @@ -33,3 +33,33 @@ test('checksum is changed when membership is updated', function t(assert) { assert.doesNotEqual(membership.checksum, prevChecksum, 'checksum is changed'); assert.end(); }); + +test('change with higher incarnation number results in leave override', function t(assert) { + var member = { status: 'alive', incarnationNumber: 1 }; + var change = { status: 'leave', incarnationNumber: 2 }; + + var update = Membership.evalOverride(member, change); + + assert.equals(update.type, 'leave', 'results in leave'); + assert.end(); +}); + +test('change with same incarnation number does not result in leave override', function t(assert) { + var member = { status: 'alive', incarnationNumber: 1 }; + var change = { status: 'leave', incarnationNumber: 1 }; + + var update = Membership.evalOverride(member, change); + + assert.notok(update, 'no override'); + assert.end(); +}); + +test('change with lower incarnation number does not result in leave override', function t(assert) { + var member = { status: 'alive', incarnationNumber: 1 }; + var change = { status: 'leave', incarnationNumber: 0 }; + + var update = Membership.evalOverride(member, change); + + assert.notok(update, 'no override'); + assert.end(); +}); diff --git a/test/mock/index.js b/test/mock/index.js index 5abd16fc..54bfb692 100644 --- a/test/mock/index.js +++ b/test/mock/index.js @@ -17,9 +17,9 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. - module.exports = { channel: require('./channel.js'), logger: require('./logger.js'), - membership: require('./membership.js') + membership: require('./membership.js'), + noop: require('./noop.js') }; diff --git a/test/mock/membership.js b/test/mock/membership.js index 8f9cefd7..6c86866c 100644 --- a/test/mock/membership.js +++ b/test/mock/membership.js @@ -25,5 +25,15 @@ module.exports = { address: '127.0.0.1:3000', incarnationNumber: 123456789 }, + remoteMember: { + address: '127.0.0.1:3001', + incarnationNumber: 123456789 + }, + remoteMember2: { + address: '127.0.0.1:3002', + incarnationNumber: 123456789 + }, + getLocalMemberAddress: noop, + shuffle: noop, update: noop }; diff --git a/test/mock/noop.js b/test/mock/noop.js new file mode 100644 index 00000000..a8cc53f6 --- /dev/null +++ b/test/mock/noop.js @@ -0,0 +1,20 @@ +// Copyright (c) 2015 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +module.exports = function() {}; diff --git a/test/swim_test.js b/test/swim_test.js index 4d62f97a..a708201d 100644 --- a/test/swim_test.js +++ b/test/swim_test.js @@ -18,10 +18,30 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -var AdminJoiner = require('../lib/swim.js').AdminJoiner; var mock = require('./mock'); var test = require('tape'); +var AdminJoiner = require('../lib/swim.js').AdminJoiner; +var Gossip = require('../lib/swim.js').Gossip; +var Suspicion = require('../lib/swim.js').Suspicion; + +function createRingpop() { + return { + computeProtocolDelay: mock.noop, + logger: mock.logger, + membership: mock.membership, + stat: mock.noop + }; +} + +function createGossip() { + return new Gossip(createRingpop()); +} + +function createSuspicion() { + return new Suspicion(createRingpop()); +} + test('join is aborted when max join duration is exceeded', function t(assert) { assert.plan(2); @@ -43,3 +63,144 @@ test('join is aborted when max join duration is exceeded', function t(assert) { assert.end(); }); + +test('starting and stopping gossip sets timer / unsets timer', function t(assert) { + var gossip = createGossip(); + + gossip.start(); + assert.ok(gossip.timer, 'timer was set'); + + gossip.stop(); + assert.notok(gossip.timer, 'timer was cleared'); + + assert.end(); +}); + +test('stopping gossip is a noop if gossip was never started', function t(assert) { + var gossip = createGossip(); + gossip.timer = 'nochange'; + + gossip.stop(); + assert.equals(gossip.timer, 'nochange', 'timer was not cleared'); + assert.equals(gossip.isStopped, true, 'gossip was not stopped'); + + assert.end(); +}); + +test('gossip can be restarted', function t(assert) { + var gossip = createGossip(); + gossip.start(); + + gossip.stop(); + assert.equals(gossip.timer, null, 'timer was cleared'); + assert.equals(gossip.isStopped, true, 'gossip was stopped'); + + gossip.start(); + assert.ok(gossip.timer, 'timer was set'); + assert.equals(gossip.isStopped, false, 'gossip was started'); + + gossip.stop(); // Cleanup + assert.end(); +}); + +test('suspect period for member is started', function t(assert) { + var member = { address: '127.0.0.1:3000' }; + var suspicion = createSuspicion(); + + suspicion.start(member); + assert.ok(suspicion.timers[member.address], 'timer is set for member suspect period'); + + suspicion.stopAll(); // Cleanup + assert.end(); +}); + +test('suspect period cannot be started for local member', function t(assert) { + var localMember = { address: '127.0.0.1:3000' }; + var suspicion = createSuspicion(); + suspicion.ringpop.membership.localMember = localMember; + suspicion.ringpop.membership.getLocalMemberAddress = function() { return localMember.address; }; + + suspicion.start(localMember); + assert.notok(suspicion.timers[localMember.address], 'timer is not set for local member suspect period'); + + suspicion.stopAll(); + assert.end(); +}); + +test('suspect period for member is stopped before another is started', function t(assert) { + assert.plan(2); + + var suspicion = createSuspicion(); + var remoteMember = suspicion.ringpop.membership.remoteMember; + suspicion.timers[remoteMember.address] = true; + suspicion.stop = function(member) { + assert.equals(member.address, remoteMember.address, 'stopping correct member period'); + assert.pass('stop was called on previous suspect period'); + }; + + suspicion.start(remoteMember); + + suspicion.stopAll(); + assert.end(); +}); + +test('suspect period can\'t be started until reenabled', function t(assert) { + var suspicion = createSuspicion(); + var remoteMember = suspicion.ringpop.membership.remoteMember; + suspicion.stopAll(); + + suspicion.start(remoteMember); + assert.notok(suspicion.timers[remoteMember.address], 'timer for member was not set'); + + suspicion.reenable(); + assert.equals(suspicion.isStoppedAll, null, 'suspicion reenabled'); + + suspicion.start(remoteMember); + assert.ok(suspicion.timers[remoteMember.address], 'timer for member was set'); + + suspicion.stopAll(); + assert.end(); +}); + +test('suspect period stop all clears all timers', function t(assert) { + var suspicion = createSuspicion(); + var remoteMember = suspicion.ringpop.membership.remoteMember; + var remoteMember2 = suspicion.ringpop.membership.remoteMember2; + + suspicion.start(remoteMember); + suspicion.start(remoteMember2); + assert.ok(suspicion.timers[remoteMember.address], 'suspect timer started for first member'); + assert.ok(suspicion.timers[remoteMember2.address], 'suspect timer started for next member'); + + suspicion.stopAll(); + assert.notok(suspicion.timers[remoteMember.address], 'suspect timer clear for first member'); + assert.notok(suspicion.timers[remoteMember2.address], 'suspect timer clear for next member'); + assert.ok(suspicion.isStoppedAll, 'stopped all timers'); + + assert.end(); +}); + +test('suspicion subprotocol cannot be reenabled without all timers first being stopped', function t(assert) { + var suspicion = createSuspicion(); + suspicion.isStoppedAll = 'fakestopall'; + suspicion.reenable(); + assert.equals(suspicion.isStoppedAll, 'fakestopall', 'suspicion not reenabled'); + assert.end(); +}); + +test('marks member faulty after suspect period', function t(assert) { + assert.plan(2); + + var suspicion = createSuspicion(); + var member = suspicion.ringpop.membership.remoteMember; + suspicion.setTimeout = function(fn) { return fn(); }; + suspicion.ringpop.membership.update = function(changes) { + assert.equals(changes[0].address, member.address, 'updates correct member'); + assert.equals(changes[0].status, 'faulty', 'marks member faulty'); + }; + + suspicion.start(member); + + suspicion.stopAll(); + assert.end(); +});