From c129d6e777ac5d35b492fa075bacf7078e76b613 Mon Sep 17 00:00:00 2001 From: Jeff Wolski Date: Fri, 13 Mar 2015 08:21:39 -0700 Subject: [PATCH] Compute a different checksum for the ring and use it for request proxying --- README.md | 3 ++ index.js | 20 +++++++ lib/request-proxy/index.js | 4 +- lib/request-proxy/send.js | 6 +-- lib/ring.js | 16 ++++++ test/hashring_test.js | 73 ++++++++++++++++++++++++++ test/integration/proxy_req_test.js | 26 ++++----- test/integration/request_proxy_test.js | 7 +-- 8 files changed, 131 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index cd920e2f..438d14d6 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,9 @@ All other properties should be considered private. Any mutation of properties no * `requestProxy.retrySucceeded` - a request that is retried succeeds * `requestProxy.retryFailed` - a request is retried up to the maximum number of retries and fails * `ringChanged` - ring state has changed for one or more nodes either having joined or left the cluster. All ring changes are member changes, but not vice versa. +* `ringChecksumComputed` - the hash ring's checksum was computed +* `ringServerAdded` - a server was added to the ring +* `ringServerRemoved` - a server was removed to the ring ## Code Walkthrough Instantiate ringpop by providing it the title and listening address of your application. It's important to note that the listening address of your ringpop instance is also used as a globally unique identifier for the instance within the ring. Therefore, make sure `hostPort` is unique. diff --git a/index.js b/index.js index 4a68035f..909ce59c 100644 --- a/index.js +++ b/index.js @@ -49,6 +49,21 @@ var MAX_JOIN_DURATION = 300000; var MEMBERSHIP_UPDATE_FLUSH_INTERVAL = 5000; var PROXY_REQ_PROPS = ['keys', 'dest', 'req', 'res']; +function onRingChecksumComputed(ringpop) { + ringpop.stat('increment', 'ring.checksum-computed'); + ringpop.emit('ringChecksumComputed'); +} + +function onRingServerAdded(ringpop) { + ringpop.stat('increment', 'ring.server-added'); + ringpop.emit('ringServerAdded'); +} + +function onRingServerRemoved(ringpop) { + ringpop.stat('increment', 'ring.server-removed'); + ringpop.emit('ringServerRemoved'); +} + function RingPop(options) { if (!(this instanceof RingPop)) { return new RingPop(options); @@ -104,7 +119,12 @@ function RingPop(options) { maxRetries: options.requestProxyMaxRetries, retrySchedule: options.requestProxyRetrySchedule }); + this.ring = new HashRing(); + this.ring.on('added', onRingServerAdded.bind(null, this)); + this.ring.on('removed', onRingServerRemoved.bind(null, this)); + this.ring.on('checksumComputed', onRingChecksumComputed.bind(null, this)); + this.dissemination = new Dissemination(this); this.membership = new Membership(this); this.membership.on('updated', this.onMembershipUpdated.bind(this)); diff --git a/lib/request-proxy/index.js b/lib/request-proxy/index.js index 0ab850fc..92e1ba50 100644 --- a/lib/request-proxy/index.js +++ b/lib/request-proxy/index.js @@ -158,9 +158,9 @@ proto.handleRequest = function handleRequest(head, body, cb) { var httpVersion = head.httpVersion; var checksum = head.ringpopChecksum; - if (checksum !== ringpop.membership.checksum) { + if (checksum !== ringpop.ring.checksum) { var err = InvalidCheckSumError({ - expected: ringpop.membership.checksum, + expected: ringpop.ring.checksum, actual: checksum }); ringpop.logger.warn('handleRequest got invalid checksum', { diff --git a/lib/request-proxy/send.js b/lib/request-proxy/send.js index 4f0153e7..6af450f7 100644 --- a/lib/request-proxy/send.js +++ b/lib/request-proxy/send.js @@ -27,7 +27,7 @@ var TypedError = require('error/typed'); var MaxRetriesExceeded = TypedError({ type: 'ringpop.request-proxy.max-retries-exceeded', - message: 'Max number of retries exceeded. ({maxRetries}) were attempted', + message: 'Max number of retries exceeded. {maxRetries} were attempted', maxRetries: null }); @@ -127,14 +127,14 @@ RequestProxySend.prototype.destroy = function destroy() { RequestProxySend.prototype.getRawHead = function getRawHead() { return rawHead(this.request.obj, { - checksum: this.ringpop.membership.checksum, + checksum: this.ringpop.ring.checksum, keys: this.keys }); }; RequestProxySend.prototype.getStrHead = function getStrHead() { return strHead(this.request.obj, { - checksum: this.ringpop.membership.checksum, + checksum: this.ringpop.ring.checksum, keys: this.keys }); }; diff --git a/lib/ring.js b/lib/ring.js index 9677f7b1..d20ebffe 100644 --- a/lib/ring.js +++ b/lib/ring.js @@ -29,6 +29,7 @@ function HashRing(options) { this.rbtree = new RBTree(); this.servers = {}; + this.checksum = null; } util.inherits(HashRing, EventEmitter); @@ -46,9 +47,22 @@ HashRing.prototype.addServer = function addServer(name) { this.rbtree.insert(farmhash.hash32(name + i), name); } + this.computeChecksum(); + this.emit('added', name); }; +HashRing.prototype.computeChecksum = function computeChecksum() { + // If servers is empty, a checksum will still be computed + // for the empty string. + var serverNames = Object.keys(this.servers); + var serverNameStr = serverNames.sort().join(';'); + + this.checksum = farmhash.hash32(serverNameStr); + + this.emit('checksumComputed'); +}; + HashRing.prototype.getServerCount = function getServerCount() { return Object.keys(this.servers).length; }; @@ -70,6 +84,8 @@ HashRing.prototype.removeServer = function removeServer(name) { this.rbtree.remove(farmhash.hash32(name + i), name); } + this.computeChecksum(); + this.emit('removed', name); }; diff --git a/test/hashring_test.js b/test/hashring_test.js index 00d6f948..fbfc346f 100644 --- a/test/hashring_test.js +++ b/test/hashring_test.js @@ -91,3 +91,76 @@ test('HashRing.removeServer', function t(assert) { assert.end(); }); + +test('checksum is null upon instantiation', function t(assert) { + var ring = new HashRing(); + assert.equals(ring.checksum, null, 'checksum is null'); + assert.end(); +}); + +test('checksum is not null when server added', function t(assert) { + var ring = new HashRing(); + ring.addServer('127.0.0.1:3000'); + assert.doesNotEqual(ring.checksum, null, 'checksum is not null'); + assert.end(); +}); + +test('checksum is still null when non-existent server removed', function t(assert) { + var ring = new HashRing(); + ring.removeServer('127.0.0.1:3000'); + assert.equals(ring.checksum, null, 'checksum is null'); + assert.end(); +}); + +test('checksum recomputed after server added, then removed', function t(assert) { + var ring = new HashRing(); + + ring.addServer('127.0.0.1:3000'); + var firstChecksum = ring.checksum; + + ring.removeServer('127.0.0.1:3000'); + var secondChecksum = ring.checksum; + + assert.doesNotEqual(firstChecksum, null, 'first checksum is not null'); + assert.doesNotEqual(secondChecksum, null, 'second checksum is not null'); + assert.doesNotEqual(firstChecksum, secondChecksum, 'checksums are different'); + assert.end(); +}); + +test('servers added out of order result in same checksum', function t(assert) { + var ring1 = new HashRing(); + ring1.addServer('127.0.0.1:3000'); + ring1.addServer('127.0.0.1:3001'); + + var ring2 = new HashRing(); + ring2.addServer('127.0.0.1:3001'); + ring2.addServer('127.0.0.1:3000'); + + assert.doesNotEqual(ring1.checksum, null, 'ring1 checksum is not null'); + assert.doesNotEqual(ring2.checksum, null, 'ring2 checksum is not null'); + assert.equals(ring1.checksum, ring2.checksum, 'checksums are same'); + assert.end(); +}); + +test('servers removed out of order result in same checksum', function t(assert) { + var ring1 = new HashRing(); + addServers(ring1); + ring1.removeServer('127.0.0.1:3001'); + ring1.removeServer('127.0.0.1:3002'); + + var ring2 = new HashRing(); + addServers(ring2); + ring2.removeServer('127.0.0.1:3002'); + ring2.removeServer('127.0.0.1:3001'); + + assert.doesNotEqual(ring1.checksum, null, 'ring1 checksum is not null'); + assert.doesNotEqual(ring2.checksum, null, 'ring2 checksum is not null'); + assert.equals(ring1.checksum, ring2.checksum, 'checksums are same'); + assert.end(); + + function addServers(ring) { + for (var i = 0; i < 4; i++) { + ring.addServer('127.0.0.1:300' + i); + } + } +}); diff --git a/test/integration/proxy_req_test.js b/test/integration/proxy_req_test.js index c9bee976..85e4da29 100644 --- a/test/integration/proxy_req_test.js +++ b/test/integration/proxy_req_test.js @@ -67,11 +67,11 @@ test('one retry', function t(assert) { }; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.two.once('requestProxy.checksumsDiffer', function onBadChecksum() { assert.pass('received request with invalid checksum'); - cluster.two.membership.checksum = cluster.one.membership.checksum; + cluster.two.ring.checksum = cluster.one.ring.checksum; }); cluster.two.once('request', function onGoodChecksum() { @@ -105,14 +105,14 @@ test('two retries', function t(assert) { var numAttempts = 0; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.two.on('requestProxy.checksumsDiffer', function onBadChecksum() { numAttempts++; // If last retry if (numAttempts === cluster.two.requestProxy.maxRetries) { - cluster.two.membership.checksum = cluster.one.membership.checksum; + cluster.two.ring.checksum = cluster.one.ring.checksum; } assert.pass('received request with invalid checksum'); @@ -151,7 +151,7 @@ test('no retries, invalid checksum', function t(assert) { }; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.two.on('requestProxy.checksumsDiffer', function onBadChecksum() { numAttempts++; @@ -189,7 +189,7 @@ test('exceeds max retries, errors out', function t(assert) { }; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.two.on('requestProxy.checksumsDiffer', function onBadChecksum() { numAttempts++; @@ -238,7 +238,7 @@ test('cleans up pending sends', function t(assert) { }; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.one.on('requestProxy.retryScheduled', function onRetry() { done(); @@ -265,11 +265,11 @@ test('cleans up some pending sends', function t(assert) { }; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; // Only one retry will be attempted, others will still be waiting cluster.one.on('requestProxy.retryAttempted', function onRetry() { - cluster.two.membership.checksum = cluster.one.membership.checksum; + cluster.two.ring.checksum = cluster.one.ring.checksum; }); for (var i = 0; i < 2; i++) { @@ -332,7 +332,7 @@ test('overrides /proxy/req endpoint', function t(assert) { }); var head = strHead(request, { - checksum: cluster.two.membership.checksum, + checksum: cluster.two.ring.checksum, keys: [cluster.keys.two] }); }); @@ -375,7 +375,7 @@ test('aborts retry because keys diverge', function t(assert) { useFakeTimers: true }, function onReady() { // Make node two refuse initial request - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.one.on('requestProxy.retryAborted', function onRetryAborted() { assert.pass('retry aborted'); @@ -425,7 +425,7 @@ test('reroutes retry to local', function t(assert) { useFakeTimers: true }, function onReady() { // Make node two refuse initial request - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.one.on('requestProxy.retryRerouted', function onRetryRerouted() { assert.pass('retry rerouted'); @@ -467,7 +467,7 @@ test('reroutes retry to remote', function t(assert) { useFakeTimers: true }, function onReady() { // Make node two refuse initial request - cluster.two.membership.checksum = cluster.one.membership.checksum + 1; + cluster.two.ring.checksum = cluster.one.ring.checksum + 1; cluster.one.on('requestProxy.retryRerouted', function onRetryRerouted() { assert.pass('retry rerouted'); diff --git a/test/integration/request_proxy_test.js b/test/integration/request_proxy_test.js index 86785329..f2f2b0cd 100644 --- a/test/integration/request_proxy_test.js +++ b/test/integration/request_proxy_test.js @@ -325,12 +325,7 @@ test('handles checksum failures', function t(assert) { }; var cluster = allocCluster(ringpopOpts, function onReady() { - cluster.two.membership.addMember({ - address: 'localhost:9999', - status: 'fake', - incarnationNumber: 10 - }); - cluster.two.membership.computeChecksum(); + cluster.two.ring.addServer('localhost:9999'); cluster.request({ host: 'one', key: cluster.keys.two