Skip to content

Commit

Permalink
Merge branch 'main' into feat/2264
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 authored Nov 10, 2023
2 parents 74b5f67 + a1342a7 commit 26d492b
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 152 deletions.
65 changes: 40 additions & 25 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const kBody = Symbol('kBody')
const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')

const noop = () => {}

module.exports = class BodyReadable extends Readable {
constructor ({
resume,
Expand Down Expand Up @@ -149,37 +151,50 @@ module.exports = class BodyReadable extends Readable {
return this[kBody]
}

async dump (opts) {
dump (opts) {
let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144
const signal = opts && opts.signal
const abortFn = () => {
this.destroy()
}
let signalListenerCleanup

if (signal) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
throw new InvalidArgumentError('signal must be an AbortSignal')
}
util.throwIfAborted(signal)
signalListenerCleanup = util.addAbortListener(signal, abortFn)
}
try {
for await (const chunk of this) {
util.throwIfAborted(signal)
limit -= Buffer.byteLength(chunk)
if (limit < 0) {
return
try {
if (typeof signal !== 'object' || !('aborted' in signal)) {
throw new InvalidArgumentError('signal must be an AbortSignal')
}
util.throwIfAborted(signal)
} catch (err) {
return Promise.reject(err)
}
} catch {
util.throwIfAborted(signal)
} finally {
if (typeof signalListenerCleanup === 'function') {
signalListenerCleanup()
} else if (signalListenerCleanup) {
signalListenerCleanup[Symbol.dispose]()
}
}

if (this.closed) {
return Promise.resolve(null)
}

return new Promise((resolve, reject) => {
const signalListenerCleanup = signal
? util.addAbortListener(signal, () => {
this.destroy()
})
: noop

this
.on('close', function () {
signalListenerCleanup()
if (signal?.aborted) {
reject(signal.reason || Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }))
} else {
resolve(null)
}
})
.on('error', noop)
.on('data', function (chunk) {
limit -= chunk.length
if (limit <= 0) {
this.destroy()
}
})
.resume()
})
}
}

Expand Down
32 changes: 11 additions & 21 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ async function connect (client) {
const idx = hostname.indexOf(']')

assert(idx !== -1)
const ip = hostname.substr(1, idx - 1)
const ip = hostname.substring(1, idx)

assert(net.isIP(ip))
hostname = ip
Expand Down Expand Up @@ -1462,23 +1462,7 @@ function _resume (client, sync) {
return
}

if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
request.body
.on('data', /* istanbul ignore next */ function () {
/* istanbul ignore next */
assert(false)
})
.on('error', function (err) {
errorRequest(client, request, err)
})
.on('end', function () {
util.destroy(this)
})

request.body = null
}

if (client[kRunning] > 0 &&
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
Expand Down Expand Up @@ -1532,7 +1516,9 @@ function write (client, request) {
body.read(0)
}

let contentLength = util.bodyLength(body)
const bodyLength = util.bodyLength(body)

let contentLength = bodyLength

if (contentLength === null) {
contentLength = request.contentLength
Expand Down Expand Up @@ -1630,7 +1616,7 @@ function write (client, request) {
}

/* istanbul ignore else: assertion */
if (!body) {
if (!body || bodyLength === 0) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
Expand Down Expand Up @@ -1986,7 +1972,11 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
}
}
const onAbort = function () {
onFinished(new RequestAbortedError())
if (finished) {
return
}
const err = new RequestAbortedError()
queueMicrotask(() => onFinished(err))
}
const onFinished = function (err) {
if (finished) {
Expand Down
42 changes: 41 additions & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,29 @@ class Request {

this.method = method

this.abort = null

if (body == null) {
this.body = null
} else if (util.isStream(body)) {
this.body = body

const rState = this.body._readableState
if (!rState || !rState.autoDestroy) {
this.endHandler = function autoDestroy () {
util.destroy(this)
}
this.body.on('end', this.endHandler)
}

this.errorHandler = err => {
if (this.abort) {
this.abort(err)
} else {
this.error = err
}
}
this.body.on('error', this.errorHandler)
} else if (util.isBuffer(body)) {
this.body = body.byteLength ? body : null
} else if (ArrayBuffer.isView(body)) {
Expand Down Expand Up @@ -236,7 +255,12 @@ class Request {
assert(!this.aborted)
assert(!this.completed)

return this[kHandler].onConnect(abort)
if (this.error) {
abort(this.error)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)
}
}

onHeaders (statusCode, headers, resume, statusText) {
Expand Down Expand Up @@ -265,6 +289,8 @@ class Request {
}

onComplete (trailers) {
this.onFinally()

assert(!this.aborted)

this.completed = true
Expand All @@ -275,6 +301,8 @@ class Request {
}

onError (error) {
this.onFinally()

if (channels.error.hasSubscribers) {
channels.error.publish({ request: this, error })
}
Expand All @@ -286,6 +314,18 @@ class Request {
return this[kHandler].onError(error)
}

onFinally () {
if (this.errorHandler) {
this.body.off('error', this.errorHandler)
this.errorHandler = null
}

if (this.endHandler) {
this.body.off('end', this.endHandler)
this.endHandler = null
}
}

// TODO: adjust to support H2
addHeader (key, value) {
processHeader(this, key, value)
Expand Down
17 changes: 4 additions & 13 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ function getHostname (host) {
const idx = host.indexOf(']')

assert(idx !== -1)
return host.substr(1, idx - 1)
return host.substring(1, idx)
}

const idx = host.indexOf(':')
if (idx === -1) return host

return host.substr(0, idx)
return host.substring(0, idx)
}

// IP addresses are not valid server names per RFC6066
Expand Down Expand Up @@ -190,7 +190,7 @@ function isReadableAborted (stream) {
}

function destroy (stream, err) {
if (!isStream(stream) || isDestroyed(stream)) {
if (stream == null || !isStream(stream) || isDestroyed(stream)) {
return
}

Expand Down Expand Up @@ -228,7 +228,7 @@ function parseHeaders (headers, obj = {}) {

if (!val) {
if (Array.isArray(headers[i + 1])) {
obj[key] = headers[i + 1]
obj[key] = headers[i + 1].map(x => x.toString('utf8'))
} else {
obj[key] = headers[i + 1].toString('utf8')
}
Expand Down Expand Up @@ -431,16 +431,7 @@ function throwIfAborted (signal) {
}
}

let events
function addAbortListener (signal, listener) {
if (typeof Symbol.dispose === 'symbol') {
if (!events) {
events = require('events')
}
if (typeof events.addAbortListener === 'function' && 'aborted' in signal) {
return events.addAbortListener(signal, listener)
}
}
if ('addEventListener' in signal) {
signal.addEventListener('abort', listener, { once: true })
return () => signal.removeEventListener('abort', listener)
Expand Down
Loading

0 comments on commit 26d492b

Please sign in to comment.