Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add undici:body:consumed diagnostics channel #1369

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,21 @@ diagnosticsChannel.channel('undici:client:connectError').subscribe(({ error, soc
// connector is a function that creates the socket
console.log(`Connect failed with ${error.message}`)
})
```

## `undici:body:consumed`

This message is published once a body is consumed using a [body mixin](https://github.com/nodejs/undici#body-mixins).

```js
diagnosticsChannel.channel('undici:body:consumed').subscribe(({ body, request }) => {
// request is the same object undici:request:create

// body is the body consumed (this corresponds with how the body was consumed. For example, if the body was consumed using .blob(), this property will be a blob)
if (typeof body === 'string') {
// the body was consumed with `.text()`
}
})
```

Note: Consuming the body in other ways (ie. async iterators or [stream consumers](https://nodejs.org/api/webstreams.html#utility-consumers)) will not cause a message to be published in this channel.
4 changes: 2 additions & 2 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class RequestHandler extends AsyncResource {
this.context = context
}

onHeaders (statusCode, rawHeaders, resume) {
onHeaders (statusCode, rawHeaders, resume, statusText, request) {
const { callback, opaque, abort, context } = this

if (statusCode < 200) {
Expand All @@ -82,7 +82,7 @@ class RequestHandler extends AsyncResource {
}

const parsedHeaders = util.parseHeaders(rawHeaders)
const body = new Readable(resume, abort, parsedHeaders['content-type'])
const body = new Readable(resume, abort, parsedHeaders['content-type'], request)

this.callback = null
this.res = body
Expand Down
22 changes: 20 additions & 2 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,24 @@ const util = require('../core/util')
const { ReadableStreamFrom, toUSVString } = require('../core/util')

let Blob
const channels = {}

try {
const diagnosticsChannel = require('diagnostics_channel')
channels.trailers = diagnosticsChannel.channel('undici:body:consumed')
} catch {
channels.trailers = { hasSubscribers: false }
}

const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')
const kRequest = Symbol('kRequest')

module.exports = class BodyReadable extends Readable {
constructor (resume, abort, contentType = '') {
constructor (resume, abort, contentType = '', request) {
super({
autoDestroy: true,
read: resume,
Expand All @@ -30,6 +39,7 @@ module.exports = class BodyReadable extends Readable {
this[kConsume] = null
this[kBody] = null
this[kContentType] = contentType
this[kRequest] = channels.trailers.hasSubscribers ? request : null

// Is stream being consumed through Readable API?
// This is an optimization so that we avoid checking
Expand Down Expand Up @@ -178,10 +188,18 @@ async function consume (stream, type) {
assert(!stream[kConsume])

return new Promise((resolve, reject) => {
const res = (value) => {
if (channels.trailers.hasSubscribers && value !== undefined) {
channels.trailers.publish({ body: value, request: stream[kRequest] })
}

resolve(value)
}

stream[kConsume] = {
type,
stream,
resolve,
resolve: res,
reject,
length: 0,
body: []
Expand Down
2 changes: 1 addition & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ class Parser {

let pause
try {
pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
pause = request.onHeaders(statusCode, headers, this.resume, statusText, request) === false
} catch (err) {
util.destroy(socket, err)
return -1
Expand Down
2 changes: 1 addition & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class Request {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
}

return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
return this[kHandler].onHeaders(statusCode, headers, resume, statusText, this)
}

onData (chunk) {
Expand Down
2 changes: 1 addition & 1 deletion lib/handler/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class RedirectHandler {
}

if (!this.location) {
return this.handler.onHeaders(statusCode, headers, resume, statusText)
return this.handler.onHeaders(statusCode, headers, resume, statusText, this)
}

const { origin, pathname, search } = util.parseURL(new URL(this.location, this.opts.origin))
Expand Down
144 changes: 144 additions & 0 deletions test/diagnostics-channel/body.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
'use strict'

const { test, skip } = require('tap')

/** @type {import('diagnostics_channel')} */
let diagnosticsChannel

try {
diagnosticsChannel = require('diagnostics_channel')
} catch {
skip('missing diagnostics_channel')
process.exit(0)
}

const { request } = require('../..')
const { createServer } = require('http')
const { once } = require('events')
const util = require('util')
const Blob = require('buffer').Blob

if (!Blob) {
skip('missing Blob')
process.exit(0)
}

const server = createServer((req, res) => {
if (req.url === '/json') {
return res.end(JSON.stringify({
hello: 'world'
}))
} else {
return res.end('hello world')
}
})

test('undici:body:consumed diagnostics channel works', async (t) => {
server.listen(0)
await once(server, 'listening')
t.teardown(server.close.bind(server))

t.test('body consumed as text is a string', async (t) => {
t.plan(2)

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.equal(v.body, 'hello world')

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const text = await body.text()

t.equal(text, 'hello world')
})

t.test('body consumed as json is an object', async (t) => {
t.plan(2)

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, { hello: 'world' })

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}/json`)
const json = await body.json()

t.same(json, { hello: 'world' })
})

t.test('body consumed as an ArrayBuffer is an ArrayBuffer', async (t) => {
t.plan(2)

const uint8 = new Uint8Array(Buffer.from('hello world'))

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, uint8)

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const arrayBuffer = await body.arrayBuffer()

t.same(arrayBuffer, uint8)
})

t.test('body consumed as an ArrayBuffer is an ArrayBuffer', async (t) => {
t.plan(2)

const uint8 = new Uint8Array(Buffer.from('hello world'))

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, uint8)

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const arrayBuffer = await body.arrayBuffer()

t.same(arrayBuffer, uint8)
})

t.test('body consumed as a Blob is a Blob', async (t) => {
t.plan(2)

const expectedBlob = new Blob(['hello world'])

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, expectedBlob)

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const blob = await body.blob()

t.same(expectedBlob, blob)
})

t.test('ensure request is a request', async (t) => {
let req1, req2

const listener1 = ({ request }) => (req1 = request)
const listener2 = ({ request }) => (req2 = request)

diagnosticsChannel.channel('undici:request:create').subscribe(listener1)
diagnosticsChannel.channel('undici:body:consumed').subscribe(listener2)

t.teardown(() => {
diagnosticsChannel.channel('undici:request:create').unsubscribe(listener1)
diagnosticsChannel.channel('undici:body:consumed').unsubscribe(listener2)
})

const { body } = await request(`http://localhost:${server.address().port}/json`)
await body.arrayBuffer() // consume the body so undici:body:consumed is triggered

t.ok(util.isDeepStrictEqual(req1, req2))
t.end()
})

t.end()
})