Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast to workers #113

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.nyc_output
.vscode
.idea
node_modules
package-lock.json
dist
Expand Down
80 changes: 49 additions & 31 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ class ThreadPool {
task : any,
transferList : TransferList,
filename : string | null,
abortSignal : AbortSignalAny | null) : Promise<any> {
abortSignal : AbortSignalAny | null,
workerInfo : WorkerInfo | null) : Promise<any> {
if (filename === null) {
filename = this.options.filename;
}
Expand Down Expand Up @@ -753,7 +754,7 @@ class ThreadPool {

// If there is a task queue, there's no point in looking for an available
// Worker thread. Add this task to the queue, if possible.
if (this.taskQueue.size > 0) {
if (!workerInfo && this.taskQueue.size > 0) {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
if (this.taskQueue.size >= totalCapacity) {
if (this.options.maxQueue === 0) {
Expand All @@ -771,33 +772,35 @@ class ThreadPool {
return ret;
}

// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo : WorkerInfo | null = this.workers.findAvailable();

// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && abortSignal) {
workerInfo = null;
}
if (!workerInfo) {
// Look for a Worker with a minimum number of tasks it is currently running.
workerInfo = this.workers.findAvailable();

// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}
// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && abortSignal) {
workerInfo = null;
}

// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(Errors.NoTaskQueueAvailable());
} else {
this.taskQueue.push(taskInfo);
// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}

return ret;
// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(Errors.NoTaskQueueAvailable());
} else {
this.taskQueue.push(taskInfo);
}

return ret;
}
}

// TODO(addaleax): Clean up the waitTime/runTime recording.
Expand Down Expand Up @@ -901,12 +904,12 @@ class Piscina extends EventEmitterAsyncResource {
this.#pool = new ThreadPool(this, options);
}

runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any>;
runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;
runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;
runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any>;
runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny, workerInfo? : WorkerInfo) : Promise<any>;
runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise<any>;
runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise<any>;
runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise<any>;

runTask (task : any, transferList? : any, filename? : any, abortSignal? : any) {
runTask (task : any, transferList? : any, filename? : any, abortSignal? : any, workerInfo? : WorkerInfo) {
// If transferList is a string or AbortSignal, shift it.
if ((typeof transferList === 'object' && !Array.isArray(transferList)) ||
typeof transferList === 'string') {
Expand All @@ -933,7 +936,22 @@ class Piscina extends EventEmitterAsyncResource {
new TypeError('abortSignal argument must be an object'));
}
return this.#pool.runTask(
task, transferList, filename || null, abortSignal || null);
task, transferList, filename || null, abortSignal || null, workerInfo || null);
}

broadcastTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any[]>;
broadcastTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any[]>;
broadcastTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any[]>;
broadcastTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any[]>;

broadcastTask (task : any, transferList? : any, filename? : any, abortSignal? : any) {
const promises = [];

for (const workerInfo of this.#pool.workers) {
promises.push(this.runTask(task, transferList, filename, abortSignal, workerInfo));
}

return Promise.all(promises);
}

destroy () {
Expand Down
10 changes: 10 additions & 0 deletions test/simple-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ test('async eval() handler works', async ({ is }) => {
is(result, 42);
});

test('broadcasting works', async ({ same }) => {
const worker = new Piscina({
minThreads: 4,
maxThreads: 4,
filename: resolve(__dirname, 'fixtures/eval.js')
});
const result = await worker.broadcastTask('Promise.resolve(42)');
same(result, [42, 42, 42, 42]);
});

test('filename can be provided while posting', async ({ is }) => {
const worker = new Piscina();
const result = await worker.runTask(
Expand Down