diff --git a/index.js b/index.js index 82a599a..d2f8334 100644 --- a/index.js +++ b/index.js @@ -30,14 +30,18 @@ function createThreadInterceptor (opts) { const port = roundRobin.next() - if (opts.headers) { - delete opts.headers.connection - delete opts.headers['transfer-encoding'] + const headers = { + ...opts?.headers, } + delete headers.connection + delete headers['transfer-encoding'] + headers.host = url.host + const id = nextId() const newOpts = { ...opts, + headers, } delete newOpts.dispatcher @@ -169,7 +173,7 @@ function wire (server, port, opts) { statusCode: res.statusCode, } - if (res.headers['content-length'].indexOf('application/json')) { + if (res.headers['content-type'].indexOf('application/json')) { // fast path because it's utf-8, use a string newRes.rawPayload = res.payload } else { diff --git a/package.json b/package.json index 8fc338f..8748f24 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "An Undici interceptor that routes requests over a worker thread", "main": "index.js", "scripts": { - "test": "eslint && borp --coverage" + "test": "eslint && borp --coverage --check-coverage" }, "keywords": [ "undici", diff --git a/test/basic.test.js b/test/basic.test.js index 4afb42f..f8c4fbe 100644 --- a/test/basic.test.js +++ b/test/basic.test.js @@ -1,13 +1,14 @@ 'use strict' const { test } = require('node:test') -const { deepStrictEqual, strictEqual } = require('node:assert') +const { deepStrictEqual, strictEqual, rejects } = require('node:assert') const { join } = require('path') const { Worker } = require('worker_threads') const { createThreadInterceptor } = require('../') const { Agent, request } = require('undici') const { once } = require('events') const { setTimeout: sleep } = require('timers/promises') +const Fastify = require('fastify') test('basic', async (t) => { const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) @@ -113,3 +114,108 @@ test('two service in a mesh, one is terminated, then a message is sent', async ( statusCode: 500, }) }) + +test('buffer', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body } = await request('http://myserver.local/buffer', { + dispatcher: agent, + }) + + strictEqual(statusCode, 200) + deepStrictEqual(Buffer.from(await body.arrayBuffer()), Buffer.from('hello')) +}) + +test('handle errors from inject', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'error.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + await rejects(request('http://myserver.local', { + dispatcher: agent, + }), new Error('kaboom')) +}) + +test('pass through with domain', async (t) => { + const app = Fastify() + app.get('/', async () => { + return { hello: 'world' } + }) + await app.listen({ port: 0 }) + t.after(() => app.close()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body } = await request(app.listeningOrigin, { + dispatcher: agent, + }) + + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { hello: 'world' }) +}) + +test('unwanted headers are removed', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body } = await request('http://myserver.local/echo-headers', { + headers: { + 'x-foo': 'bar', + connection: 'keep-alive', + 'transfer-encoding': 'chunked', + }, + dispatcher: agent, + }) + + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { + 'user-agent': 'lightMyRequest', + host: 'myserver.local', + 'x-foo': 'bar', + }) +}) + +test('multiple headers', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body, headers } = await request('http://myserver.local/headers', { + dispatcher: agent, + }) + + strictEqual(statusCode, 200) + deepStrictEqual(headers['x-foo'], ['bar', 'baz']) + await body.json() +}) diff --git a/test/fixtures/error.js b/test/fixtures/error.js new file mode 100644 index 0000000..c017263 --- /dev/null +++ b/test/fixtures/error.js @@ -0,0 +1,8 @@ +'use strict' + +const { parentPort } = require('worker_threads') +const { wire } = require('../../') + +wire(function (req, res) { + res.destroy(new Error('kaboom')) +}, parentPort) diff --git a/test/fixtures/worker1.js b/test/fixtures/worker1.js index c019549..2bb8080 100644 --- a/test/fixtures/worker1.js +++ b/test/fixtures/worker1.js @@ -6,12 +6,26 @@ const { wire } = require('../../') const app = fastify() -app.get('/', async (req, reply) => { +app.get('/', (req, reply) => { reply.send({ hello: workerData?.message || 'world' }) }) -app.get('/whoami', async (req, reply) => { +app.get('/whoami', (req, reply) => { reply.send({ threadId }) }) +app.get('/buffer', (req, reply) => { + reply.send(Buffer.from('hello')) +}) + +app.get('/echo-headers', (req, reply) => { + reply.send(req.headers) +}) + +app.get('/headers', (req, reply) => { + reply + .header('x-foo', ['bar', 'baz']) + .send({ hello: 'world' }) +}) + wire(app, parentPort) diff --git a/test/round-robin.test.js b/test/round-robin.test.js index 976a0a9..3fa1111 100644 --- a/test/round-robin.test.js +++ b/test/round-robin.test.js @@ -7,6 +7,7 @@ const { Worker } = require('worker_threads') const { createThreadInterceptor } = require('../') const { Agent, request } = require('undici') const { once } = require('events') +const RoundRobin = require('../lib/roundrobin') test('round-robin .route with array', async (t) => { const worker1 = new Worker(join(__dirname, 'fixtures', 'worker1.js'), { @@ -141,3 +142,8 @@ test('round-robin one worker exits, in flight request', async (t) => { dispatcher: agent, })) }) + +test('RoundRobin remove unknown port', () => { + const rr = new RoundRobin() + rr.remove({}) +})