Skip to content

Commit

Permalink
Fix protocol period timing bug by updating the protocol timing metric…
Browse files Browse the repository at this point in the history
… after every ping
  • Loading branch information
jwolski committed Feb 13, 2015
1 parent a642d81 commit b1355a3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 49 deletions.
38 changes: 2 additions & 36 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ function RingPop(options) {
this.pingReqTimeout = 5000;
this.pingTimeout = 1500;
this.proxyReqTimeout = options.proxyReqTimeout || 30000;
this.minProtocolPeriod = 200;
this.lastProtocolPeriod = Date.now();
this.lastProtocolRate = 0;
this.protocolPeriods = 0;
this.maxJoinDuration = options.maxJoinDuration || MAX_JOIN_DURATION;

this.requestProxy = new RequestProxy(this);
Expand All @@ -104,14 +100,10 @@ function RingPop(options) {
this.gossip = new Gossip(this);
this.suspicion = new Suspicion(this);

this.timing = new metrics.Histogram();
this.timing.update(this.minProtocolPeriod);
this.clientRate = new metrics.Meter();
this.serverRate = new metrics.Meter();
this.totalRate = new metrics.Meter();

this.protocolRateTimer = null;

this.statHostPort = this.hostPort.replace(':', '_');
this.statPrefix = 'ringpop.' + this.statHostPort;
this.statKeys = {};
Expand All @@ -127,7 +119,6 @@ RingPop.prototype.destroy = function destroy() {
this.destroyed = true;
this.gossip.stop();
this.suspicion.stopAll();
clearInterval(this.protocolRateTimer);

this.clientRate.m1Rate.stop();
this.clientRate.m5Rate.stop();
Expand Down Expand Up @@ -274,7 +265,6 @@ RingPop.prototype.bootstrap = function bootstrap(bootstrapFile, callback) {
});

self.gossip.start();
self.startProtocolRateTimer();
self.isReady = true;
self.emit('ready');

Expand Down Expand Up @@ -340,11 +330,6 @@ RingPop.prototype.clearDebugFlags = function clearDebugFlags() {
this.debugFlags = {};
};

// Derives the protocol rate from the `timing` histogram: twice the value the
// histogram reports for the 0.5 percentile, but never less than
// `minProtocolPeriod`.
//
// Returns the larger of (0.5-percentile * 2) and `this.minProtocolPeriod`.
RingPop.prototype.protocolRate = function () {
    var percentileValues = this.timing.percentiles([0.5]);
    var doubledMedian = percentileValues['0.5'] * 2;

    // `minProtocolPeriod` acts as a floor on the computed rate.
    return Math.max(doubledMedian, this.minProtocolPeriod);
};

RingPop.prototype.getStatsHooksStats = function getStatsHooksStats() {
if (Object.keys(this.statsHooks).length === 0) {
return null;
Expand All @@ -368,8 +353,8 @@ RingPop.prototype.getStats = function getStats() {
pid: process.pid
},
protocol: {
timing: this.timing.printObj(),
protocolRate: this.protocolRate(),
timing: this.gossip.protocolTiming.printObj(),
protocolRate: this.gossip.computeProtocolRate(),
clientRate: this.clientRate.printObj().m1,
serverRate: this.serverRate.printObj().m1,
totalRate: this.totalRate.printObj().m1
Expand Down Expand Up @@ -491,16 +476,6 @@ RingPop.prototype.whoami = function whoami() {
return this.hostPort;
};

// Computes how long to wait before the next protocol period.
//
// Before any protocol period has been counted (`protocolPeriods` is falsy) a
// random integer delay is returned. Afterwards the delay is the time left
// until `lastProtocolPeriod + lastProtocolRate`, floored at
// `minProtocolPeriod`.
RingPop.prototype.computeProtocolDelay = function computeProtocolDelay() {
    if (!this.protocolPeriods) {
        // Delay for first tick will be staggered from 0 to `minProtocolPeriod` ms.
        return Math.floor(Math.random() * (this.minProtocolPeriod + 1));
    }

    var nextPeriodAt = this.lastProtocolPeriod + this.lastProtocolRate;
    var remaining = nextPeriodAt - Date.now();

    return Math.max(remaining, this.minProtocolPeriod);
};

RingPop.prototype.issueMembershipChanges = function issueMembershipChanges(checksum, source) {
return this.dissemination.getChanges(checksum, source);
};
Expand Down Expand Up @@ -613,9 +588,6 @@ RingPop.prototype.pingMemberNow = function pingMemberNow(callback) {
return callback();
}

this.lastProtocolPeriod = Date.now();
this.protocolPeriods++;

var member = this.memberIterator.next();

if (! member) {
Expand Down Expand Up @@ -744,12 +716,6 @@ RingPop.prototype.setLogger = function setLogger(logger) {
this.logger = logger;
};

// Arms a repeating timer that refreshes `lastProtocolRate` from
// `protocolRate()` every 1000 ms. The interval handle is stored on
// `protocolRateTimer`.
RingPop.prototype.startProtocolRateTimer = function startProtocolRateTimer() {
    var self = this;

    function refreshProtocolRate() {
        self.lastProtocolRate = self.protocolRate();
    }

    this.protocolRateTimer = setInterval(refreshProtocolRate, 1000);
};

RingPop.prototype.stat = function stat(type, key, value) {
if (!this.statKeys[key]) {
this.statKeys[key] = this.statPrefix + '.' + key;
Expand Down
53 changes: 47 additions & 6 deletions lib/swim.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// THE SOFTWARE.
var clearTimeout = require('timers').clearTimeout;
var globalSetTimeout = require('timers').setTimeout;
var metrics = require('metrics');
var safeParse = require('./util').safeParse;
var TypedError = require('error/typed');

Expand Down Expand Up @@ -267,20 +268,48 @@ PingSender.prototype.doCallback = function doCallback(isOk, bodyObj) {

// Holds the state of the gossip loop for one ringpop instance: the stopped
// flag, timer handles, and the bookkeeping used to pace protocol periods.
//
// ringpop - the owning ringpop instance; stored as `this.ringpop`.
function Gossip(ringpop) {
    var histogram;

    this.ringpop = ringpop;

    // A new instance starts out stopped with no timer handle.
    this.isStopped = true;
    this.timer = null;

    // Pacing state read by computeProtocolDelay().
    this.lastProtocolPeriod = Date.now();
    this.lastProtocolRate = 0;
    this.minProtocolPeriod = 200;
    this.numProtocolPeriods = 0;

    // Seed the timing histogram with the minimum period so that it holds a
    // sample from the start.
    histogram = new metrics.Histogram();
    histogram.update(this.minProtocolPeriod);
    this.protocolTiming = histogram;

    // Handles for the protocol period timeout and the protocol rate interval.
    this.protocolPeriodTimer = null;
    this.protocolRateTimer = null;
}

// Returns the delay to wait before the next protocol period.
//
// While no protocol period has been counted yet (`numProtocolPeriods` is
// falsy) the result is a random integer. Otherwise it is the time remaining
// until `lastProtocolPeriod + lastProtocolRate`, never less than
// `minProtocolPeriod`.
Gossip.prototype.computeProtocolDelay = function computeProtocolDelay() {
    var isFirstTick = !this.numProtocolPeriods;

    if (isFirstTick) {
        // Delay for first tick will be staggered from 0 to `minProtocolPeriod` ms.
        return Math.floor(Math.random() * (this.minProtocolPeriod + 1));
    }

    var untilNextPeriod = this.lastProtocolPeriod + this.lastProtocolRate - Date.now();

    return Math.max(untilNextPeriod, this.minProtocolPeriod);
};

// Computes the protocol rate from the `protocolTiming` histogram: double the
// value reported for the 0.5 percentile, with `minProtocolPeriod` as the
// lower bound.
Gossip.prototype.computeProtocolRate = function computeProtocolRate() {
    var percentileValues = this.protocolTiming.percentiles([0.5]);
    var doubledMedian = percentileValues['0.5'] * 2;

    return Math.max(doubledMedian, this.minProtocolPeriod);
};

Gossip.prototype.run = function run() {
var self = this;

var protocolDelay = this.ringpop.computeProtocolDelay();
var protocolDelay = this.computeProtocolDelay();
this.ringpop.stat('timing', 'protocol.delay', protocolDelay);

var startTime = new Date();
this.timer = setTimeout(function() {
self.ringpop.pingMemberNow(function() {
this.protocolPeriodTimer = setTimeout(function onGossipTimer() {
var pingStartTime = Date.now();

self.ringpop.pingMemberNow(function onMemberPinged() {
self.lastProtocolPeriod = Date.now();
self.numProtocolPeriods++;
self.ringpop.stat('timing', 'protocol.frequency', startTime);
self.protocolTiming.update(Date.now() - pingStartTime); // This keeps the protocol rate in check

if (self.isStopped) {
self.ringpop.logger.debug('stopped recurring gossip loop', {
Expand All @@ -304,13 +333,21 @@ Gossip.prototype.start = function start() {

this.ringpop.membership.shuffle();
this.run();
this.startProtocolRateTimer();
this.isStopped = false;

this.ringpop.logger.debug('started gossip protocol', {
local: this.ringpop.membership.getLocalMemberAddress()
});
};

// Arms a repeating timer that refreshes `lastProtocolRate` from
// computeProtocolRate() every 1000 ms and stores the interval handle on
// `protocolRateTimer`.
//
// If a rate timer is already armed it is cleared first. Otherwise a repeated
// call would overwrite the only reference to the earlier interval, leaving it
// running with no way to clear it.
Gossip.prototype.startProtocolRateTimer = function startProtocolRateTimer() {
    var self = this;

    if (this.protocolRateTimer) {
        clearInterval(this.protocolRateTimer);
    }

    this.protocolRateTimer = setInterval(function onProtocolRateTimer() {
        self.lastProtocolRate = self.computeProtocolRate();
    }, 1000);
};

Gossip.prototype.stop = function stop() {
if (this.isStopped) {
this.ringpop.logger.warn('gossip is already stopped', {
Expand All @@ -319,8 +356,12 @@ Gossip.prototype.stop = function stop() {
return;
}

clearTimeout(this.timer);
this.timer = null;
clearInterval(this.protocolRateTimer);
this.protocolRateTimer = null;

clearTimeout(this.protocolPeriodTimer);
this.protocolPeriodTimer = null;

this.isStopped = true;

this.ringpop.logger.debug('stopped gossip protocol', {
Expand Down
16 changes: 9 additions & 7 deletions test/swim_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,26 @@ test('join is aborted when max join duration is exceeded', function t(assert) {
assert.end();
});

test('starting and stopping gossip sets timer / unsets timer', function t(assert) {
test('starting and stopping gossip sets timer / unsets timers', function t(assert) {
var gossip = createGossip();

gossip.start();
assert.ok(gossip.timer, 'timer was set');
assert.ok(gossip.protocolPeriodTimer, 'protocol period timer was set');
assert.ok(gossip.protocolRateTimer, 'protocol rate timer was set');

gossip.stop();
assert.notok(gossip.timer, 'timer was cleared');
assert.notok(gossip.protocolPeriodTimer, 'protocol period timer was cleared');
assert.notok(gossip.protocolRateTimer, 'protocol rate timer was cleared');

assert.end();
});

test('stopping gossip is a noop if gossip was never started', function t(assert) {
var gossip = createGossip();
gossip.timer = 'nochange';
gossip.protocolPeriodTimer = 'nochange';

gossip.stop();
assert.equals(gossip.timer, 'nochange', 'timer was not cleared');
assert.equals(gossip.protocolPeriodTimer, 'nochange', 'timer was not cleared');
assert.equals(gossip.isStopped, true, 'gossip was not stopped');

assert.end();
Expand All @@ -92,11 +94,11 @@ test('gossip can be restarted', function t(assert) {
gossip.start();

gossip.stop();
assert.equals(gossip.timer, null, 'timer was cleared');
assert.equals(gossip.protocolPeriodTimer, null, 'timer was cleared');
assert.equals(gossip.isStopped, true, 'gossip was stopped');

gossip.start();
assert.ok(gossip.timer, 'timer was set');
assert.ok(gossip.protocolPeriodTimer, 'timer was set');
assert.equals(gossip.isStopped, false, 'gossip was started');

gossip.stop(); // Cleanup
Expand Down

0 comments on commit b1355a3

Please sign in to comment.