Skip to content

Commit

Permalink
If a node gets a join request, it no longer adds the joining node to …
Browse files Browse the repository at this point in the history
…the membership.

A node is therefore expected to disseminate its own existence to the membership.
  • Loading branch information
Wieger Steggerda authored and jwolski committed Nov 24, 2015
1 parent 59d49e9 commit 329ba37
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 42 deletions.
1 change: 1 addition & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Config.prototype._seed = function _seed(seed) {
seedOrDefault('dampingLogLevel', LoggingLevels.warn);
seedOrDefault('gossipLogLevel', LoggingLevels.off);
seedOrDefault('joinLogLevel', LoggingLevels.warn);
seedOrDefault('disseminationLogLevel', LoggingLevels.off);

// Gossip configs
seedOrDefault('autoGossip', true);
Expand Down
13 changes: 13 additions & 0 deletions lib/gossip/dissemination.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

var EventEmitter = require('events').EventEmitter;
var util = require('util');
var GossipEvents = require('./events');

var LOG_10 = Math.log(10);

Expand All @@ -31,6 +32,7 @@ function Dissemination(ringpop) {
this.changes = {};
this.maxPiggybackCount = Dissemination.Defaults.maxPiggybackCount;
this.piggybackFactor = Dissemination.Defaults.piggybackFactor;
this.logger = this.ringpop.loggerFactory.getLogger('dissemination');
}

util.inherits(Dissemination, EventEmitter);
Expand Down Expand Up @@ -156,6 +158,9 @@ Dissemination.prototype._issueAs = function _issueAs(filterChange, mapChanges) {

if (change.piggybackCount > this.maxPiggybackCount) {
delete this.changes[address];
if (Object.keys(this.changes).length === 0) {
this.emit('changesExhausted', new GossipEvents.ChangesExhaustedEvent());
}
continue;
}

Expand All @@ -172,6 +177,14 @@ Dissemination.prototype._issueAs = function _issueAs(filterChange, mapChanges) {

this.ringpop.stat('gauge', 'changes.disseminate', changesToDisseminate.length);


if (changesToDisseminate.length > 0) {
this.logger.info('ringpop dissemination send', {
local: this.ringpop.whoami(),
changesCount: changesToDisseminate.length
});
}

return mapChanges(changesToDisseminate);
};

Expand Down
28 changes: 28 additions & 0 deletions lib/gossip/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

function ChangesExhaustedEvent() {
this.name = this.constructor.name;
}

module.exports = {
ChangesExhaustedEvent: ChangesExhaustedEvent
};
4 changes: 4 additions & 0 deletions lib/logging/logger_factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ LoggerFactory.prototype.getLogger = function getLogger(name) {
this.loggers[name] = new ModuleLogger(this.ringpop,
'joinLogLevel');
break;
case 'dissemination':
this.loggers[name] = new ModuleLogger(this.ringpop,
'disseminationLogLevel');
break;
default:
this.loggers[name] = new ModuleLogger(this.ringpop,
'defaultLogLevel');
Expand Down
2 changes: 0 additions & 2 deletions server/protocol/join.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ module.exports = function createJoinHandler(ringpop) {
ringpop.serverRate.mark();
ringpop.totalRate.mark();

ringpop.membership.makeAlive(source, incarnationNumber);

callback(null, null, JSON.stringify({
app: ringpop.app,
coordinator: ringpop.whoami(),
Expand Down
6 changes: 4 additions & 2 deletions test/integration/admin_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var RingpopClient = require('../../client.js');
var testRingpopCluster = require('../lib/test-ringpop-cluster.js');

testRingpopCluster({
size: 1
size: 1,
waitForConvergence: false
}, 'config endpoints', function t(bootRes, cluster, assert) {
assert.plan(4);

Expand Down Expand Up @@ -55,7 +56,8 @@ testRingpopCluster({
// Test to make sure these endpoints are available. Find tests that test
// behavior of handlers under unit tests.
testRingpopCluster({
size: 1
size: 1,
waitForConvergence: false
}, 'gossip endpoints', function t(bootRes, cluster, assert) {
assert.plan(4);

Expand Down
51 changes: 27 additions & 24 deletions test/integration/gossip_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ function assertNumBadStatuses(assert, res, num) {
}

function mkNoGossip(cluster) {
var noop = function noop() {
};

cluster.forEach(function eachRingpop(ringpop) {
ringpop.gossip.start = noop;
ringpop.gossip.stop();
});
}

Expand All @@ -59,10 +56,11 @@ function mkBadPingReqResponder(ringpop) {
}

testRingpopCluster({
tap: function tap(cluster) {
tapAfterConvergence: function tapAfterConvergence(cluster) {
mkNoGossip(cluster);
}
}, 'ping-reqs 1 member', function t(bootRes, cluster, assert) {

var ringpop = cluster[0];
var unreachableMember = ringpop.membership.findMemberByAddress(cluster[1].hostPort);

Expand All @@ -82,10 +80,10 @@ testRingpopCluster({

testRingpopCluster({
size: 5,
tap: function tap(cluster) {
tapAfterConvergence: function tapAfterConvergence(cluster) {
mkNoGossip(cluster);
}
}, 'ping-reqs pingReqSize members', function t(bootRes, cluster, assert) {
}, 'ping-reqs 3 members', function t(bootRes, cluster, assert) {
var ringpop = cluster[0];
var unreachableMember = ringpop.membership.
findMemberByAddress(cluster[1].hostPort);
Expand All @@ -106,32 +104,35 @@ testRingpopCluster({

testRingpopCluster({
size: 5,
tap: function tap(cluster) {
tapAfterConvergence: function tapAfterConvergence(cluster) {
mkNoGossip(cluster);
}
}, 'ping-req target unreachable', function t(bootRes, cluster, assert) {
var badRingpop = cluster[4];
badRingpop.on('destroyed', onDestroyed);
badRingpop.destroy();

var ringpop = cluster[0];
var unreachableMember = ringpop.membership.findMemberByAddress(badRingpop.hostPort);
var pingReqSize = 3;

sendPingReq({
ringpop: ringpop,
unreachableMember: unreachableMember,
pingReqSize: pingReqSize
}, function onPingReq(err, res) {
assert.ifErr(err, 'no error occurred');
assertNumBadStatuses(assert, res, pingReqSize);
assertSuspect(assert, ringpop, unreachableMember.address);
assert.end();
});
function onDestroyed() {
var ringpop = cluster[0];
var unreachableMember = ringpop.membership.findMemberByAddress(badRingpop.hostPort);
var pingReqSize = 3;

sendPingReq({
ringpop: ringpop,
unreachableMember: unreachableMember,
pingReqSize: pingReqSize
}, function onPingReq(err, res) {
assert.ifErr(err, 'no error occurred');
assertNumBadStatuses(assert, res, pingReqSize);
assertSuspect(assert, ringpop, unreachableMember.address);
assert.end();
});
}
});

testRingpopCluster({
size: 2,
tap: function tap(cluster) {
tapAfterConvergence: function tapAfterConvergence(cluster) {
mkNoGossip(cluster);
}
}, 'no ping-req members', function t(bootRes, cluster, assert) {
Expand All @@ -157,6 +158,8 @@ testRingpopCluster({
size: 5,
tap: function tap(cluster) {
mkBadPingReqResponder(cluster[3]);
},
tapAfterConvergence: function tapAfterConvergence(cluster) {
mkNoGossip(cluster);
}
}, 'some bad ping-statuses', function t(bootRes, cluster, assert) {
Expand All @@ -181,7 +184,7 @@ testRingpopCluster({

testRingpopCluster({
size: 5,
tap: function tap(cluster) {
tapAfterConvergence: function tapAfterConvergence(cluster) {
mkNoGossip(cluster);
}
}, 'ping-req inconclusive', function t(bootRes, cluster, assert) {
Expand Down
17 changes: 10 additions & 7 deletions test/integration/join-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ test('bootstrap with self is ok', function t(assert) {
});

testRingpopCluster({
size: 1
size: 1,
waitForConvergence: false
}, 'one node can join', function t(bootRes, cluster, assert) {
assert.ifErr(bootRes[cluster[0].hostPort].err, 'no error occurred');
assert.end();
Expand Down Expand Up @@ -76,8 +77,7 @@ testRingpopCluster({
var badNode = cluster[2].hostPort;

cluster.forEach(function eachNode(node) {
assert.ok(node.isReady, 'node is bootstrapped');

assert.ok(node.isReady, 'node is ready');
var nodesJoined = bootRes[node.hostPort].nodesJoined;
assert.ok(nodesJoined.length >= 1, 'joined at least one other node');
assert.ok(nodesJoined.indexOf(badNode) === -1, 'no one can join bad node');
Expand All @@ -92,12 +92,13 @@ testRingpopCluster({
tap: function tap(cluster) {
cluster[1].denyJoins();
cluster[2].denyJoins();
}
},
waitForConvergence: false
}, 'three nodes, two of them bad, join size equals two', function t(bootRes, cluster, assert) {
assert.equal(cluster.length, 3, 'cluster of 3');

cluster.forEach(function eachNode(node) {
assert.notok(node.isReady, 'node is not bootstrapped');
assert.notok(node.isReady, 'node is not ready');
assert.equal(bootRes[node.hostPort].err.type,
'ringpop.join-duration-exceeded',
'join duration exceeded error');
Expand Down Expand Up @@ -133,7 +134,8 @@ testRingpopCluster({
}));
}, 100);
});
}
},
waitForConvergence: false
}, 'slow joiner', function t(bootRes, cluster, assert) {
assert.equal(cluster.length, 2, 'cluster of 2');

Expand All @@ -157,7 +159,8 @@ testRingpopCluster({
cluster[0].config.set('joinDelayMin', 0);
cluster[0].config.set('maxJoinDuration', 1);
cluster[1].config.set('memberBlacklist', [/127.0.0.1:10000/]);
}
},
checkChecksums: false
}, 'join blacklist', function t(bootRes, cluster, assert) {
assert.notok(cluster[0].isReady, 'node one is not ready');
assert.ok(cluster[1].isReady, 'node two is ready');
Expand Down
2 changes: 1 addition & 1 deletion test/integration/proxy_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ test('proxies big json', function t(assert) {

var opts = {
bodyLimit: bodyLimit,
requestProxyMaxRetries: 0,
requestProxyMaxRetries: 0
};

var cluster = allocCluster(opts, function onReady() {
Expand Down
75 changes: 69 additions & 6 deletions test/lib/test-ringpop-cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ var tape = require('tape');
var Ringpop = require('../../index.js');
var TChannel = require('tchannel');

function bootstrapClusterOf(opts, onBootstrap) {
var cluster = createClusterOf(opts);
function bootstrapClusterOf(cluster, opts, onBootstrap) {

var bootstrapHosts = cluster.map(function mapRingpop(ringpop) {
return ringpop.hostPort;
Expand Down Expand Up @@ -120,6 +119,31 @@ function destroyCluster(cluster) {
});
}

function assertEqualChecksums(assert, cluster) {
var checksums = _.chain(cluster)
.pluck('membership')
.pluck('checksum')
.uniq();
if (!(checksums.length === 1 && typeof checksums[0] === 'number')) {
assert.fail('not all checksums are equal');
}
}

function speedUpGossipProtocol(cluster) {
var tmpMinProtocolPeriods = [];
cluster.forEach(function each(ringpop, i) {
tmpMinProtocolPeriods[i] = ringpop.gossip.minProtocolPeriod;
ringpop.gossip.minProtocolPeriod = 1;
});
return tmpMinProtocolPeriods;
}

function revertGossipProtocolSpeedUp(cluster, periods) {
cluster.forEach(function each(ringpop, i) {
ringpop.gossip.minProtocolPeriod = periods[i];
});
}

function testRingpopCluster(opts, name, test) {
if (typeof opts === 'string' && typeof name === 'function') {
test = name;
Expand All @@ -128,13 +152,52 @@ function testRingpopCluster(opts, name, test) {
}

tape(name, function onTest(assert) {
var cluster = bootstrapClusterOf(opts, function onBootstrap(results) {
assert.on('end', function onEnd() {
destroyCluster(cluster);
var cluster = createClusterOf(opts);
var joinResults;

// Speed up gossip protocol; make sure to revert when after convergence
var periods = speedUpGossipProtocol(cluster);

if (opts.waitForConvergence !== false) {
var onOneExhausted = _.after(cluster.length, onSteadyState);
cluster.forEach(function each(ringpop) {
ringpop.dissemination.on('changesExhausted', onOneExhausted);
});
}

test(results, cluster, assert);
cluster = bootstrapClusterOf(cluster, opts, function onBootstrap(results) {
joinResults = results;

// Not all tests converge. e.g. join tests with broken nodes shouldn't.
// This option allows us to still run the checks
if (opts.waitForConvergence === false) {
onSteadyState();
}
});

assert.on('end', function onEnd() {
destroyCluster(cluster);
});

function onSteadyState() {
// do not run onConverged if not all joins are succesful
if (joinResults === undefined) {
return;
}

revertGossipProtocolSpeedUp(cluster, periods);

if (opts.waitForConvergence !== false && opts.checkChecksums === true) {
assertEqualChecksums(assert, cluster);
}

if (typeof opts.tapAfterConvergence === 'function') {
opts.tapAfterConvergence(cluster);
}

test(joinResults, cluster, assert);
}

});
}

Expand Down

0 comments on commit 329ba37

Please sign in to comment.