Skip to content

Commit

Permalink
Compute a different checksum for the ring and use it for request prox…
Browse files Browse the repository at this point in the history
…ying
  • Loading branch information
jwolski committed Mar 13, 2015
1 parent 422235f commit c129d6e
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 24 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ All other properties should be considered private. Any mutation of properties no
* `requestProxy.retrySucceeded` - a request that is retried succeeds
* `requestProxy.retryFailed` - a request is retried up to the maximum number of retries and fails
* `ringChanged` - ring state has changed for one or more nodes either having joined or left the cluster. All ring changes are member changes, but not vice versa.
* `ringChecksumComputed` - the hash ring's checksum was computed
* `ringServerAdded` - a server was added to the ring
* `ringServerRemoved` - a server was removed to the ring

## Code Walkthrough
Instantiate ringpop by providing it the title and listening address of your application. It's important to note that the listening address of your ringpop instance is also used as a globally unique identifier for the instance within the ring. Therefore, make sure `hostPort` is unique.
Expand Down
20 changes: 20 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ var MAX_JOIN_DURATION = 300000;
var MEMBERSHIP_UPDATE_FLUSH_INTERVAL = 5000;
var PROXY_REQ_PROPS = ['keys', 'dest', 'req', 'res'];

function onRingChecksumComputed(ringpop) {
ringpop.stat('increment', 'ring.checksum-computed');
ringpop.emit('ringChecksumComputed');
}

function onRingServerAdded(ringpop) {
ringpop.stat('increment', 'ring.server-added');
ringpop.emit('ringServerAdded');
}

function onRingServerRemoved(ringpop) {
ringpop.stat('increment', 'ring.server-removed');
ringpop.emit('ringServerRemoved');
}

