Skip to content

Commit

Permalink
Join duration w/ backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
jwolski committed Oct 20, 2015
1 parent 3ded16c commit 568a68d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 74 deletions.
7 changes: 6 additions & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ Config.prototype._seed = function _seed(seed) {
seedOrDefault('dampScoringReuseLimit', 2500);
seedOrDefault('dampScoringSuppressDuration', 60 * 60 * 1000); // 1 hr in ms
seedOrDefault('dampScoringSuppressLimit', 5000);
seedOrDefault('maxJoinAttempts', 50, numValidator);

// Joiner config
seedOrDefault('joinDelayMin', 100, numValidator); // ms
seedOrDefault('joinDelayMax', 2 * 60 * 1000, numValidator); // 2 min in ms
seedOrDefault('maxJoinDuration', 20 * 60 * 1000, numValidator); // 20 mins in ms

seedOrDefault('memberBlacklist', [], function validator(vals) {
return _.all(vals, function all(val) {
return val instanceof RegExp;
Expand Down
5 changes: 1 addition & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ var registerRingpopListeners = require('./lib/on_ringpop_event.js').register;
var RingpopClient = require('./client.js');
var RingpopServer = require('./server');
var safeParse = require('./lib/util').safeParse;
var sendJoin = require('./lib/gossip/join-sender.js').joinCluster;
var sendJoin = require('./lib/gossip/joiner.js').joinCluster;
var TracerStore = require('./lib/trace/store.js');

var HOST_PORT_PATTERN = /^(\d+.\d+.\d+.\d+):\d+$/;
var MAX_JOIN_DURATION = 300000;
var MEMBERSHIP_UPDATE_FLUSH_INTERVAL = 5000;

function RingPop(options) {
Expand Down Expand Up @@ -115,7 +114,6 @@ function RingPop(options) {
this.pingTimeout = options.pingTimeout || 1500;
this.joinTimeout = options.joinTimeout || 1000;
this.proxyReqTimeout = options.proxyReqTimeout || 30000;
this.maxJoinDuration = options.maxJoinDuration || MAX_JOIN_DURATION;
this.membershipUpdateFlushInterval = options.membershipUpdateFlushInterval ||
MEMBERSHIP_UPDATE_FLUSH_INTERVAL;

Expand Down Expand Up @@ -282,7 +280,6 @@ RingPop.prototype.bootstrap = function bootstrap(opts, callback) {

sendJoin({
ringpop: self,
maxJoinDuration: self.maxJoinDuration,
joinSize: self.joinSize,
parallelismFactor: opts.joinParallelismFactor,
joinTimeout: self.joinTimeout
Expand Down
121 changes: 54 additions & 67 deletions lib/gossip/join-sender.js → lib/gossip/joiner.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

var captureHost = require('../util.js').captureHost;
var errors = require('../errors.js');
var globalTimers = require('timers');
var isEmptyArray = require('../util.js').isEmptyArray;
var mergeJoinResponses = require('./join-response-merge.js');
var numOrDefault = require('../util.js').numOrDefault;
Expand All @@ -40,29 +41,8 @@ var JoinDurationExceededError = TypedError({
max: null
});

var JoinAttemptsExceededError = TypedError({
type: 'ringpop.join-attempts-exceeded',
message: 'Join attempts of `{joinAttempts}` ' +
'exceeded max `{maxJoinAttempts}`.\n',
joinAttempts: null,
maxJoinAttempts: null
});

var JOIN_RETRY_DELAY = 100;
var JOIN_SIZE = 3;
var JOIN_TIMEOUT = 1000;
// If a node cannot complete a join within MAX_JOIN_DURATION
// there is likely something very wrong. The aim is for the join
// operation to take no more than 1s, under normal conditions.
//
// The duration assigned below is very high for the following
// purposes:
// - Gives an application developer some time to diagnose
// what could be wrong.
// - Gives an operator some time to bootstrap a newly
// provisioned cluster
// - Trying forever is futile
var MAX_JOIN_DURATION = 120000;
var PARALLELISM_FACTOR = 2;

function isSingleNodeCluster(ringpop) {
Expand All @@ -79,7 +59,7 @@ function takeNode(hosts) {
return host;
}

function JoinCluster(opts) {
function Joiner(opts) {
opts = opts || {};

if (!opts.ringpop) {
Expand All @@ -97,6 +77,7 @@ function JoinCluster(opts) {
}

this.ringpop = opts.ringpop;
this.timers = opts.timers || globalTimers;
this.host = captureHost(this.ringpop.hostPort);
this.joinTimeout = numOrDefault(opts.joinTimeout, JOIN_TIMEOUT);

Expand All @@ -107,18 +88,6 @@ function JoinCluster(opts) {
this.parallelismFactor = numOrDefault(opts.parallelismFactor,
PARALLELISM_FACTOR);

// We eventually want to give up if the join process cannot
// succeed. `maxJoinDuration` is used to restrict that process
// to a certain time limit.
this.maxJoinDuration = numOrDefault(opts.maxJoinDuration,
MAX_JOIN_DURATION);

// We do not want to retry joining as hard as we can. We
// want to have some fixed backoff applied before we try
// to join again
this.joinRetryDelay = numOrDefault(opts.joinRetryDelay,
JOIN_RETRY_DELAY);

// Potential nodes are nodes in the ringpop bootstrap
// list that can be joined. Upon instantiation, this step
// simply filters out a node from attempting to join itself.
Expand All @@ -144,10 +113,13 @@ function JoinCluster(opts) {
// Changes received by other nodes will be aggregated and
// applied once the join process is complete.
this.joinResponses = [];

this.joinDelay = this.ringpop.config.get('joinDelayMin');
this.joinRetries = 0;
}

// Potential nodes are those that are not this instance of ringpop.
JoinCluster.prototype.collectPotentialNodes = function collectPotentialNodes(nodesJoined) {
Joiner.prototype.collectPotentialNodes = function collectPotentialNodes(nodesJoined) {
nodesJoined = nodesJoined || [];

var self = this;
Expand All @@ -157,15 +129,15 @@ JoinCluster.prototype.collectPotentialNodes = function collectPotentialNodes(nod
};

// Preferred nodes are those that are not on the same host as this instance of ringpop.
JoinCluster.prototype.collectPreferredNodes = function collectPreferredNodes() {
Joiner.prototype.collectPreferredNodes = function collectPreferredNodes() {
var self = this;
return this.potentialNodes.filter(function filterHost(hostPort) {
return self.host !== captureHost(hostPort);
});
};

// Non-preferred nodes are everyone else.
JoinCluster.prototype.collectNonPreferredNodes = function collectNonPreferredNodes() {
Joiner.prototype.collectNonPreferredNodes = function collectNonPreferredNodes() {
var self = this;

if (isEmptyArray(this.preferredNodes)) {
Expand All @@ -177,7 +149,7 @@ JoinCluster.prototype.collectNonPreferredNodes = function collectNonPreferredNod
}
};

JoinCluster.prototype.init = function init(nodesJoined) {
Joiner.prototype.init = function init(nodesJoined) {
// TODO The "collect" operations are fairly inefficient. This
// can be improved by indexing by host/port values.
this.potentialNodes = this.collectPotentialNodes(nodesJoined);
Expand All @@ -191,7 +163,7 @@ JoinCluster.prototype.init = function init(nodesJoined) {
this.roundNonPreferredNodes = this.nonPreferredNodes.slice(0);
};

JoinCluster.prototype.join = function join(callback) {
Joiner.prototype.join = function join(callback) {
var self = this;

if (this.ringpop.destroyed) {
Expand Down Expand Up @@ -221,7 +193,6 @@ JoinCluster.prototype.join = function join(callback) {
var numFailed = 0;
var startTime = Date.now();
var calledBack = false;
var maxJoinAttempts = this.ringpop.config.get('maxJoinAttempts');

function onJoin(err, nodes) {
if (calledBack) {
Expand Down Expand Up @@ -271,29 +242,14 @@ JoinCluster.prototype.join = function join(callback) {

calledBack = true;
callback(null, nodesJoined);
} else if (numFailed >= maxJoinAttempts) {
self.ringpop.logger.warn('ringpop max join attempts exceeded', {
local: self.ringpop.whoami(),
joinAttempts: numFailed,
maxJoinAttempts: maxJoinAttempts,
numJoined: numJoined,
numFailed: numFailed,
startTime: startTime
});

calledBack = true;
callback(JoinAttemptsExceededError({
joinAttempts: numFailed,
maxJoinAttempts: maxJoinAttempts
}));
return;
} else {
var joinDuration = Date.now() - startTime;
if (joinDuration > self.maxJoinDuration) {
var maxJoinDuration = self.ringpop.config.get('maxJoinDuration');
if (joinDuration > maxJoinDuration) {
self.ringpop.logger.warn('ringpop max join duration exceeded', {
local: self.ringpop.whoami(),
joinDuration: joinDuration,
maxJoinDuration: self.maxJoinDuration,
maxJoinDuration: maxJoinDuration,
numJoined: numJoined,
numFailed: numFailed,
startTime: startTime
Expand All @@ -302,7 +258,7 @@ JoinCluster.prototype.join = function join(callback) {
calledBack = true;
callback(JoinDurationExceededError({
joinDuration: joinDuration,
maxJoinDuration: self.maxJoinDuration
maxJoinDuration: maxJoinDuration
}));
return;
}
Expand All @@ -315,18 +271,49 @@ JoinCluster.prototype.join = function join(callback) {
numNodesLeft: self.joinSize - numJoined
});

setTimeout(reJoin, self.joinRetryDelay);
}
var oldJoinDelay = self.joinDelay;
var delayMax = self.ringpop.config.get('joinDelayMax');
var delayMin = self.ringpop.config.get('joinDelayMin');
self.joinDelay = Math.min(delayMax, delayMin * Math.pow(2,
self.joinRetries));

// Determine if the join delay has exceeded the maximum
// delay and send out a warning letting developer's know
// that Ringpop is having trouble.
if (oldJoinDelay < delayMax &&
self.joinDelay >= delayMax) {
var errorMsg = 'ringpop joiner reached max retry delay. ' +
'this is a strong indication that ringpop is having ' +
'trouble joining a cluster and could be due to a ' +
'misconfiguration of your environment. ringpop will ' +
'continue to join up to the max join duration.';
self.ringpop.logger.error(errorMsg, {
local: self.ringpop.whoami(),
retriesSoFar: self.joinRetries,
joinDelayMax: delayMax,
maxJoinDuration: maxJoinDuration
});
}

function reJoin() {
self.joinGroup(nodesJoined, onJoin);
self.ringpop.logger.info('ringpop joiner will attempt retry after delay', {
local: self.ringpop.whoami(),
retriesSoFar: self.joinRetries,
delay: self.joinDelay,
maxJoinDuration: maxJoinDuration,
timeJoiningSoFar: Date.now() - startTime
});
// Attempt to retry the join after applying the delay backoff.
self.timers.setTimeout(function onTimeout() {
self.joinRetries++;
self.joinGroup(nodesJoined, onJoin);
}, self.joinDelay);
}
}

this.joinGroup(nodesJoined, onJoin);
};

JoinCluster.prototype.joinGroup = function joinGroup(totalNodesJoined, callback) {
Joiner.prototype.joinGroup = function joinGroup(totalNodesJoined, callback) {
var self = this;
var group = this.selectGroup(totalNodesJoined);

Expand Down Expand Up @@ -383,7 +370,7 @@ JoinCluster.prototype.joinGroup = function joinGroup(totalNodesJoined, callback)
}
};

JoinCluster.prototype.joinNode = function joinNode(node, callback) {
Joiner.prototype.joinNode = function joinNode(node, callback) {
var self = this;
var joinOpts = {
host: node,
Expand Down Expand Up @@ -442,7 +429,7 @@ JoinCluster.prototype.joinNode = function joinNode(node, callback) {
}
};

JoinCluster.prototype.selectGroup = function selectGroup(nodesJoined) {
Joiner.prototype.selectGroup = function selectGroup(nodesJoined) {
nodesJoined = nodesJoined || [];

var self = this;
Expand Down Expand Up @@ -483,7 +470,7 @@ JoinCluster.prototype.selectGroup = function selectGroup(nodesJoined) {
};

function createJoiner(opts) {
return new JoinCluster(opts);
return new Joiner(opts);
}

function joinCluster(opts, callback) {
Expand Down
2 changes: 1 addition & 1 deletion server/admin/member.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

var errors = require('../../lib/errors.js');
var Member = require('../../lib/membership/member.js');
var sendJoin = require('../../lib/gossip/join-sender.js').joinCluster;
var sendJoin = require('../../lib/gossip/joiner.js').joinCluster;
var TypedError = require('error/typed');

var RedundantLeaveError = TypedError({
Expand Down
2 changes: 1 addition & 1 deletion test/unit/join-sender-test.js → test/unit/joiner_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// THE SOFTWARE.
'use strict';

var createJoiner = require('../../lib/gossip/join-sender.js').createJoiner;
var createJoiner = require('../../lib/gossip/joiner.js').createJoiner;
var Ringpop = require('../../index.js');
var test = require('tape');

Expand Down

0 comments on commit 568a68d

Please sign in to comment.