Skip to content

Commit

Permalink
feat: Improve process queue
Browse files Browse the repository at this point in the history
BREAKING CHANGE: return value of handler is added to result set if the value defined.

- fix TS types
- add support for async generators
- return value of handlers are added to results if defined (fixes #735)
  • Loading branch information
tripodsan authored Nov 23, 2022
1 parent 9d968b2 commit 350515e
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 59 deletions.
19 changes: 18 additions & 1 deletion packages/helix-shared-process-queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,30 @@

## Usage

Process tasks concurrently:
```js
const processQueue = require('@adobe/helix-shared-process-queue');

const tasks = [1, 2, 3];

await processQueue(tasks, async (task) => {
const result = await processQueue(tasks, async (task) => {
console.log(task);
return someValue;
});
```

Access results during task
```js
const processQueue = require('@adobe/helix-shared-process-queue');

const tasks = [1, 2, 3];

const result = await processQueue(tasks, async (task, queue, results) => {
if (somecondition(results)) {
// returning undefined does not add the return value to results
return;
}
return someValue;
});
```

23 changes: 16 additions & 7 deletions packages/helix-shared-process-queue/src/process-queue.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,36 @@
/**
* Queue entry.
*/
export declare interface QueueEntry {}
export declare type QueueEntry = any;

/**
* Queue type
*/
export declare type Queue = AsyncGenerator<QueueEntry>|Iterable<QueueEntry>|Array<QueueEntry>;

/**
* A (asynchronous) handler function that is invoked for every queue entry.
* Values added to the `results` array will be returned by `processQueue` function.
* The handler can modify the `queue` if needed.
* If the return value is not undefined, it is added to the `results` array.
*
* @param {QueueEntry} entry The queue entry.
* @param {QueueEntry[]} queue the queue.
* @param {Queue} queue the queue.
* @param {[]} results the process queue results
* @return {*} result or undefined.
*/
export declare interface ProcessQueueHandler {
(entry: QueueEntry, queue:[QueueEntry], results:[any]): void;
(entry: QueueEntry, queue:Queue, results:Array<any>): Promise<any>;
}

/**
* Processes the given queue concurrently.
* Processes the given queue concurrently. If the `queue` is an array it will remove the
* entries during processing. It returns the `results` array which is either populated by the
* queue handler function directly or with the return values of the handler functions.
*
* @param {QueueEntry[]} queue A list of entries to be processed
* @param {Queue} queue A list of entries to be processed
* @param {ProcessQueueHandler} fn A handler function
* @param {number} [maxConcurrent = 8] Concurrency level
* @returns {[]} the results
* @returns {Promise<[]>} the results
*/
export declare function processQueue(queue:[QueueEntry], fn:ProcessQueueHandler, maxConcurrent?:number): [any];
export default function processQueue(queue:Queue, fn:ProcessQueueHandler, maxConcurrent?:number): Promise<Array<any>>;
59 changes: 33 additions & 26 deletions packages/helix-shared-process-queue/src/process-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@
* governing permissions and limitations under the License.
*/

/**
* Simple dequeing iterator.
* @param queue
* @returns {Generator<*, void, *>}
*/
function* dequeue(queue) {
while (queue.length) {
yield queue.shift();
}
}

/**
* Processes the given queue concurrently. The handler functions can add more items to the queue
* if needed.
*
* @param {Iterable|Array} queue A list of tasks
* @param {ProcessQueueHandler} fn A handler function `fn(task:any, queue:array, results:array)`
* @param {number} [maxConcurrent = 8] Concurrency level
* @returns the results
* @returns {Promise<[]>} the results
*/
async function processQueue(queue, fn, maxConcurrent = 8) {
if (typeof queue !== 'object') {
Expand All @@ -29,44 +40,40 @@ async function processQueue(queue, fn, maxConcurrent = 8) {

const handler = (entry) => {
const task = fn(entry, queue, results);
if (task && task.then) {
if (task?.then) {
running.push(task);
task
.then((r) => {
if (r !== undefined) {
results.push(r);
}
})
.catch(() => {})
.finally(() => {
running.splice(running.indexOf(task), 1);
});
} else if (task !== undefined) {
results.push(task);
}
};

// when using array, dequeue the entries
if (Array.isArray(queue)) {
while (queue.length || running.length) {
if (running.length < maxConcurrent && queue.length) {
handler(queue.shift());
} else {
// eslint-disable-next-line no-await-in-loop
await Promise.race(running);
}
}
return results;
const iter = Array.isArray(queue)
? dequeue(queue)
: queue;
if (!iter || !('next' in iter)) {
throw Error('invalid queue argument: iterable expected');
}

if ('next' in queue) {
let next = queue.next();
while (!next.done || running.length) {
if (running.length < maxConcurrent && !next.done) {
handler(next.value);
next = queue.next();
} else {
// eslint-disable-next-line no-await-in-loop
await Promise.race(running);
}
for await (const value of iter) {
while (running.length >= maxConcurrent) {
// eslint-disable-next-line no-await-in-loop
await Promise.race(running);
}
return results;
handler(value);
}

throw Error('invalid queue argument: iterable expected');
// wait until remaining tasks have completed
await Promise.all(running);
return results;
}

module.exports = processQueue;
82 changes: 57 additions & 25 deletions packages/helix-shared-process-queue/test/process-queue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,30 @@ const processQueue = require('../src/process-queue.js');

const nop = () => {};

function* fibonacci() {
let a = 1;
let b = 1;
while (a < 50) {
const c = a + b;
a = b;
b = c;
yield {
time: 10,
number: c,
};
}
}

async function* sleepOver(nights) {
for (let i = 0; i < nights; i += 1) {
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, 10);
});
yield i;
}
}

describe('Process Queue', () => {
let concurrency = 0;
let maxConcurrency = 0;
Expand All @@ -40,12 +64,24 @@ describe('Process Queue', () => {

it('works with empty queue', async () => {
const result = await processQueue([], nop);
assert.deepEqual(result, []);
assert.deepStrictEqual(result, []);
});

it('works with non async function', async () => {
const result = await processQueue([5], (n, queue, results) => results.push(n * n));
assert.deepEqual(result, [25]);
const result = await processQueue([5], (n, queue, results) => {
results.push(n * n);
});
assert.deepStrictEqual(result, [25]);
});

it('return values are added as results', async () => {
const result = await processQueue([1, 2, 3, 4], (n) => n * n);
assert.deepStrictEqual(result, [1, 4, 9, 16]);
});

it('return values are added as results (async)', async () => {
const result = await processQueue([1, 2, 3, 4], async (n) => n * n);
assert.deepStrictEqual(result, [1, 4, 9, 16]);
});

it('processes queue', async () => {
Expand All @@ -56,7 +92,7 @@ describe('Process Queue', () => {
time: 50,
number: 3,
}], testFunction);
assert.deepEqual(result, [9, 16]);
assert.deepStrictEqual(result, [9, 16]);
});

it('processes queue can add more items to the queue', async () => {
Expand All @@ -67,7 +103,7 @@ describe('Process Queue', () => {
results.push(0);
}
});
assert.deepEqual(result, [0, 0, 0, 0]);
assert.deepStrictEqual(result, [0, 0, 0, 0]);
});

it('respects concurrency', async () => {
Expand All @@ -81,8 +117,8 @@ describe('Process Queue', () => {
expected.push(i * i);
}
const result = await processQueue(tasks, testFunction);
assert.deepEqual(result.sort((a, b) => a - b), expected);
assert.equal(maxConcurrency, 8);
assert.deepStrictEqual(result.sort((a, b) => a - b), expected);
assert.strictEqual(maxConcurrency, 8);
});

it('aborts queue on early error', async () => {
Expand Down Expand Up @@ -115,27 +151,23 @@ describe('Process Queue', () => {
}

const result = await processQueue(counter(), testFunction);
assert.deepEqual(result, [0, 1, 4, 9, 16]);
assert.deepEqual(maxConcurrency, 5);
assert.deepStrictEqual(result, [0, 1, 4, 9, 16]);
assert.deepStrictEqual(maxConcurrency, 5);
});

it('iterators respect concurrency', async () => {
function* fibonacci() {
let a = 1;
let b = 1;
while (a < 50) {
const c = a + b;
a = b;
b = c;
yield {
time: 10,
number: c,
};
}
}

const result = await processQueue(fibonacci(), testFunction, 4);
assert.deepEqual(result, [4, 9, 25, 64, 169, 441, 1156, 3025, 7921]);
assert.deepEqual(maxConcurrency, 4);
assert.deepStrictEqual(result, [4, 9, 25, 64, 169, 441, 1156, 3025, 7921]);
assert.deepStrictEqual(maxConcurrency, 4);
});

it('iterators add return value to result', async () => {
const result = await processQueue(fibonacci(), ({ number }) => number);
assert.deepStrictEqual(result, [2, 3, 5, 8, 13, 21, 34, 55, 89]);
});

it('async iterators add return value to results', async () => {
const result = await processQueue(await sleepOver(5), (number) => number);
assert.deepStrictEqual(result, [0, 1, 2, 3, 4]);
});
});

0 comments on commit 350515e

Please sign in to comment.