From 2b9e09b00ac40e6c6de2b68754df7b8e8c1e3878 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 31 Aug 2011 21:48:50 -0700 Subject: [PATCH] Add flow control Not in the "async/fibers/coro" sense of flow control, but in the TCP backpressure sense. Pause the stream when a write isn't flushed, and then resume it once the writable stream drains. --- lib/node-http-proxy.js | 54 +++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/lib/node-http-proxy.js b/lib/node-http-proxy.js index 1e5df658b..ed920a227 100644 --- a/lib/node-http-proxy.js +++ b/lib/node-http-proxy.js @@ -539,7 +539,7 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) { response.on('data', function (chunk) { if (req.method !== 'HEAD' && res.writable) { try { - res.write(chunk); + var flushed = res.write(chunk); } catch (er) { console.error("res.write error: %s", er.message); try { @@ -547,8 +547,15 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) { } catch (er) { console.error("res.end error: %s", er.message); } + return; } } + if (!flushed) { + response.pause(); + res.once('drain', function () { + response.resume(); + }); + } }); // When the `reverseProxy` `response` ends, end the @@ -578,7 +585,13 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) { // `req` write it to the `reverseProxy` request. req.on('data', function (chunk) { if (!errState) { - reverseProxy.write(chunk); + var flushed = reverseProxy.write(chunk); + if (!flushed) { + req.pause(); + reverseProxy.once('drain', function () { + req.resume(); + }); + } } }); @@ -646,7 +659,13 @@ HttpProxy.prototype._forwardRequest = function (req) { // Chunk the client request body as chunks from the proxied request come in req.on('data', function (chunk) { - forwardProxy.write(chunk); + var flushed = forwardProxy.write(chunk); + if (!flushed) { + req.pause(); + forwardProxy.once('drain', function () { + req.resume(); + }); + } }) // At the end of the client request, we are going to stop the proxied request @@ -741,7 +760,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options if (reverseProxy.incoming.socket.writable) { try { self.emit('websocket:outgoing', req, socket, head, data); - reverseProxy.incoming.socket.write(data); + var flushed = reverseProxy.incoming.socket.write(data); + if (!flushed) { + proxySocket.pause(); + reverseProxy.incoming.socket.once('drain', function () { + proxySocket.resume(); + }); + } } catch (e) { detach(); @@ -758,7 +783,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function(data) { try { self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data); - proxySocket.write(data); + var flushed = proxySocket.write(data); + if (!flushed) { + reverseProxy.incoming.socket.pause(); + proxySocket.once('drain', function () { + reverseProxy.incoming.socket.resume(); + }); + } } catch (e) { detach(); @@ -918,7 +949,14 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options // self.emit('websocket:handshake', req, socket, head, sdata, data); socket.write(sdata); - socket.write(data); + var flushed = socket.write(data); + if (!flushed) { + reverseProxy.socket.pause(); + socket.once('drain', function () { + reverseProxy.socket.resume(); + }); + } + } catch (ex) { proxyError(ex) @@ -935,9 +973,9 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options reverseProxy.on('error', proxyError); try { - // // Attempt to write the upgrade-head to the reverseProxy request. - // + // This is small, and there's only ever one of it. + // No need for pause/resume. reverseProxy.write(head); } catch (ex) {