diff --git a/lib/core/request.js b/lib/core/request.js index 6cd9b2f8307..a8680b5057f 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -130,7 +130,6 @@ class Request { } this.completed = false - this.aborted = false this.upgrade = upgrade || null @@ -272,6 +271,7 @@ class Request { this.onFinally() assert(!this.aborted) + assert(!this.completed) this.completed = true if (channels.trailers.hasSubscribers) { diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index ef6d47a0c9c..bb06ab957c5 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -184,18 +184,19 @@ function onHttp2SessionEnd () { * @param {number} errorCode */ function onHttp2SessionGoAway (errorCode) { - // We cannot recover, so best to close the session and the socket + // TODO(mcollina): Verify if GOAWAY implements the spec correctly: + // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8 + // Specifically, we do not verify the "valid" stream id. + const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket])) const client = this[kClient] client[kSocket] = null client[kHTTPContext] = null - if (this[kHTTP2Session] !== null) { - this[kHTTP2Session].close() - this[kHTTP2Session].destroy(err) - this[kHTTP2Session] = null - } + // this is an HTTP2 session + this.close() + this[kHTTP2Session] = null util.destroy(this[kSocket], err) @@ -321,13 +322,15 @@ function writeH2 (client, request) { util.errorRequest(client, request, err) if (stream != null) { + // Some chunks might still come after abort, + // let's ignore them + stream.removeAllListeners('data') + // On Abort, we close the stream to send RST_STREAM frame stream.close() - // We delay the destroy to allow the stream to send the RST_STREAM frame - queueMicrotask(() => util.destroy(stream, err)) + // We move the running index to the next request - client[kQueue][client[kRunningIdx]++] = null - client[kPendingIdx] = client[kRunningIdx] + client[kOnError](err) client[kResume]() } @@ -356,7 +359,7 @@ function writeH2 (client, request) { // We disabled endStream to allow the user to write to the stream stream = session.request(headers, { endStream: false, signal }) - if (stream.id && !stream.pending) { + if (!stream.pending) { request.onUpgrade(null, null, stream) ++session[kOpenStreams] client[kQueue][client[kRunningIdx]++] = null @@ -463,26 +466,31 @@ function writeH2 (client, request) { // for those scenarios, best effort is to destroy the stream immediately // as there's no value to keep it open. if (request.aborted) { + stream.removeAllListeners('data') return } if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) { stream.pause() } + }) - stream.on('data', (chunk) => { - if (request.onData(chunk) === false) { - stream.pause() - } - }) + stream.on('data', (chunk) => { + if (request.onData(chunk) === false) { + stream.pause() + } }) stream.once('end', (err) => { + stream.removeAllListeners('data') // When state is null, it means we haven't consumed body and the stream still do not have // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { - request.onComplete([]) + // Do not complete the request if it was aborted + if (!request.aborted) { + request.onComplete([]) + } client[kQueue][client[kRunningIdx]++] = null client[kResume]() @@ -503,6 +511,7 @@ function writeH2 (client, request) { }) stream.once('close', () => { + stream.removeAllListeners('data') session[kOpenStreams] -= 1 if (session[kOpenStreams] === 0) { session.unref() diff --git a/test/http2.js b/test/http2.js index 83ea5f62cbf..51f66b10ced 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1621,12 +1621,21 @@ test('#3753 - Handle GOAWAY Gracefully', async (t) => { 'x-my-header': 'foo' } }, (err, response) => { - if (i === 9 || i === 8) { - t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0') - t.strictEqual(err?.code, 'UND_ERR_SOCKET') + if (err) { + t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0') + t.strictEqual(err.code, 'UND_ERR_SOCKET') } else { - t.ifError(err) t.strictEqual(response.statusCode, 200) + ;(async function () { + let body + try { + body = await response.body.text() + } catch (err) { + t.strictEqual(err.code, 'UND_ERR_SOCKET') + return + } + t.strictEqual(body, 'hello world') + })() } }) }