function RingPop(options) {
if (!(this instanceof RingPop)) {
return new RingPop(options);
Expand Down Expand Up @@ -104,7 +119,12 @@ function RingPop(options) {
maxRetries: options.requestProxyMaxRetries,
retrySchedule: options.requestProxyRetrySchedule
});

this.ring = new HashRing();
this.ring.on('added', onRingServerAdded.bind(null, this));
this.ring.on('removed', onRingServerRemoved.bind(null, this));
this.ring.on('checksumComputed', onRingChecksumComputed.bind(null, this));

this.dissemination = new Dissemination(this);
this.membership = new Membership(this);
this.membership.on('updated', this.onMembershipUpdated.bind(this));
Expand Down
4 changes: 2 additions & 2 deletions lib/request-proxy/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ proto.handleRequest = function handleRequest(head, body, cb) {
var httpVersion = head.httpVersion;
var checksum = head.ringpopChecksum;

if (checksum !== ringpop.membership.checksum) {
if (checksum !== ringpop.ring.checksum) {
var err = InvalidCheckSumError({
expected: ringpop.membership.checksum,
expected: ringpop.ring.checksum,
actual: checksum
});
ringpop.logger.warn('handleRequest got invalid checksum', {
Expand Down
6 changes: 3 additions & 3 deletions lib/request-proxy/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var TypedError = require('error/typed');

var MaxRetriesExceeded = TypedError({
type: 'ringpop.request-proxy.max-retries-exceeded',
message: 'Max number of retries exceeded. ({maxRetries}) were attempted',
message: 'Max number of retries exceeded. {maxRetries} were attempted',
maxRetries: null
});

Expand Down Expand Up @@ -127,14 +127,14 @@ RequestProxySend.prototype.destroy = function destroy() {

RequestProxySend.prototype.getRawHead = function getRawHead() {
return rawHead(this.request.obj, {
checksum: this.ringpop.membership.checksum,
checksum: this.ringpop.ring.checksum,
keys: this.keys
});
};

RequestProxySend.prototype.getStrHead = function getStrHead() {
return strHead(this.request.obj, {
checksum: this.ringpop.membership.checksum,
checksum: this.ringpop.ring.checksum,
keys: this.keys
});
};
Expand Down
16 changes: 16 additions & 0 deletions lib/ring.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function HashRing(options) {

this.rbtree = new RBTree();
this.servers = {};
this.checksum = null;
}

util.inherits(HashRing, EventEmitter);
Expand All @@ -46,9 +47,22 @@ HashRing.prototype.addServer = function addServer(name) {
this.rbtree.insert(farmhash.hash32(name + i), name);
}

this.computeChecksum();

this.emit('added', name);
};

HashRing.prototype.computeChecksum = function computeChecksum() {
// If servers is empty, a checksum will still be computed
// for the empty string.
var serverNames = Object.keys(this.servers);
var serverNameStr = serverNames.sort().join(';');

this.checksum = farmhash.hash32(serverNameStr);

this.emit('checksumComputed');
};

HashRing.prototype.getServerCount = function getServerCount() {
return Object.keys(this.servers).length;
};
Expand All @@ -70,6 +84,8 @@ HashRing.prototype.removeServer = function removeServer(name) {
this.rbtree.remove(farmhash.hash32(name + i), name);
}

this.computeChecksum();

this.emit('removed', name);
};

Expand Down
73 changes: 73 additions & 0 deletions test/hashring_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,76 @@ test('HashRing.removeServer', function t(assert) {

assert.end();
});

test('checksum is null upon instantiation', function t(assert) {
var ring = new HashRing();
assert.equals(ring.checksum, null, 'checksum is null');
assert.end();
});

test('checksum is not null when server added', function t(assert) {
var ring = new HashRing();
ring.addServer('127.0.0.1:3000');
assert.doesNotEqual(ring.checksum, null, 'checksum is not null');
assert.end();
});

test('checksum is still null when non-existent server removed', function t(assert) {
var ring = new HashRing();
ring.removeServer('127.0.0.1:3000');
assert.equals(ring.checksum, null, 'checksum is null');
assert.end();
});

test('checksum recomputed after server added, then removed', function t(assert) {
var ring = new HashRing();

ring.addServer('127.0.0.1:3000');
var firstChecksum = ring.checksum;

ring.removeServer('127.0.0.1:3000');
var secondChecksum = ring.checksum;

assert.doesNotEqual(firstChecksum, null, 'first checksum is not null');
assert.doesNotEqual(secondChecksum, null, 'second checksum is not null');
assert.doesNotEqual(firstChecksum, secondChecksum, 'checksums are different');
assert.end();
});

test('servers added out of order result in same checksum', function t(assert) {
var ring1 = new HashRing();
ring1.addServer('127.0.0.1:3000');
ring1.addServer('127.0.0.1:3001');

var ring2 = new HashRing();
ring2.addServer('127.0.0.1:3001');
ring2.addServer('127.0.0.1:3000');

assert.doesNotEqual(ring1.checksum, null, 'ring1 checksum is not null');
assert.doesNotEqual(ring2.checksum, null, 'ring2 checksum is not null');
assert.equals(ring1.checksum, ring2.checksum, 'checksums are same');
assert.end();
});

test('servers removed out of order result in same checksum', function t(assert) {
var ring1 = new HashRing();
addServers(ring1);
ring1.removeServer('127.0.0.1:3001');
ring1.removeServer('127.0.0.1:3002');

var ring2 = new HashRing();
addServers(ring2);
ring2.removeServer('127.0.0.1:3002');
ring2.removeServer('127.0.0.1:3001');

assert.doesNotEqual(ring1.checksum, null, 'ring1 checksum is not null');
assert.doesNotEqual(ring2.checksum, null, 'ring2 checksum is not null');
assert.equals(ring1.checksum, ring2.checksum, 'checksums are same');
assert.end();

function addServers(ring) {
for (var i = 0; i < 4; i++) {
ring.addServer('127.0.0.1:300' + i);
}
}
});
26 changes: 13 additions & 13 deletions test/integration/proxy_req_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ test('one retry', function t(assert) {
};

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.two.once('requestProxy.checksumsDiffer', function onBadChecksum() {
assert.pass('received request with invalid checksum');
cluster.two.membership.checksum = cluster.one.membership.checksum;
cluster.two.ring.checksum = cluster.one.ring.checksum;
});

cluster.two.once('request', function onGoodChecksum() {
Expand Down Expand Up @@ -105,14 +105,14 @@ test('two retries', function t(assert) {
var numAttempts = 0;

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.two.on('requestProxy.checksumsDiffer', function onBadChecksum() {
numAttempts++;

// If last retry
if (numAttempts === cluster.two.requestProxy.maxRetries) {
cluster.two.membership.checksum = cluster.one.membership.checksum;
cluster.two.ring.checksum = cluster.one.ring.checksum;
}

assert.pass('received request with invalid checksum');
Expand Down Expand Up @@ -151,7 +151,7 @@ test('no retries, invalid checksum', function t(assert) {
};

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.two.on('requestProxy.checksumsDiffer', function onBadChecksum() {
numAttempts++;
Expand Down Expand Up @@ -189,7 +189,7 @@ test('exceeds max retries, errors out', function t(assert) {
};

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.two.on('requestProxy.checksumsDiffer', function onBadChecksum() {
numAttempts++;
Expand Down Expand Up @@ -238,7 +238,7 @@ test('cleans up pending sends', function t(assert) {
};

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.one.on('requestProxy.retryScheduled', function onRetry() {
done();
Expand All @@ -265,11 +265,11 @@ test('cleans up some pending sends', function t(assert) {
};

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

// Only one retry will be attempted, others will still be waiting
cluster.one.on('requestProxy.retryAttempted', function onRetry() {
cluster.two.membership.checksum = cluster.one.membership.checksum;
cluster.two.ring.checksum = cluster.one.ring.checksum;
});

for (var i = 0; i < 2; i++) {
Expand Down Expand Up @@ -332,7 +332,7 @@ test('overrides /proxy/req endpoint', function t(assert) {
});

var head = strHead(request, {
checksum: cluster.two.membership.checksum,
checksum: cluster.two.ring.checksum,
keys: [cluster.keys.two]
});
});
Expand Down Expand Up @@ -375,7 +375,7 @@ test('aborts retry because keys diverge', function t(assert) {
useFakeTimers: true
}, function onReady() {
// Make node two refuse initial request
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.one.on('requestProxy.retryAborted', function onRetryAborted() {
assert.pass('retry aborted');
Expand Down Expand Up @@ -425,7 +425,7 @@ test('reroutes retry to local', function t(assert) {
useFakeTimers: true
}, function onReady() {
// Make node two refuse initial request
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.one.on('requestProxy.retryRerouted', function onRetryRerouted() {
assert.pass('retry rerouted');
Expand Down Expand Up @@ -467,7 +467,7 @@ test('reroutes retry to remote', function t(assert) {
useFakeTimers: true
}, function onReady() {
// Make node two refuse initial request
cluster.two.membership.checksum = cluster.one.membership.checksum + 1;
cluster.two.ring.checksum = cluster.one.ring.checksum + 1;

cluster.one.on('requestProxy.retryRerouted', function onRetryRerouted() {
assert.pass('retry rerouted');
Expand Down
7 changes: 1 addition & 6 deletions test/integration/request_proxy_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,7 @@ test('handles checksum failures', function t(assert) {
};

var cluster = allocCluster(ringpopOpts, function onReady() {
cluster.two.membership.addMember({
address: 'localhost:9999',
status: 'fake',
incarnationNumber: 10
});
cluster.two.membership.computeChecksum();
cluster.two.ring.addServer('localhost:9999');

cluster.request({
host: 'one', key: cluster.keys.two
Expand Down

0 comments on commit c129d6e

Please sign in to comment.