Skip to content

Commit

Permalink
[api] Integrated commits from donnerjack and worked on pool changes
Browse files Browse the repository at this point in the history
  • Loading branch information
indexzero committed Sep 17, 2010
2 parents 5d54ea5 + 7e61f0c commit 3bb458e
Show file tree
Hide file tree
Showing 2 changed files with 300 additions and 24 deletions.
270 changes: 246 additions & 24 deletions lib/node-http-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

var sys = require('sys'),
http = require('http'),
events = require('events'),
Expand All @@ -43,24 +43,36 @@ exports.createServer = function () {
callback = typeof args[args.length - 1] === 'function' && args.pop();
if (args[0]) port = args[0];
if (args[1]) host = args[1];

var server = http.createServer(function (req, res){
var proxy = new HttpProxy(req, res);

proxy.emitter.on('proxy', function (err, body) {
server.emit('proxy', err, body);
});

// If we were passed a callback to process the request
// or response in some way, then call it.
if(callback) {
callback(req, res, proxy);
}
else {
else {
proxy.proxyRequest(port, server);
}
});


// If callback is empty - tunnel websocket request automatically
if (!callback) {
// WebSocket support
server.on('upgrade', function(req, socket, head) {
var proxy = new HttpProxy(req, socket, head);

// Tunnel websocket requests too
proxy.proxyWebSocketRequest(port, host);

});
}

return server;
};

Expand All @@ -74,12 +86,21 @@ exports.setMax = function (value) {
manager.setMaxClients(max);
};

var HttpProxy = function (req, res) {
var HttpProxy = function (req, res, head) {
this.emitter = new(events.EventEmitter);
this.events = {};
this.req = req;
this.res = res;
this.watch(req);
// If this request is upgrade request
// No response will be passed
if (!req.headers.upgrade) {
this.res = res;
this.watch(req);
} else {
// Second argument will be socket
this.sock = res;
this.head = head;
this.watch(res);
}
};

HttpProxy.prototype = {
Expand All @@ -91,7 +112,7 @@ HttpProxy.prototype = {
}
return arr;
},

watch: function (req) {
this.events = [];
var self = this;
Expand All @@ -106,11 +127,11 @@ HttpProxy.prototype = {
req.addListener('data', this.onData);
req.addListener('end', this.onEnd);
},

unwatch: function (req) {
req.removeListener('data', this.onData);
req.removeListener('end', this.onEnd);

// Rebroadcast any events that have been buffered
for (var i = 0, len = this.events.length; i < len; ++i) {
req.emit.apply(req, this.events[i]);
Expand All @@ -121,33 +142,31 @@ HttpProxy.prototype = {
// Remark: nodeProxy.body exists solely for testability
var self = this, req = this.req, res = this.res;
self.body = '';

// Open new HTTP request to internal resource with will act as a reverse proxy pass
var p = manager.getPool(port, server);

sys.puts('current pool count for ' + req.headers.host + ":" + port + ' ' +p.clients.length);

p.on('error', function (err) {
// Remark: We should probably do something here
// but this is a hot-fix because I don't think 'pool'
// but this is a hot-fix because I don't think 'pool'
// should be emitting this event.
sys.puts('p.on, error fired'.red);
eyes.inspect(err);
this.res.end();
});

p.request(req.method, req.url, req.headers, function (reverse_proxy) {
// Create an error handler so we can use it temporarily
var error = function (err) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.writeHead(500, {'Content-Type': 'text/plain'});

if(req.method !== 'HEAD') {
res.write('An error has occurred: ' + sys.puts(JSON.stringify(err)));
res.write('An error has occurred: ' + JSON.stringify(err));
}


// Response end may never come so removeListener here
reverse_proxy.removeListener('error', error);
res.end();
};

// Add a listener for the connection timeout event
reverse_proxy.addListener('error', error);

Expand All @@ -161,6 +180,13 @@ HttpProxy.prototype = {
// Set the response headers of the client response
res.writeHead(response.statusCode, response.headers);

// Status code = 304
// No 'data' event and no 'end'
if (response.statusCode === 304) {
res.end();
return;
}

// Add event handler for the proxied response in chunks
response.addListener('data', function (chunk) {
if(req.method !== 'HEAD') {
Expand All @@ -173,6 +199,7 @@ HttpProxy.prototype = {
response.addListener('end', function () {
// Remark: Emit the end event for testability
self.emitter.emit('proxy', null, self.body);
reverse_proxy.removeListener('error', error);
res.end();
});
});
Expand All @@ -185,11 +212,206 @@ HttpProxy.prototype = {
// At the end of the client request, we are going to stop the proxied request
req.addListener('end', function () {
reverse_proxy.end();
reverse_proxy.removeListener('error', error);
});

self.unwatch(req);
});
},

/**
* WebSocket Tunnel realization
* Copyright (c) 2010 Fedor Indutny : http://github.com/donnerjack13589
*/
proxyWebSocketRequest: function (port, server, host) {
var self = this, req = self.req, socket = self.sock, head = self.head,
headers = new _headers(req.headers), CRLF = '\r\n';

// Will generate clone of headers
// To not change original
function _headers(headers) {
var h = {};
for (var i in headers) {
h[i] = headers[i];
}
return h;
}

// WebSocket requests has
// method = GET
if (req.method !== 'GET' || headers.upgrade.toLowerCase() !== 'websocket') {
// This request is not WebSocket request
return;
}

// Turn of all bufferings
// For server set KeepAlive
// For client set encoding
function _socket(socket, server) {
socket.setTimeout(0);
socket.setNoDelay(true);
if (server) {
socket.setKeepAlive(true, 0);
}
else {
socket.setEncoding('utf8');
}
}

// Client socket
_socket(socket);

// If host is undefined
// Get it from headers
if (!host) {
host = headers.Host;
}

// Remote host address
var remote_host = server + (port - 80 === 0 ? '' : ':' + port);

// Change headers
headers.Host = remote_host;
headers.Origin = 'http://' + remote_host;

// Open request
var p = manager.getPool(port, server);

p.getClient(function(client) {
// Based on 'pool/main.js'
var request = client.request('GET', req.url, headers);

var errorListener = function (error) {
client.removeListener('error', errorListener);

// Remove the client from the pool's available clients since it has errored
p.clients.splice(p.clients.indexOf(client), 1);
socket.end();
}

// Not disconnect on update
client.on('upgrade', function(request, remote_socket, head) {
// Prepare socket
_socket(remote_socket, true);

// Emit event
onUpgrade(remote_socket);
});

client.on('error', errorListener);
request.on('response', function (response) {
response.on('end', function () {
client.removeListener('error', errorListener);
client.busy = false;
p.onFree(client);
})
})
client.busy = true;

var handshake;
request.socket.on('data', handshake = function(data) {
// Handshaking

// Ok, kind of harmfull part of code
// Socket.IO is sending hash at the end of handshake
// If protocol = 76
// But we need to replace 'host' and 'origin' in response
// So we split data to printable data and to non-printable
// (Non-printable will come after double-CRLF)
var sdata = data.toString();

// Get Printable
sdata = sdata.substr(0, sdata.search(CRLF + CRLF));

// Get Non-Printable
data = data.slice(Buffer.byteLength(sdata), data.length);

// Replace host and origin
sdata = sdata.replace(remote_host, host)
.replace(remote_host, host);

try {
// Write printable
socket.write(sdata);

// Write non-printable
socket.write(data);
}
catch (e) {
request.end();
socket.end();
}

// Catch socket errors
socket.on('error', function() {
request.end();
});

// Remove data listener now that the 'handshake' is complete
request.socket.removeListener('data', handshake);
});

// Write upgrade-head
try {
request.write(head);
}
catch(e) {
request.end();
socket.end();
}
self.unwatch(socket);
});

// Request

function onUpgrade(reverse_proxy) {
var listeners = {};

// We're now connected to the server, so lets change server socket
reverse_proxy.on('data', listeners._r_data = function(data) {
// Pass data to client
if (socket.writable) {
try {
socket.write(data);
}
catch (e) {
socket.end();
reverse_proxy.end();
}
}
});

socket.on('data', listeners._data = function(data){
// Pass data from client to server
try {
reverse_proxy.write(data);
}
catch (e) {
reverse_proxy.end();
socket.end();
}
});

// Detach event listeners from reverse_proxy
function detach() {
reverse_proxy.removeListener('close', listeners._r_close);
reverse_proxy.removeListener('data', listeners._r_data);
socket.removeListener('data', listeners._data);
socket.removeListener('close', listeners._close);
}

// Hook disconnections
reverse_proxy.on('end', listeners._r_close = function() {
socket.end();
detach();
});

socket.on('end', listeners._close = function() {
reverse_proxy.end();
detach();
});

};

}
};

Expand Down
Loading

0 comments on commit 3bb458e

Please sign in to comment.