Skip to content

Commit

Permalink
feat: add undici:body:consumed diagnostics channel
Browse files Browse the repository at this point in the history
  • Loading branch information
KhafraDev committed Apr 24, 2022
1 parent ae369b2 commit 6741636
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 7 deletions.
18 changes: 18 additions & 0 deletions docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,21 @@ diagnosticsChannel.channel('undici:client:connectError').subscribe(({ error, soc
// connector is a function that creates the socket
console.log(`Connect failed with ${error.message}`)
})
```

## `undici:body:consumed`

This message is published once a body is consumed using a [body mixin](https://github.com/nodejs/undici#body-mixins).

```js
diagnosticsChannel.channel('undici:body:consumed').subscribe(({ body, request }) => {
// request is the same object undici:request:create

// body is the body consumed (this corresponds with how the body was consumed. For example, if the body was consumed using .blob(), this property will be a blob)
if (typeof body === 'string') {
// the body was consumed with `.text()`
}
})
```

Note: Consuming the body in other ways (ie. async iterators or [stream consumers](https://nodejs.org/api/webstreams.html#utility-consumers)) will not cause a message to be published in this channel.
4 changes: 2 additions & 2 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class RequestHandler extends AsyncResource {
this.context = context
}

onHeaders (statusCode, rawHeaders, resume) {
onHeaders (statusCode, rawHeaders, resume, statusText, request) {
const { callback, opaque, abort, context } = this

if (statusCode < 200) {
Expand All @@ -82,7 +82,7 @@ class RequestHandler extends AsyncResource {
}

const parsedHeaders = util.parseHeaders(rawHeaders)
const body = new Readable(resume, abort, parsedHeaders['content-type'])
const body = new Readable(resume, abort, parsedHeaders['content-type'], request)

this.callback = null
this.res = body
Expand Down
22 changes: 20 additions & 2 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,24 @@ const util = require('../core/util')
const { ReadableStreamFrom, toUSVString } = require('../core/util')

let Blob
const channels = {}

try {
const diagnosticsChannel = require('diagnostics_channel')
channels.trailers = diagnosticsChannel.channel('undici:body:consumed')
} catch {
channels.trailers = { hasSubscribers: false }
}

const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')
const kRequest = Symbol('kRequest')

module.exports = class BodyReadable extends Readable {
constructor (resume, abort, contentType = '') {
constructor (resume, abort, contentType = '', request) {
super({
autoDestroy: true,
read: resume,
Expand All @@ -30,6 +39,7 @@ module.exports = class BodyReadable extends Readable {
this[kConsume] = null
this[kBody] = null
this[kContentType] = contentType
this[kRequest] = channels.trailers.hasSubscribers ? request : null

// Is stream being consumed through Readable API?
// This is an optimization so that we avoid checking
Expand Down Expand Up @@ -178,10 +188,18 @@ async function consume (stream, type) {
assert(!stream[kConsume])

return new Promise((resolve, reject) => {
const res = (value) => {
if (channels.trailers.hasSubscribers && value !== undefined) {
channels.trailers.publish({ body: value, request: stream[kRequest] })
}

resolve(value)
}

stream[kConsume] = {
type,
stream,
resolve,
resolve: res,
reject,
length: 0,
body: []
Expand Down
2 changes: 1 addition & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ class Parser {

let pause
try {
pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
pause = request.onHeaders(statusCode, headers, this.resume, statusText, request) === false
} catch (err) {
util.destroy(socket, err)
return -1
Expand Down
2 changes: 1 addition & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class Request {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
}

return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
return this[kHandler].onHeaders(statusCode, headers, resume, statusText, this)
}

onData (chunk) {
Expand Down
2 changes: 1 addition & 1 deletion lib/handler/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class RedirectHandler {
}

if (!this.location) {
return this.handler.onHeaders(statusCode, headers, resume, statusText)
return this.handler.onHeaders(statusCode, headers, resume, statusText, this)
}

const { origin, pathname, search } = util.parseURL(new URL(this.location, this.opts.origin))
Expand Down
144 changes: 144 additions & 0 deletions test/diagnostics-channel/body.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
'use strict'

const { test, skip } = require('tap')

/** @type {import('diagnostics_channel')} */
let diagnosticsChannel

try {
diagnosticsChannel = require('diagnostics_channel')
} catch {
skip('missing diagnostics_channel')
process.exit(0)
}

const { request } = require('../..')
const { createServer } = require('http')
const { once } = require('events')
const util = require('util')
const Blob = require('buffer').Blob

if (!Blob) {
skip('missing Blob')
process.exit(0)
}

const server = createServer((req, res) => {
if (req.url === '/json') {
return res.end(JSON.stringify({
hello: 'world'
}))
} else {
return res.end('hello world')
}
})

test('undici:body:consumed diagnostics channel works', async (t) => {
server.listen(0)
await once(server, 'listening')
t.teardown(server.close.bind(server))

t.test('body consumed as text is a string', async (t) => {
t.plan(2)

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.equal(v.body, 'hello world')

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const text = await body.text()

t.equal(text, 'hello world')
})

t.test('body consumed as json is an object', async (t) => {
t.plan(2)

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, { hello: 'world' })

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}/json`)
const json = await body.json()

t.same(json, { hello: 'world' })
})

t.test('body consumed as an ArrayBuffer is an ArrayBuffer', async (t) => {
t.plan(2)

const uint8 = new Uint8Array(Buffer.from('hello world'))

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, uint8)

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const arrayBuffer = await body.arrayBuffer()

t.same(arrayBuffer, uint8)
})

t.test('body consumed as an ArrayBuffer is an ArrayBuffer', async (t) => {
t.plan(2)

const uint8 = new Uint8Array(Buffer.from('hello world'))

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, uint8)

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const arrayBuffer = await body.arrayBuffer()

t.same(arrayBuffer, uint8)
})

t.test('body consumed as a Blob is a Blob', async (t) => {
t.plan(2)

const expectedBlob = new Blob(['hello world'])

const channel = diagnosticsChannel.channel('undici:body:consumed')
const listener = (v) => t.same(v.body, expectedBlob)

t.teardown(() => channel.unsubscribe(listener))
channel.subscribe(listener)

const { body } = await request(`http://localhost:${server.address().port}`)
const blob = await body.blob()

t.same(expectedBlob, blob)
})

t.test('ensure request is a request', async (t) => {
let req1, req2

const listener1 = ({ request }) => (req1 = request)
const listener2 = ({ request }) => (req2 = request)

diagnosticsChannel.channel('undici:request:create').subscribe(listener1)
diagnosticsChannel.channel('undici:body:consumed').subscribe(listener2)

t.teardown(() => {
diagnosticsChannel.channel('undici:request:create').unsubscribe(listener1)
diagnosticsChannel.channel('undici:body:consumed').unsubscribe(listener2)
})

const { body } = await request(`http://localhost:${server.address().port}/json`)
await body.arrayBuffer() // consume the body so undici:body:consumed is triggered

t.ok(util.isDeepStrictEqual(req1, req2))
t.end()
})

t.end()
})

0 comments on commit 6741636

Please sign in to comment.