diff --git a/lib/core/client.js b/lib/core/client.js index 22aac49feb5..525a4b7845f 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -555,7 +555,7 @@ class Parser extends HTTPParser { try { if (request.onBody(chunk, offset, length) === false) { - socket.pause() + socket[kPause]() } } catch (err) { util.destroy(socket, err) @@ -630,7 +630,7 @@ class Parser extends HTTPParser { util.destroy(socket, new InformationalError('reset')) } } else { - socket.resume() + socket[kResume]() resume(client) } } @@ -779,8 +779,8 @@ function connect (client) { parser.consume(socket._handle._externalStream) } - socket[kPause] = socket.pause.bind(socket) - socket[kResume] = socket.resume.bind(socket) + socket[kPause] = socketPause.bind(socket) + socket[kResume] = socketResume.bind(socket) socket[kError] = null socket[kParser] = parser socket[kClient] = client @@ -794,6 +794,22 @@ function connect (client) { .on('close', onSocketClose) } +function socketPause () { + // TODO: Pause parser. + if (this._handle && this._handle.reading) { + this._handle.reading = false + this._handle.readStop() + } +} + +function socketResume () { + // TODO: Resume parser. + if (this._handle && !this._handle.reading) { + this._handle.reading = true + this._handle.readStart() + } +} + function emitDrain (client) { client[kNeedDrain] = 0 client.emit('drain') diff --git a/test/socket-back-pressure.js b/test/socket-back-pressure.js new file mode 100644 index 00000000000..d0710607a62 --- /dev/null +++ b/test/socket-back-pressure.js @@ -0,0 +1,45 @@ +'use strict' + +const { Client } = require('..') +const { createServer } = require('http') +const { Readable } = require('stream') +const { test } = require('tap') + +test('socket back-pressure', (t) => { + t.plan(2) + + const server = createServer() + + let body + + server.on('request', (req, res) => { + let bytesWritten = 0 + const buf = Buffer.allocUnsafe(16384) + new Readable({ + read () { + bytesWritten += buf.length + this.push(buf) + if (bytesWritten >= 1e6) { + this.push(null) + } + } + }).on('end', () => { + t.ok(body._readableState.length < body._readableState.highWaterMark) + }).pipe(res) + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 1 + }) + + client.request({ path: '/', method: 'GET', opaque: 'asd' }, (err, data) => { + t.error(err) + body = data.body + .resume() + .on('data', () => { + data.body.pause() + }) + }) + }) +})