diff --git a/index.js b/index.js index b4731d67..9956c9d2 100644 --- a/index.js +++ b/index.js @@ -31,6 +31,7 @@ var sendPing = require('./lib/swim/ping-sender.js'); var sendPingReq = require('./lib/swim/ping-req-sender.js'); var Suspicion = require('./lib/swim/suspicion'); +var createMembershipUpdateListener = require('./lib/membership-update-listener.js'); var createRingPopTChannel = require('./lib/tchannel.js').createRingPopTChannel; var Dissemination = require('./lib/dissemination.js'); var errors = require('./lib/errors.js'); @@ -127,7 +128,7 @@ function RingPop(options) { this.dissemination = new Dissemination(this); this.membership = new Membership(this); - this.membership.on('updated', this.onMembershipUpdated.bind(this)); + this.membership.on('updated', createMembershipUpdateListener(this)); this.memberIterator = new MembershipIterator(this); this.gossip = new Gossip({ ringpop: this, @@ -401,89 +402,6 @@ RingPop.prototype.whoami = function whoami() { return this.hostPort; }; -RingPop.prototype.onMemberAlive = function onMemberAlive(change) { - this.stat('increment', 'membership-update.alive'); - this.logger.debug('member is alive', { - local: this.whoami(), - alive: change.address - }); - - this.dissemination.recordChange(change); - this.ring.addServer(change.address); - this.suspicion.stop(change); -}; - -RingPop.prototype.onMemberFaulty = function onMemberFaulty(change) { - this.stat('increment', 'membership-update.faulty'); - this.logger.debug('member is faulty', { - local: this.whoami(), - faulty: change.address, - }); - - this.dissemination.recordChange(change); - this.ring.removeServer(change.address); - this.suspicion.stop(change); -}; - -RingPop.prototype.onMemberLeave = function onMemberLeave(change) { - this.stat('increment', 'membership-update.leave'); - this.logger.debug('member has left', { - local: this.whoami(), - left: change.address - }); - - this.dissemination.recordChange(change); - this.ring.removeServer(change.address); - this.suspicion.stop(change); -}; - -RingPop.prototype.onMemberSuspect = function onMemberSuspect(change) { - this.stat('increment', 'membership-update.suspect'); - this.logger.debug('member is suspect', { - local: this.whoami(), - suspect: change.address - }); - - this.suspicion.start(change); - this.dissemination.recordChange(change); -}; - -RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) { - var self = this; - var membershipChanged = false; - var ringChanged = false; - - updates.forEach(function(update) { - if (update.status === 'alive') { - self.onMemberAlive(update); - ringChanged = membershipChanged = true; - } else if (update.status === 'faulty') { - self.onMemberFaulty(update); - ringChanged = membershipChanged = true; - } else if (update.status === 'leave') { - self.onMemberLeave(update); - ringChanged = membershipChanged = true; - } else if (update.status === 'suspect') { - self.onMemberSuspect(update); - membershipChanged = true; - } - }); - - if (!!membershipChanged) { - this.emit('membershipChanged'); - this.emit('changed'); // Deprecated - } - - if (!!ringChanged) { - this.emit('ringChanged'); - } - - this.membershipUpdateRollup.trackUpdates(updates); - - this.stat('gauge', 'num-members', this.membership.members.length); - this.stat('timing', 'updates', updates.length); -}; - RingPop.prototype.pingMemberNow = function pingMemberNow(callback) { callback = callback || function() {}; diff --git a/lib/membership-update-listener.js b/lib/membership-update-listener.js new file mode 100644 index 00000000..41ceb00e --- /dev/null +++ b/lib/membership-update-listener.js @@ -0,0 +1,76 @@ +// Copyright (c) 2015 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +'use strict'; + +var Member = require('./member.js'); + +module.exports = function createListener(ringpop) { + return function onMembershipUpdated(updates) { + var serversToAdd = []; + var serversToRemove = []; + + for (var i = 0; i < updates.length; i++) { + var update = updates[i]; + + ringpop.stat('increment', 'membership-update.' + (update.status || 'unknown')); + + if (update.status === Member.Status.alive) { + serversToAdd.push(update.address); + ringpop.suspicion.stop(update); + } else if (update.status === Member.Status.suspect) { + ringpop.suspicion.start(update); + } else if (update.status === Member.Status.faulty) { + serversToRemove.push(update.address); + ringpop.suspicion.stop(update); + } else if (update.status === Member.Status.leave) { + serversToRemove.push(update.address); + ringpop.suspicion.stop(update); + } + + ringpop.dissemination.recordChange(update); + + ringpop.logger.debug('member updated', { + local: ringpop.whoami(), + address: update.address, + incarnationNumber: update.incarnationNumber, + status: update.status + }); + } + + // Must add/remove servers from ring in batch. There are + // efficiency gains when only having to compute the ring + // checksum once. + if (serversToAdd.length > 0 || serversToRemove.length > 0) { + var ringChanged = ringpop.ring.addRemoveServers(serversToAdd, serversToRemove); + + if (ringChanged) { + ringpop.emit('ringChanged'); + } + } + + ringpop.membershipUpdateRollup.trackUpdates(updates); + + ringpop.stat('gauge', 'num-members', ringpop.membership.members.length); + ringpop.stat('timing', 'updates', updates.length); + + ringpop.emit('membershipChanged'); + ringpop.emit('changed'); // Deprecated + }; +}; diff --git a/lib/ring.js b/lib/ring.js index d20ebffe..e6f7f8e7 100644 --- a/lib/ring.js +++ b/lib/ring.js @@ -34,22 +34,62 @@ function HashRing(options) { util.inherits(HashRing, EventEmitter); -// TODO - error checking around adding a server that's already there // TODO - error checking from rbtree.insert HashRing.prototype.addServer = function addServer(name) { if (this.hasServer(name)) { return; } - this.servers[name] = true; + this.addServerReplicas(name); + this.computeChecksum(); + + this.emit('added', name); +}; + +HashRing.prototype.addServerReplicas = function addServerReplicas(server) { + // Assumes server has not been previously added. + this.servers[server] = true; for (var i = 0; i < this.replicaPoints; i++) { - this.rbtree.insert(farmhash.hash32(name + i), name); + var hash = farmhash.hash32(server + i); + this.rbtree.insert(hash, server); } +}; - this.computeChecksum(); +HashRing.prototype.addRemoveServers = function addRemoveServers(serversToAdd, serversToRemove) { + serversToAdd = serversToAdd || []; + serversToRemove = serversToRemove || []; - this.emit('added', name); + var addedServers = false; + var removedServers = false; + + var server; + + for (var i = 0; i < serversToAdd.length; i++) { + server = serversToAdd[i]; + + if (!this.hasServer(server)) { + this.addServerReplicas(server); + addedServers = true; + } + } + + for (var j = 0; j < serversToRemove.length; j++) { + server = serversToRemove[j]; + + if (this.hasServer(server)) { + this.removeServerReplicas(server); + removedServers = true; + } + } + + var ringChanged = addedServers || removedServers; + + if (ringChanged) { + this.computeChecksum(); + } + + return ringChanged; }; HashRing.prototype.computeChecksum = function computeChecksum() { @@ -78,17 +118,22 @@ HashRing.prototype.removeServer = function removeServer(name) { return; } - delete this.servers[name]; - - for (var i = 0; i < this.replicaPoints; i++) { - this.rbtree.remove(farmhash.hash32(name + i), name); - } - + this.removeServerReplicas(name); this.computeChecksum(); this.emit('removed', name); }; +HashRing.prototype.removeServerReplicas = function removeServerReplicas(server) { + // Assumes server has been previously added. + delete this.servers[server]; + + for (var i = 0; i < this.replicaPoints; i++) { + var hash = farmhash.hash32(server + i); + this.rbtree.remove(hash, server); + } +}; + HashRing.prototype.lookup = function lookup(str) { var hash = farmhash.hash32(str); var iter = this.rbtree.upperBound(hash); diff --git a/test/ring-test.js b/test/ring-test.js new file mode 100644 index 00000000..efabb542 --- /dev/null +++ b/test/ring-test.js @@ -0,0 +1,76 @@ +// Copyright (c) 2015 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +'use strict'; + +var _ = require('underscore'); +var HashRing = require('../lib/ring.js'); +var test = require('tape'); + +function createServers(size) { + return _.times(size, function each(i) { + return '127.0.0.1:' + (3000 + i); + }); +} + +var servers = createServers(1000); + +test('has correct number of servers on add/remove', function t(assert) { + var ring = new HashRing(); + + ring.addRemoveServers(servers, null); + assert.equal(ring.getServerCount(), 1000, 'has 1000 servers'); + + ring.addRemoveServers(null, servers); + assert.equal(ring.getServerCount(), 0, 'has 0 servers'); + + ring.addRemoveServers(servers, servers); + assert.equal(ring.getServerCount(), 0, 'has 0 servers'); + + assert.end(); +}); + +test('checksum computed only once', function t(assert) { + assert.plan(1); + + var ring = new HashRing(); + ring.on('checksumComputed', function onComputed() { + assert.pass('checksum computed'); + }); + + ring.addRemoveServers(servers, servers); + + assert.end(); +}); + +test('1000 lookups', function t(assert) { + assert.plan(1000); + + var ring = new HashRing(); + ring.addRemoveServers(servers, null); + + for (var i = 0; i < servers.length; i++) { + var server = servers[i]; + + assert.equal(ring.lookup(server + '0'), server, + 'server hashes correctly'); + } + + assert.end(); +});