-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add stream example #34
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
'use strict'; | ||
|
||
const Piscina = require('../../dist/src'); | ||
const { resolve } = require('path'); | ||
const { MessageChannel } = require('worker_threads'); | ||
const PortDuplex = require('./port_duplex'); | ||
const { pipeline, Writable } = require('stream'); | ||
|
||
const piscina = new Piscina({ | ||
filename: resolve(__dirname, 'worker.js') | ||
}); | ||
|
||
class W extends Writable { | ||
length = 0; | ||
_write (chunk, encoding, callback) { | ||
this.length += chunk.length; | ||
callback(); | ||
} | ||
}; | ||
|
||
(async function () { | ||
const channel = new MessageChannel(); | ||
const duplex = new PortDuplex(channel.port2, { writable: false }); | ||
const w = new W(); | ||
|
||
duplex.on('close', () => channel.port2.close()); | ||
|
||
pipeline(duplex, w, () => console.log(w.length)); | ||
|
||
await piscina.runTask({ port: channel.port1 }, [channel.port1]); | ||
})(); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
'use strict'; | ||
|
||
const { Duplex } = require('stream'); | ||
|
||
class PortDuplex extends Duplex { | ||
#port = undefined; | ||
|
||
constructor (port, options) { | ||
const { | ||
readable = true, | ||
writable = true | ||
} = { ...options }; | ||
if (typeof readable !== 'boolean') { | ||
throw new TypeError('readable must be a boolean'); | ||
} | ||
if (typeof writable !== 'boolean') { | ||
throw new TypeError('writable must be a boolean'); | ||
} | ||
super({ autoDestroy: true, readable, writable }); | ||
this.#port = port; | ||
this.#port.onmessage = PortDuplex.#onmessage.bind(this); | ||
} | ||
|
||
_write (chunk, encoding, callback) { | ||
// Chunk should always be a buffer here | ||
const temp = new Uint8Array(chunk); | ||
// TODO(@jasnell): This will need backpressure implemented | ||
this.#port.postMessage(temp, [temp.buffer]); | ||
callback(); | ||
} | ||
|
||
_read () { | ||
// Do nothing here. A more complete example would | ||
// implement proper read/pause behavior. | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Btw, I’ve thought about porting the code from our internal Worker stdio implementation to npm. This is definitely giving more motivation for that :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likely would be good I think. I'm considering the possibility of Piscina providing a more complete version of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jasnell The thing is, in the end streaming use cases for Workers are even rarer than use cases for Workers in general. I don’t think we should encourage this pattern if users don’t really need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's absolutely fair. The key reason I'm thinking through this is to explore the various good and bad ways of using the workers and what the various caveats are. |
||
|
||
_destroy (err, callback) { | ||
if (err) { | ||
this.#port.close(); | ||
callback(err); | ||
return; | ||
} | ||
if (this.writableEnded) { | ||
this.#port.postMessage(null); | ||
} | ||
callback(); | ||
} | ||
|
||
static #onmessage = function ({ data }) { | ||
if (data != null) { | ||
this.push(data); | ||
} else { | ||
this.push(null); | ||
} | ||
}; | ||
} | ||
|
||
module.exports = PortDuplex; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
'use strict'; | ||
|
||
const PortDuplex = require('./port_duplex'); | ||
const { createReadStream } = require('fs'); | ||
const { pipeline } = require('stream'); | ||
const { createGzip } = require('zlib'); | ||
|
||
module.exports = ({ port }) => { | ||
return new Promise((resolve, reject) => { | ||
const input = createReadStream(__filename); | ||
const stream = new PortDuplex(port, { readable: false }); | ||
pipeline(input, createGzip(), stream, (err) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(); | ||
}); | ||
}); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a feeling there should be some sort of backpressure built in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely it should