Skip to content

Commit

Permalink
Update API for async functions
Browse files Browse the repository at this point in the history
  • Loading branch information
elliot-nelson committed May 1, 2021
1 parent 049dc7d commit 6e54d2a
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 37 deletions.
2 changes: 1 addition & 1 deletion apps/heft/src/utilities/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class Async {
fn: (entry: TEntry) => Promise<void>
): Promise<void> {
// Defer to the implementation in node-core-library
return CoreAsync.forEachLimitAsync(array, parallelismLimit, fn);
return CoreAsync.forEachAsync(array, fn, { concurrency: parallelismLimit });
}

public static runWatcherWithErrorHandling(fn: () => Promise<void>, scopedLogger: ScopedLogger): void {
Expand Down
11 changes: 8 additions & 3 deletions common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ export class AnsiEscape {
static removeCodes(text: string): string;
}

// @public
// @beta
export class Async {
static forEachLimitAsync<TEntry>(array: TEntry[], parallelismLimit: number, fn: ((entry: TEntry) => Promise<void>) | ((entry: TEntry, index: number) => Promise<void>)): Promise<void>;
static mapLimitAsync<TEntry, TRetVal>(array: TEntry[], parallelismLimit: number, fn: ((entry: TEntry) => Promise<TRetVal>) | ((entry: TEntry, index: number) => Promise<TRetVal>)): Promise<TRetVal[]>;
static forEachAsync<TEntry>(array: TEntry[], fn: (entry: TEntry, index: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static mapAsync<TEntry, TRetVal>(array: TEntry[], fn: (entry: TEntry, index: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static sleep(ms: number): Promise<void>;
}

Expand Down Expand Up @@ -267,6 +267,11 @@ export interface IAnsiEscapeConvertForTestsOptions {
encodeNewlines?: boolean;
}

// @beta
export interface IAsyncParallelismOptions {
concurrency?: number;
}

// @beta (undocumented)
export interface IColorableSequence {
// (undocumented)
Expand Down
69 changes: 44 additions & 25 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,61 @@
// See LICENSE in the project root for license information.

/**
* Utilities for parallel asynchronous operations, to augment built-in Promise capability.
* @public
* Options for controlling the parallelism of asynchronous operations.
* @beta
*/
export interface IAsyncParallelismOptions {
/**
* If provided, asynchronous operations like `mapAsync` and `forEachAsync` will limit the
* number of concurrent operations to the specified number.
*/
concurrency?: number;
}

/**
* Utilities for parallel asynchronous operations, to augment built-in Promises capability.
* @beta
*/
export class Async {
/**
* Take an input array and map it through an asynchronous function, with a maximum number
* of parallel operations provided by the `parallelismLimit` parameter.
* Given an input array and an asynchronous callback function, execute the callback
* function for every element in the array and return a promise for an array containing
* the results.
*
* Behaves like an asynchronous version of built-in `Array#map`.
*/
public static async mapLimitAsync<TEntry, TRetVal>(
public static async mapAsync<TEntry, TRetVal>(
array: TEntry[],
parallelismLimit: number,
fn: ((entry: TEntry) => Promise<TRetVal>) | ((entry: TEntry, index: number) => Promise<TRetVal>)
fn: (entry: TEntry, index: number) => Promise<TRetVal>,
options?: IAsyncParallelismOptions | undefined
): Promise<TRetVal[]> {
const result: TRetVal[] = [];

await Async.forEachLimitAsync(
await Async.forEachAsync(
array,
parallelismLimit,
async (item: TEntry, index: number): Promise<void> => {
result[index] = await fn(item, index);
}
},
options
);

return result;
}

/**
* Take an input array and loop through it, calling an asynchronous function, with a maximum number
* of parallel operations provided by the `parallelismLimit` parameter.
* Given an input array and an asynchronous callback function, execute the callback
* function for every element in the array and return a void promise.
*
* Behaves like an asynchronous version of built-in `Array#forEach`.
*/
public static async forEachLimitAsync<TEntry>(
public static async forEachAsync<TEntry>(
array: TEntry[],
parallelismLimit: number,
fn: ((entry: TEntry) => Promise<void>) | ((entry: TEntry, index: number) => Promise<void>)
fn: (entry: TEntry, index: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
return new Promise((resolve: () => void, reject: (error: Error) => void) => {
if (parallelismLimit < 1) {
throw new Error('parallelismLimit must be at least 1');
}

await new Promise((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 1;
let index: number = 0;

Expand All @@ -51,12 +66,16 @@ export class Async {
resolve();
}

while (operationsInProgress < parallelismLimit) {
while (operationsInProgress < concurrency) {
if (index < array.length) {
operationsInProgress++;
fn(array[index], index++)
.then(() => onOperationCompletion())
.catch(reject);
try {
Promise.resolve(fn(array[index], index++))
.then(() => onOperationCompletion())
.catch(reject);
} catch (error) {
reject(error);
}
} else {
break;
}
Expand All @@ -71,7 +90,7 @@ export class Async {
* Return a promise that resolves after the specified number of milliseconds.
*/
public static async sleep(ms: number): Promise<void> {
return new Promise((resolve) => {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
Expand Down
2 changes: 1 addition & 1 deletion libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

export { AlreadyReportedError } from './AlreadyReportedError';
export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape';
export { Async } from './Async';
export { Async, IAsyncParallelismOptions } from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Expand Down
62 changes: 55 additions & 7 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,32 @@
import { Async } from '../Async';

describe('Async', () => {
describe('mapLimitAsync', () => {
describe('mapAsync', () => {
it('returns the same result as built-in Promise.all', async () => {
const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];
const fn: (item: number) => Promise<string> = async (item) => `result ${item}`;

expect(await Async.mapLimitAsync(array, 1, fn)).toEqual(await Promise.all(array.map(fn)));
expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn)));
});

it('ensures no more than N operations occur in parallel', async () => {
it('passes an index parameter to the callback function', async () => {
const array: number[] = [1, 2, 3];
const fn: (item: number, index: number) => Promise<string> = jest.fn(async (item) => `result ${item}`);

await Async.mapAsync(array, fn);
expect(fn).toHaveBeenNthCalledWith(1, 1, 0);
expect(fn).toHaveBeenNthCalledWith(2, 2, 1);
expect(fn).toHaveBeenNthCalledWith(3, 3, 2);
});

it('returns the same result as built-in Promise.all', async () => {
const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];
const fn: (item: number) => Promise<string> = async (item) => `result ${item}`;

expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn)));
});

it('if concurrency is set, ensures no more than N operations occur in parallel', async () => {
let running: number = 0;
let maxRunning: number = 0;

Expand All @@ -26,7 +43,7 @@ describe('Async', () => {
return `result ${item}`;
};

expect(await Async.mapLimitAsync(array, 3, fn)).toEqual([
expect(await Async.mapAsync(array, fn, { concurrency: 3 })).toEqual([
'result 1',
'result 2',
'result 3',
Expand All @@ -40,8 +57,8 @@ describe('Async', () => {
});
});

describe('forEachLimitAsync', () => {
it('ensures no more than N operations occur in parallel', async () => {
describe('forEachAsync', () => {
it('if concurrency is set, ensures no more than N operations occur in parallel', async () => {
let running: number = 0;
let maxRunning: number = 0;

Expand All @@ -54,9 +71,40 @@ describe('Async', () => {
running--;
});

await Async.forEachLimitAsync(array, 3, fn);
await Async.forEachAsync(array, fn, { concurrency: 3 });
expect(fn).toHaveBeenCalledTimes(8);
expect(maxRunning).toEqual(3);
});

it('rejects if any operation rejects', async () => {
const array: number[] = [1, 2, 3];

const fn: (item: number) => Promise<void> = jest.fn(async (item) => {
await Async.sleep(1);
if (item === 3) throw new Error('Something broke');
});

await expect(() => Async.forEachAsync(array, fn, { concurrency: 3 })).rejects.toThrowError(
'Something broke'
);
expect(fn).toHaveBeenCalledTimes(3);
});

it('rejects if any operation synchronously throws', async () => {
const array: number[] = [1, 2, 3];

// The compiler is (rightly) very concerned about us claiming that this synchronous
// function is going to return a promise. This situation is not very likely in a
// TypeScript project, but it's such a common problem in JavaScript projects that
// it's worth doing an explicit test.
const fn: (item: number) => Promise<void> = (jest.fn((item) => {
if (item === 3) throw new Error('Something broke');
}) as unknown) as (item: number) => Promise<void>;

await expect(() => Async.forEachAsync(array, fn, { concurrency: 3 })).rejects.toThrowError(
'Something broke'
);
expect(fn).toHaveBeenCalledTimes(3);
});
});
});

0 comments on commit 6e54d2a

Please sign in to comment.