From 6741636b3f174a9fafb1a0c575ae7494618cecb9 Mon Sep 17 00:00:00 2001 From: Khafra <42794878+KhafraDev@users.noreply.github.com> Date: Sun, 24 Apr 2022 13:59:34 -0400 Subject: [PATCH] feat: add `undici:body:consumed` diagnostics channel --- docs/api/DiagnosticsChannel.md | 18 ++++ lib/api/api-request.js | 4 +- lib/api/readable.js | 22 ++++- lib/client.js | 2 +- lib/core/request.js | 2 +- lib/handler/redirect.js | 2 +- test/diagnostics-channel/body.js | 144 +++++++++++++++++++++++++++++++ 7 files changed, 187 insertions(+), 7 deletions(-) create mode 100644 test/diagnostics-channel/body.js diff --git a/docs/api/DiagnosticsChannel.md b/docs/api/DiagnosticsChannel.md index 09a7f9a06c0..10d9a37b4a4 100644 --- a/docs/api/DiagnosticsChannel.md +++ b/docs/api/DiagnosticsChannel.md @@ -135,3 +135,21 @@ diagnosticsChannel.channel('undici:client:connectError').subscribe(({ error, soc // connector is a function that creates the socket console.log(`Connect failed with ${error.message}`) }) +``` + +## `undici:body:consumed` + +This message is published once a body is consumed using a [body mixin](https://github.com/nodejs/undici#body-mixins). + +```js +diagnosticsChannel.channel('undici:body:consumed').subscribe(({ body, request }) => { + // request is the same object undici:request:create + + // body is the body consumed (this corresponds with how the body was consumed. For example, if the body was consumed using .blob(), this property will be a blob) + if (typeof body === 'string') { + // the body was consumed with `.text()` + } +}) +``` + +Note: Consuming the body in other ways (ie. async iterators or [stream consumers](https://nodejs.org/api/webstreams.html#utility-consumers)) will not cause a message to be published in this channel. diff --git a/lib/api/api-request.js b/lib/api/api-request.js index bcfe483ebef..c6ee12b0ecc 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -70,7 +70,7 @@ class RequestHandler extends AsyncResource { this.context = context } - onHeaders (statusCode, rawHeaders, resume) { + onHeaders (statusCode, rawHeaders, resume, statusText, request) { const { callback, opaque, abort, context } = this if (statusCode < 200) { @@ -82,7 +82,7 @@ class RequestHandler extends AsyncResource { } const parsedHeaders = util.parseHeaders(rawHeaders) - const body = new Readable(resume, abort, parsedHeaders['content-type']) + const body = new Readable(resume, abort, parsedHeaders['content-type'], request) this.callback = null this.res = body diff --git a/lib/api/readable.js b/lib/api/readable.js index 4184a867569..864353f559e 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -9,15 +9,24 @@ const util = require('../core/util') const { ReadableStreamFrom, toUSVString } = require('../core/util') let Blob +const channels = {} + +try { + const diagnosticsChannel = require('diagnostics_channel') + channels.trailers = diagnosticsChannel.channel('undici:body:consumed') +} catch { + channels.trailers = { hasSubscribers: false } +} const kConsume = Symbol('kConsume') const kReading = Symbol('kReading') const kBody = Symbol('kBody') const kAbort = Symbol('abort') const kContentType = Symbol('kContentType') +const kRequest = Symbol('kRequest') module.exports = class BodyReadable extends Readable { - constructor (resume, abort, contentType = '') { + constructor (resume, abort, contentType = '', request) { super({ autoDestroy: true, read: resume, @@ -30,6 +39,7 @@ module.exports = class BodyReadable extends Readable { this[kConsume] = null this[kBody] = null this[kContentType] = contentType + this[kRequest] = channels.trailers.hasSubscribers ? request : null // Is stream being consumed through Readable API? // This is an optimization so that we avoid checking @@ -178,10 +188,18 @@ async function consume (stream, type) { assert(!stream[kConsume]) return new Promise((resolve, reject) => { + const res = (value) => { + if (channels.trailers.hasSubscribers && value !== undefined) { + channels.trailers.publish({ body: value, request: stream[kRequest] }) + } + + resolve(value) + } + stream[kConsume] = { type, stream, - resolve, + resolve: res, reject, length: 0, body: [] diff --git a/lib/client.js b/lib/client.js index d3d4cfc705d..f4697f289e3 100644 --- a/lib/client.js +++ b/lib/client.js @@ -763,7 +763,7 @@ class Parser { let pause try { - pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false + pause = request.onHeaders(statusCode, headers, this.resume, statusText, request) === false } catch (err) { util.destroy(socket, err) return -1 diff --git a/lib/core/request.js b/lib/core/request.js index f04fe4fab21..e12fc4bed02 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -194,7 +194,7 @@ class Request { channels.headers.publish({ request: this, response: { statusCode, headers, statusText } }) } - return this[kHandler].onHeaders(statusCode, headers, resume, statusText) + return this[kHandler].onHeaders(statusCode, headers, resume, statusText, this) } onData (chunk) { diff --git a/lib/handler/redirect.js b/lib/handler/redirect.js index 32f74ffa381..261db23725d 100644 --- a/lib/handler/redirect.js +++ b/lib/handler/redirect.js @@ -96,7 +96,7 @@ class RedirectHandler { } if (!this.location) { - return this.handler.onHeaders(statusCode, headers, resume, statusText) + return this.handler.onHeaders(statusCode, headers, resume, statusText, this) } const { origin, pathname, search } = util.parseURL(new URL(this.location, this.opts.origin)) diff --git a/test/diagnostics-channel/body.js b/test/diagnostics-channel/body.js new file mode 100644 index 00000000000..f8fc85d913f --- /dev/null +++ b/test/diagnostics-channel/body.js @@ -0,0 +1,144 @@ +'use strict' + +const { test, skip } = require('tap') + +/** @type {import('diagnostics_channel')} */ +let diagnosticsChannel + +try { + diagnosticsChannel = require('diagnostics_channel') +} catch { + skip('missing diagnostics_channel') + process.exit(0) +} + +const { request } = require('../..') +const { createServer } = require('http') +const { once } = require('events') +const util = require('util') +const Blob = require('buffer').Blob + +if (!Blob) { + skip('missing Blob') + process.exit(0) +} + +const server = createServer((req, res) => { + if (req.url === '/json') { + return res.end(JSON.stringify({ + hello: 'world' + })) + } else { + return res.end('hello world') + } +}) + +test('undici:body:consumed diagnostics channel works', async (t) => { + server.listen(0) + await once(server, 'listening') + t.teardown(server.close.bind(server)) + + t.test('body consumed as text is a string', async (t) => { + t.plan(2) + + const channel = diagnosticsChannel.channel('undici:body:consumed') + const listener = (v) => t.equal(v.body, 'hello world') + + t.teardown(() => channel.unsubscribe(listener)) + channel.subscribe(listener) + + const { body } = await request(`http://localhost:${server.address().port}`) + const text = await body.text() + + t.equal(text, 'hello world') + }) + + t.test('body consumed as json is an object', async (t) => { + t.plan(2) + + const channel = diagnosticsChannel.channel('undici:body:consumed') + const listener = (v) => t.same(v.body, { hello: 'world' }) + + t.teardown(() => channel.unsubscribe(listener)) + channel.subscribe(listener) + + const { body } = await request(`http://localhost:${server.address().port}/json`) + const json = await body.json() + + t.same(json, { hello: 'world' }) + }) + + t.test('body consumed as an ArrayBuffer is an ArrayBuffer', async (t) => { + t.plan(2) + + const uint8 = new Uint8Array(Buffer.from('hello world')) + + const channel = diagnosticsChannel.channel('undici:body:consumed') + const listener = (v) => t.same(v.body, uint8) + + t.teardown(() => channel.unsubscribe(listener)) + channel.subscribe(listener) + + const { body } = await request(`http://localhost:${server.address().port}`) + const arrayBuffer = await body.arrayBuffer() + + t.same(arrayBuffer, uint8) + }) + + t.test('body consumed as an ArrayBuffer is an ArrayBuffer', async (t) => { + t.plan(2) + + const uint8 = new Uint8Array(Buffer.from('hello world')) + + const channel = diagnosticsChannel.channel('undici:body:consumed') + const listener = (v) => t.same(v.body, uint8) + + t.teardown(() => channel.unsubscribe(listener)) + channel.subscribe(listener) + + const { body } = await request(`http://localhost:${server.address().port}`) + const arrayBuffer = await body.arrayBuffer() + + t.same(arrayBuffer, uint8) + }) + + t.test('body consumed as a Blob is a Blob', async (t) => { + t.plan(2) + + const expectedBlob = new Blob(['hello world']) + + const channel = diagnosticsChannel.channel('undici:body:consumed') + const listener = (v) => t.same(v.body, expectedBlob) + + t.teardown(() => channel.unsubscribe(listener)) + channel.subscribe(listener) + + const { body } = await request(`http://localhost:${server.address().port}`) + const blob = await body.blob() + + t.same(expectedBlob, blob) + }) + + t.test('ensure request is a request', async (t) => { + let req1, req2 + + const listener1 = ({ request }) => (req1 = request) + const listener2 = ({ request }) => (req2 = request) + + diagnosticsChannel.channel('undici:request:create').subscribe(listener1) + diagnosticsChannel.channel('undici:body:consumed').subscribe(listener2) + + t.teardown(() => { + diagnosticsChannel.channel('undici:request:create').unsubscribe(listener1) + diagnosticsChannel.channel('undici:body:consumed').unsubscribe(listener2) + }) + + const { body } = await request(`http://localhost:${server.address().port}/json`) + await body.arrayBuffer() // consume the body so undici:body:consumed is triggered + + t.ok(util.isDeepStrictEqual(req1, req2)) + t.end() + }) + + t.end() +})