Skip to content

Commit

Permalink
Backpressure based on number of inflight requests and event loop lag
Browse files Browse the repository at this point in the history
  • Loading branch information
jwolski committed Oct 28, 2015
1 parent 34cc63e commit 617f43b
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 22 deletions.
6 changes: 6 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ Config.prototype._seed = function _seed(seed) {
seedOrDefault('joinTroubleErrorEnabled', true);
seedOrDefault('maxJoinDuration', 20 * 60 * 1000, numValidator); // 20 mins in ms

// Forwarding config
seedOrDefault('backpressureEnabled', false);
seedOrDefault('lagSamplerInterval', 1000, numValidator);
seedOrDefault('maxEventLoopLag', 1500, numValidator);
seedOrDefault('maxInflightRequests', 5000, numValidator);

seedOrDefault('memberBlacklist', [], function validator(vals) {
return _.all(vals, function all(val) {
return val instanceof RegExp;
Expand Down
7 changes: 7 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ var getTChannelVersion = require('./lib/util.js').getTChannelVersion;
var HashRing = require('./lib/ring');
var initMembership = require('./lib/membership/index.js');
var LoggerFactory = require('./lib/logging/logger_factory.js');
var LagSampler = require('./lib/lag_sampler.js');
var MembershipIterator = require('./lib/membership/iterator.js');
var MembershipUpdateRollup = require('./lib/membership/rollup.js');
var nulls = require('./lib/nulls');
var rawHead = require('./lib/request-proxy/util.js').rawHead;
var RequestProxy = require('./lib/request-proxy/index.js');
var registerConfigListeners = require('./lib/on_config_event.js').register;
var registerMembershipListeners = require('./lib/on_membership_event.js').register;
var registerRingListeners = require('./lib/on_ring_event.js').register;
var registerRingpopListeners = require('./lib/on_ringpop_event.js').register;
Expand Down Expand Up @@ -132,6 +134,10 @@ function RingPop(options) {
this.hashFunc = farmhash.hash32;
}

this.lagSampler = new LagSampler({
ringpop: this
});

this.requestProxy = new RequestProxy({
ringpop: this,
maxRetries: options.requestProxyMaxRetries,
Expand Down Expand Up @@ -162,6 +168,7 @@ function RingPop(options) {

this.tracers = new TracerStore(this);

registerConfigListeners(this);
registerMembershipListeners(this);
registerRingListeners(this);
registerRingpopListeners(this);
Expand Down
81 changes: 81 additions & 0 deletions lib/lag_sampler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var globalTimers = require('timers');

function LagSampler(opts) {
this.ringpop = opts.ringpop;
this.timers = opts.timers || globalTimers;
this.lagTimer = null;
this.currentLag = 0;

// toobusy auto-starts sampling. lazily-require until started.
this.toobusy = null;
}

LagSampler.prototype.start = function start() {
var self = this;

if (this.lagTimer) {
this.ringpop.logger.debug('ringpop lag sampler lag timer already started', {
local: this.ringpop.whoami()
});
return;
}

if (!this.toobusy) {
this.toobusy = require('toobusy');
}

schedule();
this.ringpop.logger.debug('ringpop lag sampler started', {
local: this.ringpop.whoami()
});

function schedule() {
self.lagTimer = self.timers.setTimeout(function onTimeout() {
self.currentLag = self.toobusy.lag();
schedule();
}, self.ringpop.config.get('lagSamplerInterval'));
}
};

LagSampler.prototype.stop = function stop() {
if (!this.lagTimer) {
this.ringpop.logger.debug('ringpop lag sampler lag timer already stopped', {
local: this.ringpop.whoami()
});
return;
}

this.timers.clearTimeout(this.lagTimer);
this.lagTimer = null;
this.ringpop.logger.debug('ringpop lag sampler stopped', {
local: this.ringpop.whoami()
});

if (this.toobusy) {
this.toobusy.shutdown();
this.toobusy = null;
}
};

module.exports = LagSampler;
39 changes: 39 additions & 0 deletions lib/on_config_event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

function createBackpressureEnabledHandler(ringpop) {
return function onBackpressureEnabled(newValue) {
if (newValue === true) {
ringpop.lagSampler.start();
} else {
ringpop.lagSampler.stop();
}
};
}

function register(ringpop) {
ringpop.config.on('set.backpressureEnabled',
createBackpressureEnabledHandler(ringpop));
}

module.exports = {
register: register
};
12 changes: 12 additions & 0 deletions lib/on_ringpop_event.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,26 @@ function createReadyHandler(ringpop) {
if (ringpop.config.get('autoGossip')) {
ringpop.gossip.start();
}

if (ringpop.config.get('backpressureEnabled')) {
ringpop.lagSampler.start();
}
};
}

function createDestroyedHandler(ringpop) {
return function onDestroyed() {
ringpop.lagSampler.stop();
};
}

function register(ringpop) {
ringpop.on('destroyed', createDestroyedHandler(ringpop));
ringpop.on('ready', createReadyHandler(ringpop));
}

module.exports = {
createDestroyedHandler: createDestroyedHandler,
createReadyHandler: createReadyHandler,
register: register
};
Loading

0 comments on commit 617f43b

Please sign in to comment.