Skip to content

Commit

Permalink
Add/remove servers from ring in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
jwolski committed Jun 24, 2015
1 parent e7e6e22 commit fe8187e
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 95 deletions.
86 changes: 2 additions & 84 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var sendPing = require('./lib/swim/ping-sender.js');
var sendPingReq = require('./lib/swim/ping-req-sender.js');
var Suspicion = require('./lib/swim/suspicion');

var createMembershipUpdateListener = require('./lib/membership-update-listener.js');
var createRingPopTChannel = require('./lib/tchannel.js').createRingPopTChannel;
var Dissemination = require('./lib/dissemination.js');
var errors = require('./lib/errors.js');
Expand Down Expand Up @@ -127,7 +128,7 @@ function RingPop(options) {

this.dissemination = new Dissemination(this);
this.membership = new Membership(this);
this.membership.on('updated', this.onMembershipUpdated.bind(this));
this.membership.on('updated', createMembershipUpdateListener(this));
this.memberIterator = new MembershipIterator(this);
this.gossip = new Gossip({
ringpop: this,
Expand Down Expand Up @@ -401,89 +402,6 @@ RingPop.prototype.whoami = function whoami() {
return this.hostPort;
};

RingPop.prototype.onMemberAlive = function onMemberAlive(change) {
this.stat('increment', 'membership-update.alive');
this.logger.debug('member is alive', {
local: this.whoami(),
alive: change.address
});

this.dissemination.recordChange(change);
this.ring.addServer(change.address);
this.suspicion.stop(change);
};

RingPop.prototype.onMemberFaulty = function onMemberFaulty(change) {
this.stat('increment', 'membership-update.faulty');
this.logger.debug('member is faulty', {
local: this.whoami(),
faulty: change.address,
});

this.dissemination.recordChange(change);
this.ring.removeServer(change.address);
this.suspicion.stop(change);
};

RingPop.prototype.onMemberLeave = function onMemberLeave(change) {
this.stat('increment', 'membership-update.leave');
this.logger.debug('member has left', {
local: this.whoami(),
left: change.address
});

this.dissemination.recordChange(change);
this.ring.removeServer(change.address);
this.suspicion.stop(change);
};

RingPop.prototype.onMemberSuspect = function onMemberSuspect(change) {
this.stat('increment', 'membership-update.suspect');
this.logger.debug('member is suspect', {
local: this.whoami(),
suspect: change.address
});

this.suspicion.start(change);
this.dissemination.recordChange(change);
};

RingPop.prototype.onMembershipUpdated = function onMembershipUpdated(updates) {
var self = this;
var membershipChanged = false;
var ringChanged = false;

updates.forEach(function(update) {
if (update.status === 'alive') {
self.onMemberAlive(update);
ringChanged = membershipChanged = true;
} else if (update.status === 'faulty') {
self.onMemberFaulty(update);
ringChanged = membershipChanged = true;
} else if (update.status === 'leave') {
self.onMemberLeave(update);
ringChanged = membershipChanged = true;
} else if (update.status === 'suspect') {
self.onMemberSuspect(update);
membershipChanged = true;
}
});

if (!!membershipChanged) {
this.emit('membershipChanged');
this.emit('changed'); // Deprecated
}

if (!!ringChanged) {
this.emit('ringChanged');
}

this.membershipUpdateRollup.trackUpdates(updates);

this.stat('gauge', 'num-members', this.membership.members.length);
this.stat('timing', 'updates', updates.length);
};

RingPop.prototype.pingMemberNow = function pingMemberNow(callback) {
callback = callback || function() {};

Expand Down
76 changes: 76 additions & 0 deletions lib/membership-update-listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var Member = require('./member.js');

module.exports = function createListener(ringpop) {
return function onMembershipUpdated(updates) {
var serversToAdd = [];
var serversToRemove = [];

for (var i = 0; i < updates.length; i++) {
var update = updates[i];

ringpop.stat('increment', 'membership-update.' + (update.status || 'unknown'));

if (update.status === Member.Status.alive) {
serversToAdd.push(update.address);
ringpop.suspicion.stop(update);
} else if (update.status === Member.Status.suspect) {
ringpop.suspicion.start(update);
} else if (update.status === Member.Status.faulty) {
serversToRemove.push(update.address);
ringpop.suspicion.stop(update);
} else if (update.status === Member.Status.leave) {
serversToRemove.push(update.address);
ringpop.suspicion.stop(update);
}

ringpop.dissemination.recordChange(update);

ringpop.logger.debug('member updated', {
local: ringpop.whoami(),
address: update.address,
incarnationNumber: update.incarnationNumber,
status: update.status
});
}

// Must add/remove servers from ring in batch. There are
// efficiency gains when only having to compute the ring
// checksum once.
if (serversToAdd.length > 0 || serversToRemove.length > 0) {
var ringChanged = ringpop.ring.addRemoveServers(serversToAdd, serversToRemove);

if (ringChanged) {
ringpop.emit('ringChanged');
}
}

ringpop.membershipUpdateRollup.trackUpdates(updates);

ringpop.stat('gauge', 'num-members', ringpop.membership.members.length);
ringpop.stat('timing', 'updates', updates.length);

ringpop.emit('membershipChanged');
ringpop.emit('changed'); // Deprecated
};
};
67 changes: 56 additions & 11 deletions lib/ring.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,62 @@ function HashRing(options) {

util.inherits(HashRing, EventEmitter);

// TODO - error checking around adding a server that's already there
// TODO - error checking from rbtree.insert
HashRing.prototype.addServer = function addServer(name) {
if (this.hasServer(name)) {
return;
}

this.servers[name] = true;
this.addServerReplicas(name);
this.computeChecksum();

this.emit('added', name);
};

HashRing.prototype.addServerReplicas = function addServerReplicas(server) {
// Assumes server has not been previously added.
this.servers[server] = true;

for (var i = 0; i < this.replicaPoints; i++) {
this.rbtree.insert(farmhash.hash32(name + i), name);
var hash = farmhash.hash32(server + i);
this.rbtree.insert(hash, server);
}
};

this.computeChecksum();
HashRing.prototype.addRemoveServers = function addRemoveServers(serversToAdd, serversToRemove) {
serversToAdd = serversToAdd || [];
serversToRemove = serversToRemove || [];

this.emit('added', name);
var addedServers = false;
var removedServers = false;

var server;

for (var i = 0; i < serversToAdd.length; i++) {
server = serversToAdd[i];

if (!this.hasServer(server)) {
this.addServerReplicas(server);
addedServers = true;
}
}

for (var j = 0; j < serversToRemove.length; j++) {
server = serversToRemove[j];

if (this.hasServer(server)) {
this.removeServerReplicas(server);
removedServers = true;
}
}

var ringChanged = addedServers || removedServers;

if (ringChanged) {
this.computeChecksum();
}

return ringChanged;
};

HashRing.prototype.computeChecksum = function computeChecksum() {
Expand Down Expand Up @@ -78,17 +118,22 @@ HashRing.prototype.removeServer = function removeServer(name) {
return;
}

delete this.servers[name];

for (var i = 0; i < this.replicaPoints; i++) {
this.rbtree.remove(farmhash.hash32(name + i), name);
}

this.removeServerReplicas(name);
this.computeChecksum();

this.emit('removed', name);
};

HashRing.prototype.removeServerReplicas = function removeServerReplicas(server) {
// Assumes server has been previously added.
delete this.servers[server];

for (var i = 0; i < this.replicaPoints; i++) {
var hash = farmhash.hash32(server + i);
this.rbtree.remove(hash, server);
}
};

HashRing.prototype.lookup = function lookup(str) {
var hash = farmhash.hash32(str);
var iter = this.rbtree.upperBound(hash);
Expand Down
76 changes: 76 additions & 0 deletions test/ring-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var _ = require('underscore');
var HashRing = require('../lib/ring.js');
var test = require('tape');

function createServers(size) {
return _.times(size, function each(i) {
return '127.0.0.1:' + (3000 + i);
});
}

var servers = createServers(1000);

test('has correct number of servers on add/remove', function t(assert) {
var ring = new HashRing();

ring.addRemoveServers(servers, null);
assert.equal(ring.getServerCount(), 1000, 'has 1000 servers');

ring.addRemoveServers(null, servers);
assert.equal(ring.getServerCount(), 0, 'has 0 servers');

ring.addRemoveServers(servers, servers);
assert.equal(ring.getServerCount(), 0, 'has 0 servers');

assert.end();
});

test('checksum computed only once', function t(assert) {
assert.plan(1);

var ring = new HashRing();
ring.on('checksumComputed', function onComputed() {
assert.pass('checksum computed');
});

ring.addRemoveServers(servers, servers);

assert.end();
});

test('1000 lookups', function t(assert) {
assert.plan(1000);

var ring = new HashRing();
ring.addRemoveServers(servers, null);

for (var i = 0; i < servers.length; i++) {
var server = servers[i];

assert.equal(ring.lookup(server + '0'), server,
'server hashes correctly');
}

assert.end();
});

0 comments on commit fe8187e

Please sign in to comment.