diff --git a/.gitignore b/.gitignore index e5f99134..c8d9301b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .nyc_output .vscode +.idea node_modules package-lock.json dist diff --git a/src/index.ts b/src/index.ts index 16ab8662..7e3ce673 100644 --- a/src/index.ts +++ b/src/index.ts @@ -699,7 +699,8 @@ class ThreadPool { task : any, transferList : TransferList, filename : string | null, - abortSignal : AbortSignalAny | null) : Promise { + abortSignal : AbortSignalAny | null, + workerInfo : WorkerInfo | null) : Promise { if (filename === null) { filename = this.options.filename; } @@ -753,7 +754,7 @@ class ThreadPool { // If there is a task queue, there's no point in looking for an available // Worker thread. Add this task to the queue, if possible. - if (this.taskQueue.size > 0) { + if (!workerInfo && this.taskQueue.size > 0) { const totalCapacity = this.options.maxQueue + this.pendingCapacity(); if (this.taskQueue.size >= totalCapacity) { if (this.options.maxQueue === 0) { @@ -771,33 +772,35 @@ class ThreadPool { return ret; } - // Look for a Worker with a minimum number of tasks it is currently running. - let workerInfo : WorkerInfo | null = this.workers.findAvailable(); - - // If we want the ability to abort this task, use only workers that have - // no running tasks. - if (workerInfo !== null && workerInfo.currentUsage() > 0 && abortSignal) { - workerInfo = null; - } + if (!workerInfo) { + // Look for a Worker with a minimum number of tasks it is currently running. + workerInfo = this.workers.findAvailable(); - // If no Worker was found, or that Worker was handling another task in some - // way, and we still have the ability to spawn new threads, do so. - let waitingForNewWorker = false; - if ((workerInfo === null || workerInfo.currentUsage() > 0) && - this.workers.size < this.options.maxThreads) { - this._addNewWorker(); - waitingForNewWorker = true; - } + // If we want the ability to abort this task, use only workers that have + // no running tasks. + if (workerInfo !== null && workerInfo.currentUsage() > 0 && abortSignal) { + workerInfo = null; + } - // If no Worker is found, try to put the task into the queue. - if (workerInfo === null) { - if (this.options.maxQueue <= 0 && !waitingForNewWorker) { - return Promise.reject(Errors.NoTaskQueueAvailable()); - } else { - this.taskQueue.push(taskInfo); + // If no Worker was found, or that Worker was handling another task in some + // way, and we still have the ability to spawn new threads, do so. + let waitingForNewWorker = false; + if ((workerInfo === null || workerInfo.currentUsage() > 0) && + this.workers.size < this.options.maxThreads) { + this._addNewWorker(); + waitingForNewWorker = true; } - return ret; + // If no Worker is found, try to put the task into the queue. + if (workerInfo === null) { + if (this.options.maxQueue <= 0 && !waitingForNewWorker) { + return Promise.reject(Errors.NoTaskQueueAvailable()); + } else { + this.taskQueue.push(taskInfo); + } + + return ret; + } } // TODO(addaleax): Clean up the waitTime/runTime recording. @@ -901,12 +904,12 @@ class Piscina extends EventEmitterAsyncResource { this.#pool = new ThreadPool(this, options); } - runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise; - runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise; - runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise; - runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise; + runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny, workerInfo? : WorkerInfo) : Promise; + runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise; + runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise; + runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise; - runTask (task : any, transferList? : any, filename? : any, abortSignal? : any) { + runTask (task : any, transferList? : any, filename? : any, abortSignal? : any, workerInfo? : WorkerInfo) { // If transferList is a string or AbortSignal, shift it. if ((typeof transferList === 'object' && !Array.isArray(transferList)) || typeof transferList === 'string') { @@ -933,7 +936,22 @@ class Piscina extends EventEmitterAsyncResource { new TypeError('abortSignal argument must be an object')); } return this.#pool.runTask( - task, transferList, filename || null, abortSignal || null); + task, transferList, filename || null, abortSignal || null, workerInfo || null); + } + + broadcastTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise; + broadcastTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise; + broadcastTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise; + broadcastTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise; + + broadcastTask (task : any, transferList? : any, filename? : any, abortSignal? : any) { + const promises = []; + + for (const workerInfo of this.#pool.workers) { + promises.push(this.runTask(task, transferList, filename, abortSignal, workerInfo)); + } + + return Promise.all(promises); } destroy () { diff --git a/test/simple-test.ts b/test/simple-test.ts index 99314e48..1c70a910 100644 --- a/test/simple-test.ts +++ b/test/simple-test.ts @@ -58,6 +58,16 @@ test('async eval() handler works', async ({ is }) => { is(result, 42); }); +test('broadcasting works', async ({ same }) => { + const worker = new Piscina({ + minThreads: 4, + maxThreads: 4, + filename: resolve(__dirname, 'fixtures/eval.js') + }); + const result = await worker.broadcastTask('Promise.resolve(42)'); + same(result, [42, 42, 42, 42]); +}); + test('filename can be provided while posting', async ({ is }) => { const worker = new Piscina(); const result = await worker.runTask(