Skip to content

Commit

Permalink
Merge pull request #238 from uber/wedge-tracking
Browse files Browse the repository at this point in the history
Track potentially wedged requests and cancel them
  • Loading branch information
jwolski2 committed Feb 1, 2016
2 parents 67a04ba + ea5e3a7 commit 66df6d2
Show file tree
Hide file tree
Showing 12 changed files with 821 additions and 77 deletions.
179 changes: 108 additions & 71 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,26 @@
// THE SOFTWARE.
'use strict';

var safeParse = require('./lib/util.js').safeParse;
var validateHostPort = require('./lib/util.js').validateHostPort;
var _ = require('underscore');
var Config = require('./config.js');
var ClientErrors = require('./client_errors.js');
var ClientRequest = require('./client_request.js');
var globalTimers = require('timers');
var RingpopErrors = require('./ringpop_errors.js');
var TChannel = require('tchannel');
var TypedError = require('error/typed');

var ChannelDestroyedError = TypedError({
type: 'ringpop.client.channel-destroyed',
message: 'Channel is already destroyed',
endpoint: null,
channelType: null
});

var InvalidHostPortError = TypedError({
type: 'ringpop.client.invalid-hostport',
message: 'Request made with invalid host port combination ({hostPort})',
hostPort: null
});

function RingpopClient(subChannel) {

function RingpopClient(ringpop, subChannel, timers, date) {
this.ringpop = ringpop;
this.subChannel = subChannel;
this.timers = timers || globalTimers;
this.date = date || Date;

this.config = this.ringpop.config;
this.logger = this.ringpop.loggerFactory.getLogger('client');

// If no subChannel provided, create one from a new instance
// of TChannel. This client then becomes the owner of that
// instance and is responsible for closing.
this.isChannelOwner = false;
if (!this.subChannel) {
this.tchannel = new TChannel();
Expand All @@ -48,6 +48,10 @@ function RingpopClient(subChannel) {
});
this.isChannelOwner = true;
}

this.requestsById = {};
this.wedgedTimer = null;
this.isDestroyed = false;
}

RingpopClient.prototype.adminConfigGet = function adminConfigGet(host, body, callback) {
Expand Down Expand Up @@ -100,75 +104,108 @@ RingpopClient.prototype.destroy = function destroy(callback) {
if (this.isChannelOwner) {
this.tchannel.close(callback);
}

this.timers.clearTimeout(this.wedgedTimer);
this.isDestroyed = true;
};

// This function is "public" for ease of testing, but shouldn't be called by
// anything other than the class itself.
RingpopClient.prototype.scanForWedgedRequests = function scanForWedgedRequests() {
var wedgedRequests = [];

// Find all requests that have not been timed-out by TChannel.
// Induce Ringpop timeout for wedged requests and cancel them.
var requestIds = Object.keys(this.requestsById);
var scanTime = this.date.now();
for (var i = 0; i < requestIds.length; i++) {
var request = this.requestsById[requestIds[i]];
var timeInflight = scanTime - request.timestamp;
var isWedged = timeInflight >= (this.config.get('wedgedRequestTimeout') ||
Config.Defaults.wedgedRequestTimeout);
if (isWedged) {
delete this.requestsById[request.id];
request.cancel();
wedgedRequests.push(request);
}
}

if (wedgedRequests.length > 0) {
this.logger.error('ringpop canceled wedged requests', {
local: this.ringpop.whoami(),
numWedgedRequests: wedgedRequests.length,
wedgedRequests: wedgedRequestsToLog()
});
this._emitWedgedError();
}

return wedgedRequests;

// Log the first N requests only. We don't want to flood
// our pipes.
function wedgedRequestsToLog() {
return _.take(wedgedRequests, 10).map(function toLog(request) {
return request.toLog();
});
}
};

RingpopClient.prototype._emitWedgedError = function _emitWedgedError() {
var errorListeners = this.ringpop.listeners('error');
if (errorListeners.length > 0) {
this.ringpop.emit('error',
RingpopErrors.PotentiallyWedgedRequestsError());
}
};

/* jshint maxparams: 5 */
RingpopClient.prototype._request = function _request(opts, endpoint, head, body, callback) {
var self = this;

if (this.subChannel && this.subChannel.destroyed) {
process.nextTick(function onTick() {
callback(ChannelDestroyedError({
endpoint: endpoint,
channelType: 'subChannel'
}));
});
return;
if (!this.wedgedTimer) {
this._scheduleWedgedTimer();
}

if (this.subChannel &&
this.subChannel.topChannel &&
this.subChannel.topChannel.destroyed) {
var request = new ClientRequest(this, opts, endpoint, head, body,
this.date, onSend);

// Evaluate the number of inflight requests. If we've got more than the
// allowable limit it's a strong indication that at least some are wedged.
// Apply backpressure until the amount of inflight requests is reduced
// by cancellation or otherwise.
var inflightCurrent = Object.keys(this.requestsById).length;
var inflightMax = this.config.get('inflightClientRequestsLimit');
if (inflightCurrent >= inflightMax) {
process.nextTick(function onTick() {
callback(ChannelDestroyedError({
endpoint: endpoint,
channelType: 'topChannel'
callback(ClientErrors.ClientRequestsLimitError({
inflightCurrent: inflightCurrent,
inflightMax: inflightMax,
endpoint: endpoint
}));
});
return;
}

if (!opts || !validateHostPort(opts.host)) {
callback(InvalidHostPortError({
hostPort: String(opts && opts.host)
}));
return;
}

self.subChannel.waitForIdentified({
host: opts.host
}, function onIdentified(err) {
if (err) {
callback(err);
return;
}
// Registering the request by its ID must happen at the precise
// moment before we know it will be "inflight."
this.requestsById[request.id] = request;
request.send();

self.subChannel.request({
host: opts.host,
serviceName: 'ringpop',
hasNoParent: true,
retryLimit: opts.retryLimit || 0,
trace: false,
headers: {
as: 'raw',
cn: 'ringpop'
},
timeout: opts.timeout || 30000
}).send(endpoint, JSON.stringify(head), JSON.stringify(body), onSend);
});

function onSend(err, res, arg2, arg3) {
if (!err && !res.ok) {
err = safeParse(arg3) || new Error('Server Error');
}

if (err) {
callback(err);
return;
}

callback(null, safeParse(arg3));
function onSend() {
delete self.requestsById[request.id];
var args = Array.prototype.splice.call(arguments, 0);
callback.apply(null, args);
}
};

RingpopClient.prototype._scheduleWedgedTimer = function _scheduleWedgedTimer() {
var self = this;
this.wedgedTimer = this.timers.setTimeout(function onTimeout() {
if (self.isDestroyed) return;
self.scanForWedgedRequests();
self._scheduleWedgedTimer();
}, this.config.get('wedgedTimerInterval') || Config.Defaults.wedgedTimerInterval);
this.wedgedTimer.unref();
};

module.exports = RingpopClient;
32 changes: 32 additions & 0 deletions client_errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';

var TypedError = require('error/typed');

module.exports = {
ClientRequestsLimitError: TypedError({
type: 'ringpop.client.limit-exceeded',
message: 'Client request limit reached',
inflightCurrent: null,
inflightMax: null,
endpoint: null
})
};
Loading

0 comments on commit 66df6d2

Please sign in to comment.