diff --git a/.eslintrc.js b/.eslintrc.js index 0f8edb6c..f31dc14e 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -52,6 +52,7 @@ module.exports = { '@typescript-eslint/no-unnecessary-type-assertion': 'off', '@typescript-eslint/no-unsafe-argument': 'off', '@typescript-eslint/unbound-method': 'off', + '@typescript-eslint/no-empty-interface': 'off', '@typescript-eslint/member-delimiter-style': ['error', { multiline: { delimiter: 'none', diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 8ed164f0..4dbb37ec 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -10,6 +10,7 @@ on: - test-* tags: - v* + - wasi-threads-v* pull_request: paths-ignore: - '**/*.md' @@ -25,6 +26,7 @@ env: jobs: build: + timeout-minutes: 15 name: Build runs-on: ubuntu-latest strategy: @@ -93,6 +95,14 @@ jobs: shell: bash run: npm run test -w packages/ts-transform-emscripten-esm-library + - name: Test @emnapi/wasi-threads + if: ${{ matrix.target == 'wasm32-wasi-threads' }} + shell: bash + run: | + node ./packages/wasi-threads/test/build.js + npm run test -w packages/wasi-threads + timeout-minutes: 1 + # - name: Lint # run: npm run lint @@ -133,7 +143,7 @@ jobs: release: name: Release - if: ${{ startsWith(github.event.ref, 'refs/tags') }} + if: ${{ startsWith(github.event.ref, 'refs/tags/v') }} needs: build runs-on: ubuntu-latest @@ -187,3 +197,31 @@ jobs: prerelease: false generate_release_notes: true files: ./script/emnapi.zip + + release-wasi-threads: + name: Release + if: ${{ startsWith(github.event.ref, 'refs/tags/wasi-threads-v') }} + needs: build + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-node@v3 + with: + node-version: '20.9.0' + registry-url: 'https://registry.npmjs.org' + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + + - name: NPM Install + shell: bash + run: | + npm install -g node-gyp + npm install + + - name: NPM Build + shell: bash + run: npm run build -w packages/wasi-threads + + - name: Publish + run: npm publish --ignore-scripts -w packages/wasi-threads diff --git a/.vscode/launch.json b/.vscode/launch.json index 3e349f69..d1d68099 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -22,7 +22,15 @@ "program": "${file}", "args": [], "preLaunchTask": "CMake: build ${input:target}" - } + }, + { + "type": "node", + "request": "launch", + "name": "wasi-threads test", + "runtimeArgs": [], + "program": "${workspaceFolder}/packages/wasi-threads/test/index.js", + "args": [] + }, ], "inputs": [ { diff --git a/package.json b/package.json index 0cd3b914..b2bfa736 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "packages/rollup-plugin-emscripten-esm-library", "packages/runtime", "packages/node", + "packages/wasi-threads", "packages/emnapi", "packages/core", "packages/test", diff --git a/packages/core/package.json b/packages/core/package.json index 86931308..dcef87d0 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -23,11 +23,11 @@ }, "./dist/emnapi-core.min.mjs": { "types": "./dist/emnapi-core.d.mts", - "import": "./dist/emnapi-core.min.mjs", - "require": null + "default": "./dist/emnapi-core.min.mjs" } }, "dependencies": { + "@emnapi/wasi-threads": "1.0.0", "tslib": "^2.4.0" }, "scripts": { diff --git a/packages/core/script/build.js b/packages/core/script/build.js index 75955c6f..99cb0bf8 100644 --- a/packages/core/script/build.js +++ b/packages/core/script/build.js @@ -14,7 +14,7 @@ const dist = path.join(__dirname, '../dist') function build () { compile(path.join(__dirname, '../tsconfig.json'), { optionsToExtend: { - target: require('typescript').ScriptTarget.ES2019, + target: ts.ScriptTarget.ES2019, emitDeclarationOnly: true, declaration: true, declarationDir: path.join(__dirname, '../lib/typings') @@ -108,7 +108,7 @@ function build () { } }, { - input: createInput(ts.ScriptTarget.ES2019, false, ['tslib']), + input: createInput(ts.ScriptTarget.ES2019, false, ['tslib', '@emnapi/wasi-threads']), output: { file: path.join(dist, 'emnapi-core.cjs.js'), format: 'cjs', @@ -118,7 +118,7 @@ function build () { } }, { - input: createInput(ts.ScriptTarget.ES2019, true, ['tslib']), + input: createInput(ts.ScriptTarget.ES2019, true, ['tslib', '@emnapi/wasi-threads']), output: { file: path.join(dist, 'emnapi-core.cjs.min.js'), format: 'cjs', @@ -128,7 +128,7 @@ function build () { } }, { - input: createInput(ts.ScriptTarget.ES2019, false, ['tslib']), + input: createInput(ts.ScriptTarget.ES2019, false, ['tslib', '@emnapi/wasi-threads']), output: { file: path.join(dist, 'emnapi-core.mjs'), format: 'esm', @@ -138,7 +138,7 @@ function build () { } }, { - input: createInput(ts.ScriptTarget.ES2019, true, ['tslib']), + input: createInput(ts.ScriptTarget.ES2019, true, ['tslib', '@emnapi/wasi-threads']), output: { file: path.join(dist, 'emnapi-core.min.mjs'), format: 'esm', @@ -148,7 +148,7 @@ function build () { } }, { - input: createInput(ts.ScriptTarget.ES5, false, ['tslib']), + input: createInput(ts.ScriptTarget.ES5, false, ['tslib', '@emnapi/wasi-threads']), output: { file: path.join(dist, 'emnapi-core.esm-bundler.js'), format: 'esm', diff --git a/packages/core/src/emnapi/index.d.ts b/packages/core/src/emnapi/index.d.ts index e9c5b174..cb86cbdd 100644 --- a/packages/core/src/emnapi/index.d.ts +++ b/packages/core/src/emnapi/index.d.ts @@ -1,4 +1,5 @@ import type { Context } from '@emnapi/runtime' +import type { ThreadManager } from '@emnapi/wasi-threads' /** @public */ export declare interface PointerInfo { @@ -34,14 +35,17 @@ export declare interface NapiModule { len?: number ): T getMemoryAddress (arrayBufferOrView: ArrayBuffer | ArrayBufferView): PointerInfo + addSendListener (worker: any): boolean } init (options: InitOptions): any - spawnThread (startArg: number, errorOrTid?: number): number - startThread (tid: number, startArg: number): void initWorker (arg: number): void executeAsyncWork (work: number): void postMessage?: (msg: any) => any + + waitThreadStart: boolean | number + /** @internal */ + PThread: ThreadManager } /** @public */ @@ -69,7 +73,7 @@ export declare type BaseCreateOptions = { nodeBinding?: NodeBinding reuseWorker?: boolean asyncWorkPoolSize?: number - waitThreadStart?: boolean + waitThreadStart?: boolean | number onCreateWorker?: (info: CreateWorkerInfo) => any print?: (str: string) => void printErr?: (str: string) => void diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 0f6fa713..10107696 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -24,15 +24,16 @@ export type { export type { LoadOptions, InstantiateOptions, - InstantiatedSource, - ReactorWASI + LoadedSource, + InstantiatedSource } from './load' export type { - OnLoadData, - HandleOptions + MessageHandlerOptions } from './worker' export type { InputType } from './util' + +export * from '@emnapi/wasi-threads' diff --git a/packages/core/src/load.ts b/packages/core/src/load.ts index 56bb7550..5b326127 100644 --- a/packages/core/src/load.ts +++ b/packages/core/src/load.ts @@ -1,22 +1,21 @@ +import { type WASIInstance, WASIThreads } from '@emnapi/wasi-threads' import { type InputType, load, loadSync } from './util' import { createNapiModule } from './emnapi/index' import type { CreateOptions, NapiModule } from './emnapi/index' /** @public */ -export interface InstantiatedSource extends WebAssembly.WebAssemblyInstantiatedSource { - napiModule: NapiModule +export interface LoadedSource extends WebAssembly.WebAssemblyInstantiatedSource { + usedInstance: WebAssembly.Instance } /** @public */ -export interface ReactorWASI { - readonly wasiImport?: Record - initialize (instance: object): void - getImportObject? (): any +export interface InstantiatedSource extends LoadedSource { + napiModule: NapiModule } /** @public */ export interface LoadOptions { - wasi?: ReactorWASI + wasi?: WASIInstance overwriteImports?: (importObject: WebAssembly.Imports) => WebAssembly.Imports beforeInit?: (source: WebAssembly.WebAssemblyInstantiatedSource) => void getMemory?: (exports: WebAssembly.Exports) => WebAssembly.Memory @@ -67,25 +66,37 @@ function loadNapiModuleImpl (loadFn: Function, userNapiModule: NapiModule | unde } const wasi = options!.wasi + let wasiThreads: WASIThreads | undefined + let importObject: WebAssembly.Imports = { env: napiModule.imports.env, napi: napiModule.imports.napi, - emnapi: napiModule.imports.emnapi, - wasi: { - // eslint-disable-next-line camelcase - 'thread-spawn': function __imported_wasi_thread_spawn (startArg: number, errorOrTid: number) { - return napiModule.spawnThread(startArg, errorOrTid) - } - } + emnapi: napiModule.imports.emnapi } if (wasi) { + wasiThreads = new WASIThreads( + napiModule.childThread + ? { + wasi, + childThread: true, + postMessage: napiModule.postMessage! + } + : { + wasi, + threadManager: napiModule.PThread, + waitThreadStart: napiModule.waitThreadStart + } + ) + Object.assign( importObject, typeof wasi.getImportObject === 'function' ? wasi.getImportObject() : { wasi_snapshot_preview1: wasi.wasiImport } ) + + Object.assign(importObject, wasiThreads.getImportObject()) } const overwriteImports = options!.overwriteImports @@ -124,58 +135,11 @@ function loadNapiModuleImpl (loadFn: Function, userNapiModule: NapiModule | unde instance = { exports } } const module = source.module + if (wasi) { - if (napiModule.childThread) { - // https://github.com/nodejs/help/issues/4102 - const createHandler = function (target: WebAssembly.Exports): ProxyHandler { - const handlers = [ - 'apply', - 'construct', - 'defineProperty', - 'deleteProperty', - 'get', - 'getOwnPropertyDescriptor', - 'getPrototypeOf', - 'has', - 'isExtensible', - 'ownKeys', - 'preventExtensions', - 'set', - 'setPrototypeOf' - ] - const handler: ProxyHandler = {} - for (let i = 0; i < handlers.length; i++) { - const name = handlers[i] as keyof ProxyHandler - handler[name] = function () { - const args = Array.prototype.slice.call(arguments, 1) - args.unshift(target) - return (Reflect[name] as any).apply(Reflect, args) - } - } - return handler - } - const handler = createHandler(originalExports) - const noop = (): void => {} - handler.get = function (_target, p, receiver) { - if (p === 'memory') { - return memory - } - if (p === '_initialize') { - return noop - } - return Reflect.get(originalExports, p, receiver) - } - const exportsProxy = new Proxy(Object.create(null), handler) - instance = new Proxy(instance, { - get (target, p, receiver) { - if (p === 'exports') { - return exportsProxy - } - return Reflect.get(target, p, receiver) - } - }) - } - wasi.initialize(instance) + instance = wasiThreads!.initialize(instance, module, memory) + } else { + napiModule.PThread.setup(module, memory) } if (beforeInit) { @@ -192,7 +156,11 @@ function loadNapiModuleImpl (loadFn: Function, userNapiModule: NapiModule | unde table }) - const ret: any = { instance: originalInstance, module } + const ret: any = { + instance: originalInstance, + module, + usedInstance: instance + } if (!isLoad) { ret.napiModule = napiModule } @@ -229,7 +197,7 @@ export function loadNapiModule ( /** Only support `BufferSource` or `WebAssembly.Module` on Node.js */ wasmInput: InputType | Promise, options?: LoadOptions -): Promise { +): Promise { if (typeof napiModule !== 'object' || napiModule === null) { throw new TypeError('Invalid napiModule') } @@ -241,7 +209,7 @@ export function loadNapiModuleSync ( napiModule: NapiModule, wasmInput: BufferSource | WebAssembly.Module, options?: LoadOptions -): WebAssembly.WebAssemblyInstantiatedSource { +): LoadedSource { if (typeof napiModule !== 'object' || napiModule === null) { throw new TypeError('Invalid napiModule') } diff --git a/packages/core/src/worker.ts b/packages/core/src/worker.ts index 2def93ef..00c50fb3 100644 --- a/packages/core/src/worker.ts +++ b/packages/core/src/worker.ts @@ -1,144 +1,58 @@ +import { + ThreadMessageHandler, + type ThreadMessageHandlerOptions, + type LoadPayload +} from '@emnapi/wasi-threads' import type { NapiModule } from './emnapi/index' import type { InstantiatedSource } from './load' -/** @public */ -export interface OnLoadData { - wasmModule: WebAssembly.Module - wasmMemory: WebAssembly.Memory -} +export type { ThreadMessageHandlerOptions, LoadPayload } /** @public */ -export interface HandleOptions { - onLoad (data: OnLoadData): InstantiatedSource | Promise +export interface MessageHandlerOptions extends ThreadMessageHandlerOptions { + onLoad: (data: LoadPayload) => InstantiatedSource | PromiseLike } /** @public */ -export class MessageHandler { - onLoad: (data: OnLoadData) => InstantiatedSource | Promise - instance: WebAssembly.Instance | undefined - // module: WebAssembly.Module | undefined - napiModule: NapiModule | undefined - messagesBeforeLoad: any[] +export class MessageHandler extends ThreadMessageHandler { + public napiModule: NapiModule | undefined - constructor (options: HandleOptions) { - const onLoad = options.onLoad - if (typeof onLoad !== 'function') { + public constructor (options: MessageHandlerOptions) { + if (typeof options.onLoad !== 'function') { throw new TypeError('options.onLoad is not a function') } - this.onLoad = onLoad - this.instance = undefined - // this.module = undefined + super(options) this.napiModule = undefined - this.messagesBeforeLoad = [] } - handle (e: any): void { + public override instantiate (data: LoadPayload): InstantiatedSource | PromiseLike { + const source = this.onLoad!(data) as InstantiatedSource | PromiseLike + const then = (source as PromiseLike).then + if (typeof then === 'function') { + return (source as PromiseLike).then((result) => { + this.napiModule = result.napiModule + return result + }) + } + this.napiModule = (source as InstantiatedSource).napiModule + return source + } + + public override handle (e: any): void { + super.handle(e) if (e?.data?.__emnapi__) { const type = e.data.__emnapi__.type const payload = e.data.__emnapi__.payload - const onLoad = this.onLoad - if (type === 'load') { - if (this.instance !== undefined) return - let source: InstantiatedSource | Promise - try { - source = onLoad(payload) - } catch (err) { - onLoaded.call(this, err, null, payload) - return - } - const then = source && 'then' in source ? source.then : undefined - if (typeof then === 'function') { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - then.call( - source, - (source) => { onLoaded.call(this, null, source, payload) }, - (err) => { onLoaded.call(this, err, null, payload) } - ) - } else { - onLoaded.call(this, null, source as InstantiatedSource, payload) - } - } else if (type === 'start') { - handleAfterLoad.call(this, e, () => { - notifyPthreadCreateResult(payload.sab, 1) - this.napiModule!.startThread(payload.tid, payload.arg) - }) - } else if (type === 'async-worker-init') { - handleAfterLoad.call(this, e, () => { + if (type === 'async-worker-init') { + this.handleAfterLoad(e, () => { this.napiModule!.initWorker(payload.arg) }) } else if (type === 'async-work-execute') { - handleAfterLoad.call(this, e, () => { + this.handleAfterLoad(e, () => { this.napiModule!.executeAsyncWork(payload.work) }) } } } } - -function handleAfterLoad (this: MessageHandler, e: any, f: (e: any) => void): void { - if (this.instance !== undefined) { - f.call(this, e) - } else { - this.messagesBeforeLoad.push(e.data) - } -} - -interface LoadPayload { - wasmModule: WebAssembly.Module - wasmMemory: WebAssembly.Memory - sab?: Int32Array -} - -function notifyPthreadCreateResult (sab: Int32Array | undefined, result: number): void { - if (sab) { - Atomics.store(sab, 0, result) - Atomics.notify(sab, 0) - } -} - -function onLoaded (this: MessageHandler, err: Error | null, source: InstantiatedSource | null, payload: LoadPayload): void { - if (err) { - notifyPthreadCreateResult(payload.sab, 2) - throw err - } - - if (source == null) { - notifyPthreadCreateResult(payload.sab, 2) - throw new TypeError('onLoad should return an object') - } - - const instance = source.instance - const napiModule = source.napiModule - - if (!instance) { - notifyPthreadCreateResult(payload.sab, 2) - throw new TypeError('onLoad should return an object which includes "instance"') - } - if (!napiModule) { - notifyPthreadCreateResult(payload.sab, 2) - throw new TypeError('onLoad should return an object which includes "napiModule"') - } - if (!napiModule.childThread) { - notifyPthreadCreateResult(payload.sab, 2) - throw new Error('napiModule should be created with `childThread: true`') - } - - this.instance = instance - this.napiModule = napiModule - - const postMessage = napiModule.postMessage! - postMessage({ - __emnapi__: { - type: 'loaded', - payload: {} - } - }) - - const messages = this.messagesBeforeLoad - this.messagesBeforeLoad = [] - for (let i = 0; i < messages.length; i++) { - const data = messages[i] - this.handle({ data }) - } -} diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json index 577085da..12f574f3 100644 --- a/packages/core/tsconfig.json +++ b/packages/core/tsconfig.json @@ -10,6 +10,7 @@ "outDir": "lib", "paths": { "tslib" : ["../../node_modules/tslib/tslib.d.ts"], + "@emnapi/wasi-threads": ["../wasi-threads/lib/typings/index.d.ts"], "@/*": ["./src/*"], }, "lib": [ diff --git a/packages/emnapi/script/build.js b/packages/emnapi/script/build.js index 916bfeb4..ba24e49c 100644 --- a/packages/emnapi/script/build.js +++ b/packages/emnapi/script/build.js @@ -129,6 +129,7 @@ async function build () { }) const parsedCode = compiler.parseCode(code) return `import { _WebAssembly as WebAssembly } from '@/util' +import { ThreadManager } from '@emnapi/wasi-threads' export function createNapiModule (options) { ${parsedCode} diff --git a/packages/emnapi/src/core/async-work.ts b/packages/emnapi/src/core/async-work.ts index c3ee2929..1cb77f2a 100644 --- a/packages/emnapi/src/core/async-work.ts +++ b/packages/emnapi/src/core/async-work.ts @@ -86,7 +86,7 @@ var emnapiAWMT = { } try { for (let i = 0; i < n; ++i) { - const worker = onCreateWorker({ type: 'async-work' }) + const worker = onCreateWorker({ type: 'async-work', name: 'emnapi-async-worker' }) const p = PThread.loadWasmModuleToWorker(worker) emnapiAWMT.addListener(worker) promises.push(p.then(() => { diff --git a/packages/emnapi/src/core/init.ts b/packages/emnapi/src/core/init.ts index ac35ecca..fc38a2f9 100644 --- a/packages/emnapi/src/core/init.ts +++ b/packages/emnapi/src/core/init.ts @@ -3,8 +3,6 @@ import { makeDynCall, to64 } from 'emscripten:parse-tools' -type SharedInt32Array = Int32Array - export interface InitOptions { instance: WebAssembly.Instance module: WebAssembly.Module @@ -26,11 +24,12 @@ export interface INapiModule { envObject?: Env init (options: InitOptions): any - spawnThread (startArg: number, errorOrTid?: number): number - startThread (tid: number, startArg: number): void initWorker (arg: number): void executeAsyncWork (work: number): void postMessage?: (msg: any) => any + + waitThreadStart: boolean | number + PThread: ThreadManager } declare const process: any @@ -38,7 +37,7 @@ declare const process: any export var ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && typeof process.versions === 'object' && process.versions !== null && typeof process.versions.node === 'string' export var ENVIRONMENT_IS_PTHREAD = Boolean(options.childThread) export var reuseWorker = Boolean(options.reuseWorker) -export var waitThreadStart = Boolean(options.waitThreadStart) +export var waitThreadStart = typeof options.waitThreadStart === 'number' ? options.waitThreadStart : Boolean(options.waitThreadStart) export var wasmInstance: WebAssembly.Instance export var wasmModule: WebAssembly.Module @@ -72,11 +71,12 @@ export var napiModule: INapiModule = { filename: '', childThread: Boolean(options.childThread), - spawnThread: undefined!, - startThread: undefined!, initWorker: undefined!, executeAsyncWork: undefined!, + waitThreadStart, + PThread: undefined!, + init (options: InitOptions) { if (napiModule.loaded) return napiModule.exports if (!options) throw new TypeError('Invalid napi init options') @@ -136,7 +136,7 @@ export var napiModule: INapiModule = { export var emnapiCtx: Context export var emnapiNodeBinding: NodeBinding -export var onCreateWorker: (info: { type: 'thread' | 'async-work' }) => any +export var onCreateWorker: (info: { type: 'thread' | 'async-work'; name: string }) => any = undefined! export var out: (str: string) => void export var err: (str: string) => void @@ -244,270 +244,15 @@ function emnapiAddSendListener (worker: any): boolean { napiModule.emnapi.addSendListener = emnapiAddSendListener -function terminateWorker (worker: any): void { - const tid = worker.__emnapi_tid - worker.terminate() - worker.onmessage = (e: any) => { - if (e.data.__emnapi__) { - err('received "' + e.data.__emnapi__.type + '" command from terminated worker: ' + tid) - } - } -} - -function cleanThread (worker: any, tid: number, force?: boolean): void { - if (!force && reuseWorker) { - PThread.returnWorkerToPool(worker) - } else { - delete PThread.pthreads[tid] - const index = PThread.runningWorkers.indexOf(worker) - if (index !== -1) { - PThread.runningWorkers.splice(index, 1) - } - terminateWorker(worker) - delete worker.__emnapi_tid - } -} - -function checkSharedWasmMemory (): void { - if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) { - throw new Error( - 'Multithread features require shared wasm memory. ' + - 'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking' - ) - } -} - -function spawnThread (startArg: number, errorOrTid: number): number { - checkSharedWasmMemory() - - const isNewABI = errorOrTid !== undefined - if (!isNewABI) { - errorOrTid = _malloc(to64('8')) - if (!errorOrTid) { - return -48 /* ENOMEM */ - } - } - const struct = new Int32Array(wasmMemory.buffer, errorOrTid, 2) - Atomics.store(struct, 0, 0) - Atomics.store(struct, 1, 0) - - if (ENVIRONMENT_IS_PTHREAD) { - const postMessage = napiModule.postMessage! - postMessage({ - __emnapi__: { - type: 'spawn-thread', - payload: { - startArg, - errorOrTid - } - } - }) - Atomics.wait(struct, 1, 0) - const isError = Atomics.load(struct, 0) - const result = Atomics.load(struct, 1) - if (isNewABI) { - return isError - } - _free(to64('errorOrTid')) - return isError ? -result : result - } - - let sab: Int32Array | undefined - if (waitThreadStart) { - sab = new Int32Array(new SharedArrayBuffer(4)) - Atomics.store(sab, 0, 0) - } - - let worker: any - const tid = PThread.nextWorkerID + 43 - try { - worker = PThread.getNewWorker(sab) - if (!worker) { - throw new Error('failed to get new worker') - } - - const WASI_THREADS_MAX_TID = 0x1FFFFFFF - PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42) - PThread.pthreads[tid] = worker - worker.__emnapi_tid = tid - if (ENVIRONMENT_IS_NODE) { - worker.ref() - } - worker.postMessage({ - __emnapi__: { - type: 'start', - payload: { - tid, - arg: startArg, - sab - } - } - }) - if (waitThreadStart) { - Atomics.wait(sab!, 0, 0) - const r = Atomics.load(sab!, 0) - if (r === 2) { - throw new Error('failed to start pthread') - } - } - } catch (e) { - const EAGAIN = 6 - - Atomics.store(struct, 0, 1) - Atomics.store(struct, 1, EAGAIN) - Atomics.notify(struct, 1) - - err(e.message) - if (isNewABI) { - return 1 - } - _free(to64('errorOrTid')) - return -EAGAIN - } - - Atomics.store(struct, 0, 0) - Atomics.store(struct, 1, tid) - Atomics.notify(struct, 1) - - PThread.runningWorkers.push(worker) - if (!waitThreadStart) { - worker.whenLoaded.catch((err: any) => { - delete worker.whenLoaded - cleanThread(worker, tid, true) - throw err - }) - } - - if (isNewABI) { - return 0 - } - _free(to64('errorOrTid')) - return tid -} - -function startThread (tid: number, startArg: number): void { - if (napiModule.childThread) { - if (typeof wasmInstance.exports.wasi_thread_start !== 'function') { - throw new TypeError('wasi_thread_start is not exported') - } - const postMessage = napiModule.postMessage! - ;(wasmInstance.exports.wasi_thread_start as Function)(tid, startArg) - postMessage({ - __emnapi__: { - type: 'cleanup-thread', - payload: { - tid - } - } - }) - } else { - throw new Error('startThread is only available in child threads') - } -} - -napiModule.spawnThread = spawnThread -napiModule.startThread = startThread - -export var PThread = { - unusedWorkers: [] as any[], - runningWorkers: [] as any[], - pthreads: Object.create(null), - nextWorkerID: 0, - init () {}, - returnWorkerToPool (worker: any) { - var tid = worker.__emnapi_tid - delete PThread.pthreads[tid] - PThread.unusedWorkers.push(worker) - PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker), 1) - delete worker.__emnapi_tid - if (ENVIRONMENT_IS_NODE) { - worker.unref() - } +export var PThread = new ThreadManager({ + printErr: err, + beforeLoad: (worker) => { + emnapiAddSendListener(worker) }, - loadWasmModuleToWorker: (worker: any, sab?: SharedInt32Array) => { - if (worker.whenLoaded) return worker.whenLoaded - worker.whenLoaded = new Promise((resolve, reject) => { - worker.onmessage = function (e: any) { - if (e.data.__emnapi__) { - const type = e.data.__emnapi__.type - const payload = e.data.__emnapi__.payload - if (type === 'loaded') { - worker.loaded = true - if (ENVIRONMENT_IS_NODE && !worker.__emnapi_tid) { - worker.unref() - } - resolve(worker) - // if (payload.err) { - // err('failed to load in child thread: ' + (payload.err.message || payload.err)) - // } - } else if (type === 'spawn-thread') { - spawnThread(payload.startArg, payload.errorOrTid) - } else if (type === 'cleanup-thread') { - cleanThread(worker, payload.tid) - } - } - } - worker.onerror = (e: any) => { - const message = 'worker sent an error!' - // if (worker.pthread_ptr) { - // message = 'Pthread ' + ptrToString(worker.pthread_ptr) + ' sent an error!' - // } - err(message + ' ' + e.message) - reject(e) - throw e - } - if (ENVIRONMENT_IS_NODE) { - worker.on('message', function (data: any) { - worker.onmessage({ - data - }) - }) - worker.on('error', function (e: any) { - worker.onerror(e) - }) - worker.on('detachedExit', function () {}) - } - // napiModule.emnapi.addSendListener(worker) - emnapiAddSendListener(worker) - // if (typeof emnapiTSFN !== 'undefined') { - // emnapiTSFN.addListener(worker) - // } - try { - worker.postMessage({ - __emnapi__: { - type: 'load', - payload: { - wasmModule, - wasmMemory, - sab - } - } - }) - } catch (err) { - checkSharedWasmMemory() - throw err - } - }) - return worker.whenLoaded - }, - allocateUnusedWorker () { - if (typeof onCreateWorker !== 'function') { - throw new TypeError('`options.onCreateWorker` is not provided') - } - const worker = onCreateWorker({ type: 'thread' }) - PThread.unusedWorkers.push(worker) - return worker - }, - getNewWorker (sab?: SharedInt32Array) { - if (reuseWorker) { - if (PThread.unusedWorkers.length === 0) { - const worker = PThread.allocateUnusedWorker() - PThread.loadWasmModuleToWorker(worker, sab) - } - return PThread.unusedWorkers.pop() - } - const worker = PThread.allocateUnusedWorker() - PThread.loadWasmModuleToWorker(worker, sab) - return PThread.unusedWorkers.pop() - } -} + reuseWorker, + onCreateWorker: onCreateWorker as ThreadManagerOptions['onCreateWorker'] ?? (() => { + throw new Error('options.onCreateWorker` is not provided') + }) +}) + +napiModule.PThread = PThread diff --git a/packages/emnapi/src/core/scope.d.ts b/packages/emnapi/src/core/scope.d.ts index dca5563c..6e1f463e 100644 --- a/packages/emnapi/src/core/scope.d.ts +++ b/packages/emnapi/src/core/scope.d.ts @@ -5,7 +5,7 @@ declare interface CreateOptions { childThread?: boolean reuseWorker?: boolean asyncWorkPoolSize?: number - waitThreadStart?: boolean + waitThreadStart?: boolean | number onCreateWorker?: () => any print?: (str: string) => void printErr?: (str: string) => void @@ -14,3 +14,8 @@ declare interface CreateOptions { // factory parameter declare const options: CreateOptions + +declare type ThreadManagerOptions = import('../../../wasi-threads/lib/typings/index').ThreadManagerOptions +declare const ThreadManager: typeof import('../../../wasi-threads/lib/typings/index').ThreadManager +// eslint-disable-next-line @typescript-eslint/no-redeclare +declare type ThreadManager = import('../../../wasi-threads/lib/typings/index').ThreadManager diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 3f6acea4..4099f8da 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -23,8 +23,7 @@ }, "./dist/emnapi.min.mjs": { "types": "./dist/emnapi.d.mts", - "import": "./dist/emnapi.min.mjs", - "require": null + "default": "./dist/emnapi.min.mjs" } }, "dependencies": { diff --git a/packages/test/util.js b/packages/test/util.js index bdc3ad5c..7caf830b 100644 --- a/packages/test/util.js +++ b/packages/test/util.js @@ -50,7 +50,7 @@ function loadPath (request, options) { : -RUNTIME_UV_THREADPOOL_SIZE, filename: request, reuseWorker: true, - waitThreadStart: true, + waitThreadStart: 1000, onCreateWorker () { return new Worker(join(__dirname, './worker.js'), { env: process.env, diff --git a/packages/wasi-threads/.gitignore b/packages/wasi-threads/.gitignore new file mode 100644 index 00000000..152d3668 --- /dev/null +++ b/packages/wasi-threads/.gitignore @@ -0,0 +1,6 @@ +/lib +node_modules +/dist +/src/emnapi/**/*.js +/test/**/*.wasm +/test/**/*.wat diff --git a/packages/wasi-threads/.npmignore b/packages/wasi-threads/.npmignore new file mode 100644 index 00000000..d6275066 --- /dev/null +++ b/packages/wasi-threads/.npmignore @@ -0,0 +1,10 @@ +.vscode +node_modules +/script +/lib +/src +.gitignore +.npmignore +api-extractor.json +tsconfig.json +/test diff --git a/packages/wasi-threads/LICENSE b/packages/wasi-threads/LICENSE new file mode 100644 index 00000000..05a59441 --- /dev/null +++ b/packages/wasi-threads/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021-present Toyobayashi + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/wasi-threads/README.md b/packages/wasi-threads/README.md new file mode 100644 index 00000000..81cbea1c --- /dev/null +++ b/packages/wasi-threads/README.md @@ -0,0 +1,170 @@ +# @emnapi/wasi-threads + +This package makes [wasi-threads proposal](https://github.com/WebAssembly/wasi-threads) based WASI modules work in Node.js and browser. + +## Quick Start + +`index.html` + +```html + + + +``` + +`index.js` + +```js +(function (main) { + const ENVIRONMENT_IS_NODE = + typeof process === 'object' && process !== null && + typeof process.versions === 'object' && process.versions !== null && + typeof process.versions.node === 'string' + + if (ENVIRONMENT_IS_NODE) { + main(require) + } else { + const nodeWasi = { WASI: globalThis.wasmUtil.WASI } + const nodeWorkerThreads = { + Worker: globalThis.Worker + } + const _require = function (request) { + if (request === 'node:wasi' || request === 'wasi') return nodeWasi + if (request === 'node:worker_threads' || request === 'worker_threads') return nodeWorkerThreads + if (request === '@emnapi/wasi-threads') return globalThis.wasiThreads + throw new Error('Can not find module: ' + request) + } + main(_require) + } +})(async function () { + const { WASI } = require('wasi') + const Worker = require('worker_threads') + const { WASIThreads } = require('@emnapi/wasi-threads') + + const wasi = new WASI({ + version: 'preview1' + }) + const wasiThreads = new WASIThreads({ + wasi, + onCreateWorker: () => { + return new Worker('./worker.js', { + execArgv: ['--experimental-wasi-unstable-preview1'] + }) + } + }) + const memory = new WebAssembly.Memory({ + initial: 16777216 / 65536, + maximum: 2147483648 / 65536, + shared: true + }) + let input + const file = 'path/to/your/wasi-module.wasm' + try { + input = require('fs').readFileSync(require('path').join(__dirname, file)) + } catch (err) { + const response = await fetch(file) + input = await response.arrayBuffer() + } + let { module, instance } = await WebAssembly.instantiate(input, { + env: { memory }, + wasi_snapshot_preview1: wasi.wasiImport, + ...wasiThreads.getImportObject() + }) + + if (typeof instance.exports._start === 'function') { + const { exitCode } = wasiThreads.start(instance, module, memory) + return exitCode + } else { + instance = wasiThreads.initialize(instance, module, memory) + // instance.exports.exported_wasm_function() + } +}) +``` + +`worker.js` + +```js +(function (main) { + const ENVIRONMENT_IS_NODE = + typeof process === 'object' && process !== null && + typeof process.versions === 'object' && process.versions !== null && + typeof process.versions.node === 'string' + + if (ENVIRONMENT_IS_NODE) { + const _require = function (request) { + return require(request) + } + + const _init = function () { + const nodeWorkerThreads = require('worker_threads') + const parentPort = nodeWorkerThreads.parentPort + + parentPort.on('message', (data) => { + globalThis.onmessage({ data }) + }) + + Object.assign(globalThis, { + self: globalThis, + require, + Worker: nodeWorkerThreads.Worker, + importScripts: function (f) { + (0, eval)(require('fs').readFileSync(f, 'utf8') + '//# sourceURL=' + f) + }, + postMessage: function (msg) { + parentPort.postMessage(msg) + } + }) + } + + main(_require, _init) + } else { + importScripts('./node_modules/@tybys/wasm-util/dist/wasm-util.js') + importScripts('./node_modules/@emnapi/wasi-threads/dist/wasi-threads.js') + + const nodeWasi = { WASI: globalThis.wasmUtil.WASI } + const _require = function (request) { + if (request === '@emnapi/wasi-threads') return globalThis.wasiThreads + if (request === 'node:wasi' || request === 'wasi') return nodeWasi + throw new Error('Can not find module: ' + request) + } + const _init = function () {} + main(_require, _init) + } +})(function main (require, init) { + init() + + const { WASI } = require('wasi') + const { ThreadMessageHandler, WASIThreads } = require('@emnapi/wasi-threads') + + const handler = new ThreadMessageHandler({ + async onLoad ({ wasmModule, wasmMemory }) { + const wasi = new WASI({ + version: 'preview1' + }) + + const wasiThreads = new WASIThreads({ + wasi, + childThread: true + }) + + const originalInstance = await WebAssembly.instantiate(wasmModule, { + env: { + memory: wasmMemory, + }, + wasi_snapshot_preview1: wasi.wasiImport, + ...wasiThreads.getImportObject() + }) + + // must call `initialize` instead of `start` in child thread + const instance = wasiThreads.initialize(originalInstance, wasmModule, wasmMemory) + + return { module: wasmModule, instance } + } + }) + + globalThis.onmessage = function (e) { + handler.handle(e) + // handle other messages + } +}) +``` diff --git a/packages/wasi-threads/api-extractor.json b/packages/wasi-threads/api-extractor.json new file mode 100644 index 00000000..b4269167 --- /dev/null +++ b/packages/wasi-threads/api-extractor.json @@ -0,0 +1,115 @@ +{ + "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json", + + // "extends": "./shared/api-extractor-base.json" + // "extends": "my-package/include/api-extractor-base.json" + + "projectFolder": ".", + + "mainEntryPointFilePath": "/lib/typings/index.d.ts", + + "bundledPackages": [], + + "compiler": { + "tsconfigFilePath": "/tsconfig.json" + + // "overrideTsconfig": { + // . . . + // } + + // "skipLibCheck": true, + }, + + "apiReport": { + "enabled": false + + // "reportFileName": ".api.md", + + // "reportFolder": "/etc/", + + // "reportTempFolder": "/api/temp/" + }, + + "docModel": { + "enabled": false + + // "apiJsonFilePath": "/temp/.api.json" + }, + + "dtsRollup": { + "enabled": true, + + "untrimmedFilePath": "", + + // "betaTrimmedFilePath": "/dist/-beta.d.ts", + + "publicTrimmedFilePath": "/dist/wasi-threads.d.ts" + + // "omitTrimmingComments": true + }, + + "tsdocMetadata": { + "enabled": false, + + "tsdocMetadataFilePath": "/dist/tsdoc-metadata.json" + }, + + // "newlineKind": "crlf", + + "messages": { + /** + * Configures handling of diagnostic messages reported by the TypeScript compiler engine while analyzing + * the input .d.ts files. + * + * TypeScript message identifiers start with "TS" followed by an integer. For example: "TS2551" + * + * DEFAULT VALUE: A single "default" entry with logLevel=warning. + */ + "compilerMessageReporting": { + "default": { + "logLevel": "warning" + + // "addToApiReportFile": false + } + + // "TS2551": { + // "logLevel": "warning", + // "addToApiReportFile": true + // }, + // + // . . . + }, + + "extractorMessageReporting": { + "default": { + "logLevel": "warning" + // "addToApiReportFile": false + }, + "ae-missing-release-tag": { + "logLevel": "none", + "addToApiReportFile": false + } + + // "ae-extra-release-tag": { + // "logLevel": "warning", + // "addToApiReportFile": true + // }, + // + // . . . + }, + + "tsdocMessageReporting": { + "default": { + "logLevel": "warning" + // "addToApiReportFile": false + } + + // "tsdoc-link-tag-unescaped-text": { + // "logLevel": "warning", + // "addToApiReportFile": true + // }, + // + // . . . + } + } +} diff --git a/packages/wasi-threads/index.js b/packages/wasi-threads/index.js new file mode 100644 index 00000000..7e6eabd6 --- /dev/null +++ b/packages/wasi-threads/index.js @@ -0,0 +1,5 @@ +if (typeof process !== 'undefined' && process.env.NODE_ENV === 'production') { + module.exports = require('./dist/wasi-threads.cjs.min.js') +} else { + module.exports = require('./dist/wasi-threads.cjs.js') +} diff --git a/packages/wasi-threads/package.json b/packages/wasi-threads/package.json new file mode 100644 index 00000000..2d244043 --- /dev/null +++ b/packages/wasi-threads/package.json @@ -0,0 +1,50 @@ +{ + "name": "@emnapi/wasi-threads", + "version": "1.0.0", + "description": "WASI threads proposal implementation in JavaScript", + "main": "index.js", + "module": "./dist/wasi-threads.esm-bundler.js", + "types": "./dist/wasi-threads.d.ts", + "sideEffects": false, + "exports": { + ".": { + "types": { + "module": "./dist/wasi-threads.d.ts", + "import": "./dist/wasi-threads.d.mts", + "default": "./dist/wasi-threads.d.ts" + }, + "module": "./dist/wasi-threads.esm-bundler.js", + "import": "./dist/wasi-threads.mjs", + "default": "./index.js" + }, + "./dist/wasi-threads.cjs.min": { + "types": "./dist/wasi-threads.d.ts", + "default": "./dist/wasi-threads.cjs.min.js" + }, + "./dist/wasi-threads.min.mjs": { + "types": "./dist/wasi-threads.d.mts", + "default": "./dist/wasi-threads.min.mjs" + } + }, + "dependencies": { + "tslib": "^2.4.0" + }, + "scripts": { + "build": "node ./script/build.js", + "build:test": "node ./test/build.js", + "test": "node ./test/index.js" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/toyobayashi/emnapi.git" + }, + "author": "toyobayashi", + "license": "MIT", + "bugs": { + "url": "https://github.com/toyobayashi/emnapi/issues" + }, + "homepage": "https://github.com/toyobayashi/emnapi#readme", + "publishConfig": { + "access": "public" + } +} diff --git a/packages/wasi-threads/script/build.js b/packages/wasi-threads/script/build.js new file mode 100644 index 00000000..d5eb320f --- /dev/null +++ b/packages/wasi-threads/script/build.js @@ -0,0 +1,194 @@ +const path = require('path') +const fs = require('fs-extra') +const rollup = require('rollup') +const ts = require('typescript') +const rollupTypescript = require('@rollup/plugin-typescript').default +const rollupNodeResolve = require('@rollup/plugin-node-resolve').default +const rollupReplace = require('@rollup/plugin-replace').default +const rollupTerser = require('@rollup/plugin-terser').default +const rollupAlias = require('@rollup/plugin-alias').default +const { compile } = require('@tybys/tsapi') +const dist = path.join(__dirname, '../dist') + +function build () { + compile(path.join(__dirname, '../tsconfig.json'), { + optionsToExtend: { + target: require('typescript').ScriptTarget.ES2019, + emitDeclarationOnly: true, + declaration: true, + declarationMap: true, + declarationDir: path.join(__dirname, '../lib/typings') + } + }) + + /** + * @param {ts.ScriptTarget} esversion + * @param {boolean=} minify + * @returns {rollup.RollupOptions} + */ + function createInput (esversion, minify, external) { + return { + input: path.join(__dirname, '../src/index.ts'), + external, + plugins: [ + rollupTypescript({ + tsconfig: path.join(__dirname, '../tsconfig.json'), + tslib: path.join( + path.dirname(require.resolve('tslib')), + JSON.parse(fs.readFileSync(path.join(path.dirname(require.resolve('tslib')), 'package.json'))).module + ), + compilerOptions: { + target: esversion, + ...(esversion !== ts.ScriptTarget.ES5 ? { removeComments: true, downlevelIteration: false } : {}) + }, + include: [ + './src/**/*' + ], + transformers: { + after: [ + require('@tybys/ts-transform-pure-class').default + ] + } + }), + rollupAlias({ + entries: [ + { find: '@', replacement: path.join(__dirname, '../src') } + ] + }), + rollupNodeResolve({ + mainFields: ['module', 'main'] + }), + rollupReplace({ + preventAssignment: true, + values: { + __VERSION__: JSON.stringify(require('../package.json').version) + } + }), + ...(minify + ? [ + rollupTerser({ + compress: true, + mangle: true, + format: { + comments: false + } + }) + ] + : []) + ] + } + } + + const globalName = 'wasiThreads' + + return Promise.all(([ + { + input: createInput(ts.ScriptTarget.ES5, false), + output: { + file: path.join(dist, 'wasi-threads.js'), + format: 'umd', + name: globalName, + exports: 'named', + strict: false + } + }, + { + input: createInput(ts.ScriptTarget.ES5, true), + output: { + file: path.join(dist, 'wasi-threads.min.js'), + format: 'umd', + name: globalName, + exports: 'named', + strict: false + } + }, + { + input: createInput(ts.ScriptTarget.ES2019, false, ['tslib']), + output: { + file: path.join(dist, 'wasi-threads.cjs.js'), + format: 'cjs', + name: globalName, + exports: 'named', + strict: false + } + }, + { + input: createInput(ts.ScriptTarget.ES2019, true, ['tslib']), + output: { + file: path.join(dist, 'wasi-threads.cjs.min.js'), + format: 'cjs', + name: globalName, + exports: 'named', + strict: false + } + }, + { + input: createInput(ts.ScriptTarget.ES2019, false, ['tslib']), + output: { + file: path.join(dist, 'wasi-threads.mjs'), + format: 'esm', + name: globalName, + exports: 'named', + strict: false + } + }, + { + input: createInput(ts.ScriptTarget.ES2019, true, ['tslib']), + output: { + file: path.join(dist, 'wasi-threads.min.mjs'), + format: 'esm', + name: globalName, + exports: 'named', + strict: false + } + }, + { + input: createInput(ts.ScriptTarget.ES5, false, ['tslib']), + output: { + file: path.join(dist, 'wasi-threads.esm-bundler.js'), + format: 'esm', + name: globalName, + exports: 'named', + strict: false + } + } + ]).map(conf => { + return rollup.rollup(conf.input).then(bundle => bundle.write(conf.output)) + })).then(() => { + const { + Extractor, + ExtractorConfig + } = require('@microsoft/api-extractor') + const apiExtractorJsonPath = path.join(__dirname, '../api-extractor.json') + const extractorConfig = ExtractorConfig.loadFileAndPrepare(apiExtractorJsonPath) + const extractorResult = Extractor.invoke(extractorConfig, { + localBuild: true, + showVerboseMessages: true + }) + if (extractorResult.succeeded) { + console.log('API Extractor completed successfully') + } else { + const errmsg = `API Extractor completed with ${extractorResult.errorCount} errors and ${extractorResult.warningCount} warnings` + return Promise.reject(new Error(errmsg)) + } + + const dts = extractorConfig.publicTrimmedFilePath + + const mDts = path.join(__dirname, '../dist/wasi-threads.d.mts') + const cjsMinDts = path.join(__dirname, '../dist/wasi-threads.cjs.min.d.ts') + const mjsMinDts = path.join(__dirname, '../dist/wasi-threads.min.d.mts') + fs.copyFileSync(dts, mDts) + fs.copyFileSync(dts, cjsMinDts) + fs.copyFileSync(dts, mjsMinDts) + fs.appendFileSync(dts, `\nexport as namespace ${globalName};\n`, 'utf8') + }) +} + +exports.build = build + +if (module === require.main) { + build().catch(err => { + console.error(err) + process.exit(1) + }) +} diff --git a/packages/wasi-threads/src/command.ts b/packages/wasi-threads/src/command.ts new file mode 100644 index 00000000..72e09ed1 --- /dev/null +++ b/packages/wasi-threads/src/command.ts @@ -0,0 +1,53 @@ +export interface LoadPayload { + wasmModule: WebAssembly.Module + wasmMemory: WebAssembly.Memory + sab?: Int32Array +} + +export interface LoadedPayload {} + +export interface StartPayload { + tid: number + arg: number + sab?: Int32Array +} + +export interface CleanupThreadPayload { + tid: number +} + +export interface TerminateAllThreadsPayload {} + +export interface SpawnThreadPayload { + startArg: number + errorOrTid: number +} + +export interface CommandPayloadMap { + load: LoadPayload + loaded: LoadedPayload + start: StartPayload + 'cleanup-thread': CleanupThreadPayload + 'terminate-all-threads': TerminateAllThreadsPayload + 'spawn-thread': SpawnThreadPayload +} + +type CommandType = keyof CommandPayloadMap + +export interface CommandInfo { + type: CommandType + payload: CommandPayloadMap[T] +} + +export interface MessageEventData { + __emnapi__: CommandInfo +} + +export function createMessage (type: T, payload: CommandPayloadMap[T]): MessageEventData { + return { + __emnapi__: { + type, + payload + } + } +} diff --git a/packages/wasi-threads/src/index.ts b/packages/wasi-threads/src/index.ts new file mode 100644 index 00000000..708d6b6f --- /dev/null +++ b/packages/wasi-threads/src/index.ts @@ -0,0 +1,25 @@ +export type { ThreadManagerOptions, WorkerLike, WorkerMessageEvent, WorkerFactory } from './thread-manager' +export { ThreadManager } from './thread-manager' + +export type { + WASIInstance, + StartResult, + WASIThreadsOptions, + MainThreadOptions, + ChildThreadOptions, + BaseOptions, + WASIThreadsImports, + MainThreadBaseOptions, + MainThreadOptionsWithThreadManager, + MainThreadOptionsCreateThreadManager +} from './wasi-threads' +export { WASIThreads } from './wasi-threads' + +export { ThreadMessageHandler } from './worker' +export type { ThreadMessageHandlerOptions } from './worker' + +export { createInstanceProxy } from './proxy' + +export { isTrapError } from './util' + +export type { LoadPayload } from './command' diff --git a/packages/wasi-threads/src/proxy.ts b/packages/wasi-threads/src/proxy.ts new file mode 100644 index 00000000..9bb35249 --- /dev/null +++ b/packages/wasi-threads/src/proxy.ts @@ -0,0 +1,70 @@ +export const kIsProxy = Symbol('kIsProxy') + +/** @public */ +export function createInstanceProxy ( + instance: WebAssembly.Instance, + memory?: WebAssembly.Memory | (() => WebAssembly.Memory) +): WebAssembly.Instance { + if ((instance as any)[kIsProxy]) return instance + + // https://github.com/nodejs/help/issues/4102 + const originalExports = instance.exports + const createHandler = function (target: WebAssembly.Exports): ProxyHandler { + const handlers = [ + 'apply', + 'construct', + 'defineProperty', + 'deleteProperty', + 'get', + 'getOwnPropertyDescriptor', + 'getPrototypeOf', + 'has', + 'isExtensible', + 'ownKeys', + 'preventExtensions', + 'set', + 'setPrototypeOf' + ] + const handler: ProxyHandler = {} + for (let i = 0; i < handlers.length; i++) { + const name = handlers[i] as keyof ProxyHandler + handler[name] = function () { + const args = Array.prototype.slice.call(arguments, 1) + args.unshift(target) + return (Reflect[name] as any).apply(Reflect, args) + } + } + return handler + } + const handler = createHandler(originalExports) + const _initialize = (): void => {} + const _start = (): number => 0 + handler.get = function (_target, p, receiver) { + if (p === 'memory') { + return (typeof memory === 'function' ? memory() : memory) ?? Reflect.get(originalExports, p, receiver) + } + if (p === '_initialize') { + return p in originalExports ? _initialize : undefined + } + if (p === '_start') { + return p in originalExports ? _start : undefined + } + return Reflect.get(originalExports, p, receiver) + } + handler.has = function (_target, p) { + if (p === 'memory') return true + return Reflect.has(originalExports, p) + } + const exportsProxy = new Proxy(Object.create(null), handler) + return new Proxy(instance, { + get (target, p, receiver) { + if (p === 'exports') { + return exportsProxy + } + if (p === kIsProxy) { + return true + } + return Reflect.get(target, p, receiver) + } + }) +} diff --git a/packages/wasi-threads/src/thread-manager.ts b/packages/wasi-threads/src/thread-manager.ts new file mode 100644 index 00000000..af5938b6 --- /dev/null +++ b/packages/wasi-threads/src/thread-manager.ts @@ -0,0 +1,262 @@ +import type { Worker as NodeWorker } from 'worker_threads' +import { ENVIRONMENT_IS_NODE } from './util' +import { type MessageEventData, createMessage, type CommandPayloadMap, type CleanupThreadPayload } from './command' + +/** @public */ +export type WorkerLike = (Worker | NodeWorker) & { + whenLoaded?: Promise + loaded?: boolean + __emnapi_tid?: number +} + +/** @public */ +export interface WorkerMessageEvent { + data: T +} + +/** @public */ +export type WorkerFactory = (ctx: { type: string; name: string }) => WorkerLike + +/** @public */ +export interface ThreadManagerOptions { + printErr?: (message: string) => void + beforeLoad?: (worker: WorkerLike) => any + reuseWorker?: boolean + onCreateWorker: WorkerFactory +} + +const WASI_THREADS_MAX_TID = 0x1FFFFFFF + +export function checkSharedWasmMemory (wasmMemory?: WebAssembly.Memory | null): void { + if (typeof SharedArrayBuffer === 'undefined' || (wasmMemory && !(wasmMemory.buffer instanceof SharedArrayBuffer))) { + throw new Error( + 'Multithread features require shared wasm memory. ' + + 'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking' + ) + } +} + +let nextWorkerID = 0 + +/** @public */ +export class ThreadManager { + public unusedWorkers: WorkerLike[] = [] + public runningWorkers: WorkerLike[] = [] + public pthreads: Record = Object.create(null) + public get nextWorkerID (): number { return nextWorkerID } + + public wasmModule: WebAssembly.Module | null = null + public wasmMemory: WebAssembly.Memory | null = null + private readonly messageEvents = new WeakMap void>>() + + private readonly _onCreateWorker: WorkerFactory + private readonly _reuseWorker: boolean + private readonly _beforeLoad?: (worker: WorkerLike) => any + + /** @internal */ + public readonly printErr: (message: string) => void + + public constructor (options: ThreadManagerOptions) { + const onCreateWorker = options.onCreateWorker + if (typeof onCreateWorker !== 'function') { + throw new TypeError('`options.onCreateWorker` is not provided') + } + this._onCreateWorker = onCreateWorker + this._reuseWorker = options.reuseWorker ?? false + this._beforeLoad = options.beforeLoad + this.printErr = options.printErr ?? console.error.bind(console) + } + + public init (): void {} + + public setup (wasmModule: WebAssembly.Module, wasmMemory: WebAssembly.Memory): void { + this.wasmModule = wasmModule + this.wasmMemory = wasmMemory + } + + public markId (worker: WorkerLike): number { + if (worker.__emnapi_tid) return worker.__emnapi_tid + const tid = nextWorkerID + 43 + nextWorkerID = (nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42) + this.pthreads[tid] = worker + worker.__emnapi_tid = tid + return tid + } + + public returnWorkerToPool (worker: WorkerLike): void { + var tid = worker.__emnapi_tid + if (tid !== undefined) { + delete this.pthreads[tid] + } + this.unusedWorkers.push(worker) + this.runningWorkers.splice(this.runningWorkers.indexOf(worker), 1) + delete worker.__emnapi_tid + if (ENVIRONMENT_IS_NODE) { + (worker as NodeWorker).unref() + } + } + + public loadWasmModuleToWorker (worker: WorkerLike, sab?: Int32Array): Promise { + if (worker.whenLoaded) return worker.whenLoaded + const err = this.printErr + const beforeLoad = this._beforeLoad + // eslint-disable-next-line @typescript-eslint/no-this-alias + const _this = this + worker.whenLoaded = new Promise((resolve, reject) => { + const handleError = function (e: { message: string }): void { + let message = 'worker sent an error!' + if (worker.__emnapi_tid !== undefined) { + message = 'worker (tid = ' + worker.__emnapi_tid + ') sent an error!' + } + err(message + ' ' + e.message) + if (e.message.indexOf('RuntimeError') !== -1 || e.message.indexOf('unreachable') !== -1) { + try { + _this.terminateAllThreads() + } catch (_) {} + } + reject(e) + throw e as Error + } + const handleMessage = (data: MessageEventData): void => { + if (data.__emnapi__) { + const type = data.__emnapi__.type + const payload = data.__emnapi__.payload + if (type === 'loaded') { + worker.loaded = true + if (ENVIRONMENT_IS_NODE && !worker.__emnapi_tid) { + (worker as NodeWorker).unref() + } + resolve(worker) + // if (payload.err) { + // err('failed to load in child thread: ' + (payload.err.message || payload.err)) + // } + } else if (type === 'cleanup-thread') { + if ((payload as CleanupThreadPayload).tid in this.pthreads) { + this.cleanThread(worker, (payload as CleanupThreadPayload).tid) + } + } + } + }; + (worker as Worker).onmessage = (e) => { + handleMessage(e.data) + + this.fireMessageEvent(worker, e) + }; + (worker as Worker).onerror = handleError + if (ENVIRONMENT_IS_NODE) { + (worker as NodeWorker).on('message', function (data: any) { + (worker as any).onmessage?.({ + data + }) + }); + (worker as NodeWorker).on('error', function (e) { + (worker as any).onerror?.(e) + }); + (worker as NodeWorker).on('detachedExit', function () {}) + } + + if (typeof beforeLoad === 'function') { + beforeLoad(worker) + } + + try { + worker.postMessage(createMessage('load', { + wasmModule: this.wasmModule!, + wasmMemory: this.wasmMemory!, + sab + })) + } catch (err) { + checkSharedWasmMemory(this.wasmMemory) + throw err + } + }) + return worker.whenLoaded + } + + public allocateUnusedWorker (): WorkerLike { + const _onCreateWorker = this._onCreateWorker + const worker = _onCreateWorker({ type: 'thread', name: 'emnapi-pthread' }) + this.unusedWorkers.push(worker) + return worker + } + + public getNewWorker (sab?: Int32Array): WorkerLike | undefined { + if (this._reuseWorker) { + if (this.unusedWorkers.length === 0) { + const worker = this.allocateUnusedWorker() + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.loadWasmModuleToWorker(worker, sab) + } + return this.unusedWorkers.pop() + } + const worker = this.allocateUnusedWorker() + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.loadWasmModuleToWorker(worker, sab) + return this.unusedWorkers.pop() + } + + public cleanThread (worker: WorkerLike, tid: number, force?: boolean): void { + if (!force && this._reuseWorker) { + this.returnWorkerToPool(worker) + } else { + delete this.pthreads[tid] + const index = this.runningWorkers.indexOf(worker) + if (index !== -1) { + this.runningWorkers.splice(index, 1) + } + this.terminateWorker(worker) + delete worker.__emnapi_tid + } + } + + public terminateWorker (worker: WorkerLike): void { + const tid = worker.__emnapi_tid + // eslint-disable-next-line @typescript-eslint/no-floating-promises + worker.terminate() + this.messageEvents.get(worker)?.clear() + this.messageEvents.delete(worker); + (worker as Worker).onmessage = (e: any) => { + if (e.data.__emnapi__) { + const err = this.printErr + err('received "' + e.data.__emnapi__.type + '" command from terminated worker: ' + tid) + } + } + } + + public terminateAllThreads (): void { + for (let i = 0; i < this.runningWorkers.length; ++i) { + this.terminateWorker(this.runningWorkers[i]) + } + for (let i = 0; i < this.unusedWorkers.length; ++i) { + this.terminateWorker(this.unusedWorkers[i]) + } + this.unusedWorkers = [] + this.runningWorkers = [] + this.pthreads = Object.create(null) + } + + public addMessageEventListener (worker: WorkerLike, onMessage: (e: WorkerMessageEvent) => void): () => void { + let listeners = this.messageEvents.get(worker) + if (!listeners) { + listeners = new Set() + this.messageEvents.set(worker, listeners) + } + listeners.add(onMessage) + return () => { + listeners?.delete(onMessage) + } + } + + public fireMessageEvent (worker: WorkerLike, e: WorkerMessageEvent): void { + const listeners = this.messageEvents.get(worker) + if (!listeners) return + const err = this.printErr + listeners.forEach((listener) => { + try { + listener(e) + } catch (e) { + err(e.stack) + } + }) + } +} diff --git a/packages/wasi-threads/src/util.ts b/packages/wasi-threads/src/util.ts new file mode 100644 index 00000000..8e657d96 --- /dev/null +++ b/packages/wasi-threads/src/util.ts @@ -0,0 +1,75 @@ +declare const WXWebAssembly: typeof WebAssembly | undefined +const _WebAssembly = typeof WebAssembly !== 'undefined' + ? WebAssembly + : typeof WXWebAssembly !== 'undefined' + ? WXWebAssembly + : undefined! + +export const ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && + typeof process.versions === 'object' && process.versions !== null && + typeof process.versions.node === 'string' + +export function getPostMessage (options?: { postMessage?: (message: any) => void }): ((message: any) => void) | undefined { + return typeof options?.postMessage === 'function' + ? options.postMessage + : typeof postMessage === 'function' + ? postMessage + : undefined +} + +export function serizeErrorToBuffer (sab: SharedArrayBuffer, code: number, error?: Error): void { + const i32array = new Int32Array(sab) + Atomics.store(i32array, 0, code) + if (code > 1 && error) { + const name = error.name + const message = error.message + const stack = error.stack + const nameBuffer = new TextEncoder().encode(name) + const messageBuffer = new TextEncoder().encode(message) + const stackBuffer = new TextEncoder().encode(stack) + Atomics.store(i32array, 1, nameBuffer.length) + Atomics.store(i32array, 2, messageBuffer.length) + Atomics.store(i32array, 3, stackBuffer.length) + const buffer = new Uint8Array(sab) + buffer.set(nameBuffer, 16) + buffer.set(messageBuffer, 16 + nameBuffer.length) + buffer.set(stackBuffer, 16 + nameBuffer.length + messageBuffer.length) + } +} + +export function deserizeErrorFromBuffer (sab: SharedArrayBuffer): Error | null { + const i32array = new Int32Array(sab) + const status = Atomics.load(i32array, 0) + if (status <= 1) { + return null + } + const nameLength = Atomics.load(i32array, 1) + const messageLength = Atomics.load(i32array, 2) + const stackLength = Atomics.load(i32array, 3) + const buffer = new Uint8Array(sab) + const nameBuffer = buffer.slice(16, 16 + nameLength) + const messageBuffer = buffer.slice(16 + nameLength, 16 + nameLength + messageLength) + const stackBuffer = buffer.slice(16 + nameLength + messageLength, 16 + nameLength + messageLength + stackLength) + const name = new TextDecoder().decode(nameBuffer) + const message = new TextDecoder().decode(messageBuffer) + const stack = new TextDecoder().decode(stackBuffer) + + const ErrorConstructor = (globalThis as any)[name] ?? Error + const error = new ErrorConstructor(message) + Object.defineProperty(error, 'stack', { + value: stack, + writable: true, + enumerable: false, + configurable: true + }) + return error +} + +/** @public */ +export function isTrapError (e: Error): e is WebAssembly.RuntimeError { + try { + return e instanceof _WebAssembly.RuntimeError + } catch (_) { + return false + } +} diff --git a/packages/wasi-threads/src/wasi-threads.ts b/packages/wasi-threads/src/wasi-threads.ts new file mode 100644 index 00000000..308ced0a --- /dev/null +++ b/packages/wasi-threads/src/wasi-threads.ts @@ -0,0 +1,371 @@ +import { ENVIRONMENT_IS_NODE, deserizeErrorFromBuffer, getPostMessage, isTrapError } from './util' +import { checkSharedWasmMemory, ThreadManager } from './thread-manager' +import type { WorkerMessageEvent, ThreadManagerOptions } from './thread-manager' +import { type CommandPayloadMap, type MessageEventData, createMessage, type SpawnThreadPayload } from './command' +import { createInstanceProxy } from './proxy' + +/** @public */ +export interface WASIInstance { + readonly wasiImport?: Record + initialize (instance: object): void + start (instance: object): number + getImportObject? (): any +} + +/** @public */ +export interface BaseOptions { + wasi: WASIInstance + version?: 'preview1' + wasm64?: boolean +} + +/** @public */ +export interface MainThreadBaseOptions extends BaseOptions { + waitThreadStart?: boolean | number +} + +/** @public */ +export interface MainThreadOptionsWithThreadManager extends MainThreadBaseOptions { + threadManager?: ThreadManager | (() => ThreadManager) +} + +/** @public */ +export interface MainThreadOptionsCreateThreadManager extends MainThreadBaseOptions, ThreadManagerOptions {} + +/** @public */ +export type MainThreadOptions = MainThreadOptionsWithThreadManager | MainThreadOptionsCreateThreadManager + +/** @public */ +export interface ChildThreadOptions extends BaseOptions { + childThread: true + postMessage?: (data: any) => void +} + +/** @public */ +export type WASIThreadsOptions = MainThreadOptions | ChildThreadOptions + +/** @public */ +export interface WASIThreadsImports { + 'thread-spawn': (startArg: number, errorOrTid?: number) => number +} + +/** @public */ +export interface StartResult { + exitCode: number + instance: WebAssembly.Instance +} + +const patchedWasiInstances = new WeakMap>() + +/** @public */ +export class WASIThreads { + public PThread: ThreadManager | undefined + private wasmMemory!: WebAssembly.Memory + private wasmInstance!: WebAssembly.Instance + + private readonly threadSpawn: (startArg: number, errorOrTid?: number) => number + public readonly childThread: boolean + private readonly postMessage: ((message: any) => void) | undefined + public readonly wasi: WASIInstance + + public constructor (options: WASIThreadsOptions) { + if (!options) { + throw new TypeError('WASIThreads(): options is not provided') + } + + if (!options.wasi) { + throw new TypeError('WASIThreads(): options.wasi is not provided') + } + + patchedWasiInstances.set(this, new WeakSet()) + + const wasi = options.wasi + patchWasiInstance(this, wasi) + this.wasi = wasi + + if ('childThread' in options) { + this.childThread = Boolean(options.childThread) + } else { + this.childThread = false + } + + this.PThread = undefined + if ('threadManager' in options) { + if (typeof options.threadManager === 'function') { + this.PThread = options.threadManager() + } else { + this.PThread = options.threadManager + } + } else { + if (!this.childThread) { + this.PThread = new ThreadManager(options as ThreadManagerOptions) + } + } + + let waitThreadStart: boolean | number = false + if ('waitThreadStart' in options) { + waitThreadStart = typeof options.waitThreadStart === 'number' ? options.waitThreadStart : Boolean(options.waitThreadStart) + } + + const postMessage = getPostMessage(options as ChildThreadOptions) + if (this.childThread && typeof postMessage !== 'function') { + throw new TypeError('options.postMessage is not a function') + } + this.postMessage = postMessage + + const wasm64 = Boolean(options.wasm64) + + const onMessage = (e: WorkerMessageEvent>): void => { + if (e.data.__emnapi__) { + const type = e.data.__emnapi__.type + const payload = e.data.__emnapi__.payload + if (type === 'spawn-thread') { + threadSpawn( + (payload as SpawnThreadPayload).startArg, + (payload as SpawnThreadPayload).errorOrTid + ) + } else if (type === 'terminate-all-threads') { + this.terminateAllThreads() + } + } + } + + const threadSpawn = (startArg: number, errorOrTid?: number): number => { + checkSharedWasmMemory(this.wasmMemory) + + const isNewABI = errorOrTid !== undefined + if (!isNewABI) { + const malloc = this.wasmInstance.exports.malloc as Function + errorOrTid = wasm64 ? Number(malloc(BigInt(8))) : malloc(8) + if (!errorOrTid) { + return -48 /* ENOMEM */ + } + } + const _free = this.wasmInstance.exports.free as Function + const free = wasm64 ? (ptr: number) => { _free(BigInt(ptr)) } : _free + const struct = new Int32Array(this.wasmMemory.buffer, errorOrTid!, 2) + Atomics.store(struct, 0, 0) + Atomics.store(struct, 1, 0) + + if (this.childThread) { + postMessage!(createMessage('spawn-thread', { + startArg, + errorOrTid: errorOrTid! + })) + Atomics.wait(struct, 1, 0) + const isError = Atomics.load(struct, 0) + const result = Atomics.load(struct, 1) + if (isNewABI) { + return isError + } + free(errorOrTid!) + return isError ? -result : result + } + + const shouldWait = waitThreadStart || (waitThreadStart === 0) + + let sab: Int32Array | undefined + if (shouldWait) { + sab = new Int32Array(new SharedArrayBuffer(16 + 8192)) + Atomics.store(sab, 0, 0) + } + + let worker: any + let tid: number + const PThread = this.PThread + try { + worker = PThread!.getNewWorker(sab) + if (!worker) { + throw new Error('failed to get new worker') + } + PThread!.addMessageEventListener(worker, onMessage) + + tid = PThread!.markId(worker) + if (ENVIRONMENT_IS_NODE) { + worker.ref() + } + worker.postMessage(createMessage('start', { + tid, + arg: startArg, + sab + })) + if (shouldWait) { + if (typeof waitThreadStart === 'number') { + const waitResult = Atomics.wait(sab!, 0, 0, waitThreadStart) + if (waitResult === 'timed-out') { + throw new Error('Spawning thread timed out. Please check if the worker is created successfully and if message is handled properly in the worker.') + } + } else { + Atomics.wait(sab!, 0, 0) + } + const r = Atomics.load(sab!, 0) + if (r > 1) { + throw deserizeErrorFromBuffer(sab!.buffer as SharedArrayBuffer)! + } + } + } catch (e) { + const EAGAIN = 6 + + Atomics.store(struct, 0, 1) + Atomics.store(struct, 1, EAGAIN) + Atomics.notify(struct, 1) + + PThread?.printErr(e.stack) + if (isNewABI) { + return 1 + } + free(errorOrTid!) + return -EAGAIN + } + + Atomics.store(struct, 0, 0) + Atomics.store(struct, 1, tid) + Atomics.notify(struct, 1) + + PThread!.runningWorkers.push(worker) + if (!shouldWait) { + worker.whenLoaded.catch((err: any) => { + delete worker.whenLoaded + PThread!.cleanThread(worker, tid, true) + throw err + }) + } + + if (isNewABI) { + return 0 + } + free(errorOrTid!) + return tid + } + + this.threadSpawn = threadSpawn + } + + public getImportObject (): { wasi: WASIThreadsImports } { + return { + wasi: { + 'thread-spawn': this.threadSpawn + } + } + } + + public setup (wasmInstance: WebAssembly.Instance, wasmModule: WebAssembly.Module, wasmMemory?: WebAssembly.Memory): void { + wasmMemory ??= wasmInstance.exports.memory as WebAssembly.Memory + this.wasmInstance = wasmInstance + this.wasmMemory = wasmMemory + if (this.PThread) { + this.PThread.setup(wasmModule, wasmMemory) + } + } + + /** + * It's ok to call this method to a WASI command module. + * + * in child thread, must call this method instead of {@link WASIThreads.start} even if it's a WASI command module + * + * @returns A proxied WebAssembly instance if in child thread, other wise the original instance + */ + public initialize (instance: WebAssembly.Instance, module: WebAssembly.Module, memory?: WebAssembly.Memory): WebAssembly.Instance { + const exports = instance.exports + memory ??= exports.memory as WebAssembly.Memory + if (this.childThread) { + instance = createInstanceProxy(instance, memory) + } + this!.setup(instance, module, memory) + const wasi = this.wasi + if (('_start' in exports) && (typeof exports._start === 'function')) { + if (this.childThread) { + wasi.start(instance) + try { + const kStarted = getWasiSymbol(wasi, 'kStarted'); + (wasi as any)[kStarted!] = false + } catch (_) {} + } else { + setupInstance(wasi, instance) + } + } else { + wasi.initialize(instance) + } + return instance + } + + /** + * Equivalent to calling {@link WASIThreads.initialize} and then calling {@link WASIInstance.start} + * ```js + * this.initialize(instance, module, memory) + * this.wasi.start(instance) + * ``` + */ + public start (instance: WebAssembly.Instance, module: WebAssembly.Module, memory?: WebAssembly.Memory): StartResult { + const exports = instance.exports + memory ??= exports.memory as WebAssembly.Memory + if (this.childThread) { + instance = createInstanceProxy(instance, memory) + } + this!.setup(instance, module, memory) + const exitCode = this.wasi.start(instance) + return { exitCode, instance } + } + + public terminateAllThreads (): void { + if (!this.childThread) { + this.PThread?.terminateAllThreads() + } else { + this.postMessage!(createMessage('terminate-all-threads', {})) + } + } +} + +function patchWasiInstance (wasiThreads: WASIThreads, wasi: WASIInstance): void { + const patched = patchedWasiInstances.get(wasiThreads)! + if (patched.has(wasi)) { + return + } + + const _this = wasiThreads + const wasiImport = wasi.wasiImport + if (wasiImport) { + const proc_exit = wasiImport.proc_exit + wasiImport.proc_exit = function (code: number): number { + _this.terminateAllThreads() + return proc_exit.call(this, code) + } + } + const start = wasi.start + if (typeof start === 'function') { + wasi.start = function (instance: object): number { + try { + return start.call(this, instance) + } catch (err) { + if (isTrapError(err)) { + _this.terminateAllThreads() + } + throw err + } + } + } + patched.add(wasi) +} + +function getWasiSymbol (wasi: WASIInstance, description: string): symbol | undefined +function getWasiSymbol (wasi: WASIInstance, description: string[]): Array +function getWasiSymbol (wasi: WASIInstance, description: string | string[]): symbol | undefined | Array { + const symbols = Object.getOwnPropertySymbols(wasi) + const selectDescription = (description: string) => (s: symbol) => { + if (s.description) { + return s.description === description + } + return s.toString() === `Symbol(${description})` + } + if (Array.isArray(description)) { + return description.map(d => symbols.filter(selectDescription(d))[0]) + } + return symbols.filter(selectDescription(description))[0] +} + +function setupInstance (wasi: WASIInstance, instance: WebAssembly.Instance): void { + const [kInstance, kSetMemory] = getWasiSymbol(wasi, ['kInstance', 'kSetMemory']); + + (wasi as any)[kInstance!] = instance; + (wasi as any)[kSetMemory!](instance.exports.memory) +} diff --git a/packages/wasi-threads/src/worker.ts b/packages/wasi-threads/src/worker.ts new file mode 100644 index 00000000..b9d8e0c5 --- /dev/null +++ b/packages/wasi-threads/src/worker.ts @@ -0,0 +1,150 @@ +import { type LoadPayload, createMessage } from './command' +import type { WorkerMessageEvent } from './thread-manager' +import { getPostMessage, isTrapError, serizeErrorToBuffer } from './util' + +export interface OnStartData { + tid: number + arg: number + sab?: Int32Array +} + +/** @public */ +export interface ThreadMessageHandlerOptions { + onLoad?: (data: LoadPayload) => WebAssembly.WebAssemblyInstantiatedSource | PromiseLike + postMessage?: (message: any) => void +} + +/** @public */ +export class ThreadMessageHandler { + protected instance: WebAssembly.Instance | undefined + private messagesBeforeLoad: any[] + protected postMessage: (message: any) => void + protected onLoad?: (data: LoadPayload) => WebAssembly.WebAssemblyInstantiatedSource | PromiseLike + + public constructor (options?: ThreadMessageHandlerOptions) { + const postMsg = getPostMessage(options) + if (typeof postMsg !== 'function') { + throw new TypeError('options.postMessage is not a function') + } + this.postMessage = postMsg + this.onLoad = options?.onLoad + this.instance = undefined + // this.module = undefined + this.messagesBeforeLoad = [] + } + + /** @virtual */ + public instantiate (data: LoadPayload): WebAssembly.WebAssemblyInstantiatedSource | PromiseLike { + if (typeof this.onLoad === 'function') { + return this.onLoad(data) + } + throw new Error('ThreadMessageHandler.prototype.instantiate is not implemented') + } + + /** @virtual */ + public handle (e: WorkerMessageEvent): void { + if (e?.data?.__emnapi__) { + const type = e.data.__emnapi__.type + const payload = e.data.__emnapi__.payload + + if (type === 'load') { + this._load(payload) + } else if (type === 'start') { + this.handleAfterLoad(e, () => { + this._start(payload) + }) + } + } + } + + private _load (payload: LoadPayload): void { + if (this.instance !== undefined) return + let source: WebAssembly.WebAssemblyInstantiatedSource | PromiseLike + try { + source = this.instantiate(payload) + } catch (err) { + this._loaded(err, null, payload) + return + } + const then = source && 'then' in source ? source.then : undefined + if (typeof then === 'function') { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + then.call( + source, + (source) => { this._loaded(null, source, payload) }, + (err) => { this._loaded(err, null, payload) } + ) + } else { + this._loaded(null, source as WebAssembly.WebAssemblyInstantiatedSource, payload) + } + } + + private _start (payload: OnStartData): void { + if (typeof this.instance!.exports.wasi_thread_start !== 'function') { + const err = new TypeError('wasi_thread_start is not exported') + notifyPthreadCreateResult(payload.sab, 2, err) + throw err + } + const postMessage = this.postMessage! + const tid = payload.tid + const startArg = payload.arg + notifyPthreadCreateResult(payload.sab, 1) + try { + (this.instance!.exports.wasi_thread_start as Function)(tid, startArg) + } catch (err) { + if (isTrapError(err)) { + postMessage(createMessage('terminate-all-threads', {})) + } + throw err + } + postMessage(createMessage('cleanup-thread', { tid })) + } + + protected _loaded (err: Error | null, source: WebAssembly.WebAssemblyInstantiatedSource | null, payload: LoadPayload): void { + if (err) { + notifyPthreadCreateResult(payload.sab, 2, err) + throw err + } + + if (source == null) { + const err = new TypeError('onLoad should return an object') + notifyPthreadCreateResult(payload.sab, 2, err) + throw err + } + + const instance = source.instance + + if (!instance) { + const err = new TypeError('onLoad should return an object which includes "instance"') + notifyPthreadCreateResult(payload.sab, 2, err) + throw err + } + + this.instance = instance + + const postMessage = this.postMessage! + postMessage(createMessage('loaded', {})) + + const messages = this.messagesBeforeLoad + this.messagesBeforeLoad = [] + for (let i = 0; i < messages.length; i++) { + const data = messages[i] + this.handle({ data }) + } + } + + protected handleAfterLoad (e: E, f: (e: E) => void): void { + if (this.instance !== undefined) { + f.call(this, e) + } else { + this.messagesBeforeLoad.push(e.data) + } + } +} + +function notifyPthreadCreateResult (sab: Int32Array | undefined, result: number, error?: Error): void { + if (sab) { + serizeErrorToBuffer(sab.buffer as SharedArrayBuffer, result, error) + Atomics.notify(sab, 0) + } +} diff --git a/packages/wasi-threads/test/build.js b/packages/wasi-threads/test/build.js new file mode 100644 index 00000000..9623e2b1 --- /dev/null +++ b/packages/wasi-threads/test/build.js @@ -0,0 +1,48 @@ +const { join, resolve } = require('node:path') +const { spawnSync } = require('node:child_process') + +const ExecutionModel = { + Command: 'command', + Reactor: 'reactor' +} + +function build (model) { + const bin = resolve(process.env.WASI_SDK_PATH, 'bin', 'clang') + (process.platform === 'win32' ? '.exe' : '') + const args = [ + '-o', join(__dirname, model === ExecutionModel.Command ? 'main.wasm' : 'lib.wasm'), + '-mbulk-memory', + '-matomics', + `-mexec-model=${model}`, + ...(model === ExecutionModel.Command + ? [ + '-D__WASI_COMMAND__=1' + ] + : [ + '-Wl,--no-entry' + ] + ), + '--target=wasm32-wasi-threads', + // '-O3', + '-g', + '-pthread', + '-Wl,--import-memory', + '-Wl,--shared-memory', + '-Wl,--export-memory', + '-Wl,--export-dynamic', + '-Wl,--max-memory=2147483648', + '-Wl,--export=malloc,--export=free', + join(__dirname, 'main.c') + ] + const quote = s => s.includes(' ') ? `"${s}"` : s + console.log(`> ${quote(bin)} ${args.map(quote).join(' ')}`) + const { error } = spawnSync(bin, args, { + stdio: 'inherit', + env: process.env + }) + if (error) { + throw error + } +} + +build(ExecutionModel.Command) +build(ExecutionModel.Reactor) diff --git a/packages/wasi-threads/test/index.html b/packages/wasi-threads/test/index.html new file mode 100644 index 00000000..0a0fda57 --- /dev/null +++ b/packages/wasi-threads/test/index.html @@ -0,0 +1,17 @@ + + + + + + test + + + + + + + + diff --git a/packages/wasi-threads/test/index.js b/packages/wasi-threads/test/index.js new file mode 100644 index 00000000..511a04f4 --- /dev/null +++ b/packages/wasi-threads/test/index.js @@ -0,0 +1,113 @@ +(function (main) { + const ENVIRONMENT_IS_NODE = + typeof process === 'object' && process !== null && + typeof process.versions === 'object' && process.versions !== null && + typeof process.versions.node === 'string' + + if (ENVIRONMENT_IS_NODE) { + const _require = function (request) { + if (request === '@emnapi/wasi-threads') return require('..') + return require(request) + } + main(_require, process, __dirname) + } else { + if (typeof importScripts === 'function') { + // eslint-disable-next-line no-undef + importScripts('../../../node_modules/@tybys/wasm-util/dist/wasm-util.min.js') + // eslint-disable-next-line no-undef + importScripts('../dist/wasi-threads.js') + // eslint-disable-next-line no-undef + importScripts('./proxy.js') + } + + const nodeWasi = { WASI: globalThis.wasmUtil.WASI } + const nodePath = { + join: function () { + return Array.prototype.join.call(arguments, '/') + } + } + const nodeWorkerThreads = { + Worker: globalThis.proxyWorker.Worker + } + const _require = function (request) { + if (request === '@emnapi/wasi-threads') return globalThis.wasiThreads + if (request === 'node:worker_threads' || request === 'worker_threads') return nodeWorkerThreads + if (request === 'node:wasi' || request === 'wasi') return nodeWasi + if (request === 'node:path' || request === 'path') return nodePath + throw new Error('Can not find module: ' + request) + } + const _process = { + env: {}, + exit: () => {} + } + main(_require, _process, '.') + } +})(async function (require, process, __dirname) { + const { WASI } = require('node:wasi') + const { WASIThreads } = require('@emnapi/wasi-threads') + const { Worker } = require('node:worker_threads') + const { join } = require('node:path') + + async function run (file) { + const wasi = new WASI({ + version: 'preview1', + args: [file, 'node'], + env: process.env + }) + const wasiThreads = new WASIThreads({ + wasi, + onCreateWorker: ({ name }) => { + return new Worker(join(__dirname, 'worker.js'), { + name, + workerData: { + name + }, + env: process.env, + execArgv: ['--experimental-wasi-unstable-preview1'] + }) + }, + // optional + waitThreadStart: 1000 + }) + const memory = new WebAssembly.Memory({ + initial: 16777216 / 65536, + maximum: 2147483648 / 65536, + shared: true + }) + let input + try { + input = require('node:fs').readFileSync(require('node:path').join(__dirname, file)) + } catch (err) { + console.warn(err) + const response = await fetch(file) + input = await response.arrayBuffer() + } + let { module, instance } = await WebAssembly.instantiate(input, { + env: { + memory, + print_string: function (ptr) { + const HEAPU8 = new Uint8Array(memory.buffer) + let len = 0 + while (HEAPU8[ptr + len] !== 0) len++ + const string = new TextDecoder().decode(HEAPU8.slice(ptr, ptr + len)) + console.log(string) + } + }, + ...wasi.getImportObject(), + ...wasiThreads.getImportObject() + }) + + if (typeof instance.exports._start === 'function') { + const { exitCode } = wasiThreads.start(instance, module, memory) + return exitCode + } else { + instance = wasiThreads.initialize(instance, module, memory) + return instance.exports.fn(1) + } + } + + console.log('-------- command --------') + await run('main.wasm') + console.log('-------- reactor --------') + await run('lib.wasm') +}) diff --git a/packages/wasi-threads/test/main.c b/packages/wasi-threads/test/main.c new file mode 100644 index 00000000..5f946f31 --- /dev/null +++ b/packages/wasi-threads/test/main.c @@ -0,0 +1,55 @@ +#include +#include +#include +#include + +// #ifdef __wasm__ +// __attribute__((import_module("env"), import_name("print_string"))) +// void print_string(const char *str); +// #else +#define print_string(str) printf("%s\n", (str)) +// #endif + +void *print_message_function(void *ptr) { + char *message; + message = (char *)ptr; + print_string(message); + return NULL; +} + +__attribute__((visibility("default"))) +int fn(int join) { + pthread_t thread1, thread2; + const char *message1 = "Thread 1"; + const char *message2 = "Thread 2"; + int iret1, iret2; + + iret1 = pthread_create(&thread1, NULL, print_message_function, + (void *)message1); + iret2 = pthread_create(&thread2, NULL, print_message_function, + (void *)message2); + + if (join) { + printf("pthread_join()\n"); + pthread_join(thread1, NULL); + pthread_join(thread2, NULL); + } + + printf("Thread 1 returns: %d\n", iret1); + printf("Thread 2 returns: %d\n", iret2); + return 0; +} + +#ifdef __WASI_COMMAND__ +int main(int argc, char **argv) { + printf("argc: %d \n", argc); + for (int i = 0; i < argc; ++i) { + printf("argv[%d]: %s \n", i, *(argv + i)); + } + if (argc > 1) { + return strcmp(*(argv + 1), "web") == 0 ? fn(0) : fn(1); + } else { + return fn(1); + } +} +#endif diff --git a/packages/wasi-threads/test/proxy.js b/packages/wasi-threads/test/proxy.js new file mode 100644 index 00000000..e4377d03 --- /dev/null +++ b/packages/wasi-threads/test/proxy.js @@ -0,0 +1,93 @@ +(function (exports) { + function addProxyListener (worker) { + const map = new Map() + worker.onmessage = (e) => { + const { type, payload } = e.data + if (type === 'new') { + const { id, url, options } = payload + const w = new globalThis.Worker(url, options) + map.set(id, w) + w.onmessage = (e) => { + worker.postMessage({ type: 'onmessage', payload: { id, data: e.data } }) + } + w.onmessageerror = (e) => { + worker.postMessage({ type: 'onmessageerror', payload: { id, data: e.data } }) + } + w.onerror = (e) => { + worker.postMessage({ + type: 'onerror', + payload: { + id, + data: { + message: e.message, + filename: e.filename, + lineno: e.lineno, + colno: e.colno, + error: e.error + } + } + }) + } + } else if (type === 'postMessage') { + const { id, args } = payload + const w = map.get(id) + w.postMessage.apply(w, args) + } else if (type === 'terminate') { + const { id } = payload + map.get(id).terminate() + map.delete(id) + } + } + } + + class Worker { + constructor (url, options) { + if (typeof window !== 'undefined') { + throw new Error('Can not use ProxyWorker in browser main thread') + } + this.id = String(Math.random()) + globalThis.addEventListener('message', ({ data }) => { + if (data.payload.id === this.id) { + if (data.type === 'onmessage' || data.type === 'onmessageerror') { + this[data.type]?.({ data: data.payload.data }) + } + if (data.type === 'error') { + this.onerror?.(data.payload.data) + } + } + }) + postMessage({ + type: 'new', + payload: { + id: this.id, + url, + options + } + }) + } + + postMessage () { + postMessage({ + type: 'postMessage', + payload: { + id: this.id, + args: Array.prototype.slice.call(arguments) + } + }) + } + + terminate () { + postMessage({ + type: 'terminate', + payload: { + id: this.id + } + }) + } + } + + exports.proxyWorker = { + Worker, + addProxyListener + } +})(globalThis) diff --git a/packages/wasi-threads/test/worker.js b/packages/wasi-threads/test/worker.js new file mode 100644 index 00000000..f69e7f42 --- /dev/null +++ b/packages/wasi-threads/test/worker.js @@ -0,0 +1,103 @@ +/* eslint-disable no-eval */ + +(function (main) { + const ENVIRONMENT_IS_NODE = + typeof process === 'object' && process !== null && + typeof process.versions === 'object' && process.versions !== null && + typeof process.versions.node === 'string' + + if (ENVIRONMENT_IS_NODE) { + const _require = function (request) { + if (request === '@emnapi/wasi-threads') return require('..') + return require(request) + } + + const _init = function () { + const nodeWorkerThreads = require('node:worker_threads') + const parentPort = nodeWorkerThreads.parentPort + + parentPort.on('message', (data) => { + globalThis.onmessage({ data }) + }) + + Object.assign(globalThis, { + self: globalThis, + require, + Worker: nodeWorkerThreads.Worker, + importScripts: function (f) { + (0, eval)(require('node:fs').readFileSync(f, 'utf8') + '//# sourceURL=' + f) + }, + postMessage: function (msg) { + parentPort.postMessage(msg) + } + }) + } + + main(_require, _init) + } else { + // eslint-disable-next-line no-undef + importScripts('../../../node_modules/@tybys/wasm-util/dist/wasm-util.min.js') + // eslint-disable-next-line no-undef + importScripts('../dist/wasi-threads.js') + + const nodeWasi = { WASI: globalThis.wasmUtil.WASI } + const nodeWorkerThreads = { + workerData: { + name: globalThis.name + } + } + const _require = function (request) { + if (request === '@emnapi/wasi-threads') return globalThis.wasiThreads + if (request === 'node:worker_threads' || request === 'worker_threads') return nodeWorkerThreads + if (request === 'node:wasi' || request === 'wasi') return nodeWasi + throw new Error('Can not find module: ' + request) + } + const _init = function () {} + main(_require, _init) + } +})(function main (require, init) { + init() + + const { WASI } = require('node:wasi') + const { workerData } = require('node:worker_threads') + const { ThreadMessageHandler, WASIThreads } = require('@emnapi/wasi-threads') + + console.log(`name: ${workerData.name}`) + + const handler = new ThreadMessageHandler({ + async onLoad ({ wasmModule, wasmMemory }) { + const wasi = new WASI({ + version: 'preview1' + }) + + const wasiThreads = new WASIThreads({ + wasi, + childThread: true + }) + + const originalInstance = await WebAssembly.instantiate(wasmModule, { + env: { + memory: wasmMemory, + print_string: function (ptr) { + const HEAPU8 = new Uint8Array(wasmMemory.buffer) + let len = 0 + while (HEAPU8[ptr + len] !== 0) len++ + const string = new TextDecoder().decode(HEAPU8.slice(ptr, ptr + len)) + console.log(string) + } + }, + ...wasi.getImportObject(), + ...wasiThreads.getImportObject() + }) + + const instance = wasiThreads.initialize(originalInstance, wasmModule, wasmMemory) + + return { module: wasmModule, instance } + } + }) + + globalThis.onmessage = function (e) { + handler.handle(e) + // handle other messages + } +}) diff --git a/packages/wasi-threads/tsconfig.json b/packages/wasi-threads/tsconfig.json new file mode 100644 index 00000000..577085da --- /dev/null +++ b/packages/wasi-threads/tsconfig.json @@ -0,0 +1,27 @@ +{ + "extends": "../shared/tsconfig.base.json", + "compilerOptions": { + "allowJs": true, + "target": "ES5", + "module": "ESNext", + "moduleResolution": "Bundler", + "noEmitHelpers": true, + "importHelpers": true, + "outDir": "lib", + "paths": { + "tslib" : ["../../node_modules/tslib/tslib.d.ts"], + "@/*": ["./src/*"], + }, + "lib": [ + "ES5", + "ES2015", + "ES2020.BigInt", + "ES2021.WeakRef", + "ES2017.SharedMemory", + "DOM" + ] + }, + "include": [ + "./src/**/*" + ] +}