Skip to content

Commit

Permalink
Merge pull request #2665 from elliot-nelson/node-core-async
Browse files Browse the repository at this point in the history
[node-core-library] Provide async utilities in core library
  • Loading branch information
octogonz authored May 1, 2021
2 parents f5b9d3c + 7a7534d commit c2c27e2
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 38 deletions.
31 changes: 3 additions & 28 deletions apps/heft/src/utilities/Async.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
// See LICENSE in the project root for license information.

import { Async as CoreAsync } from '@rushstack/node-core-library';
import { ScopedLogger } from '../pluginFramework/logging/ScopedLogger';

export class Async {
Expand All @@ -9,34 +10,8 @@ export class Async {
parallelismLimit: number,
fn: (entry: TEntry) => Promise<void>
): Promise<void> {
return new Promise((resolve: () => void, reject: (error: Error) => void) => {
if (parallelismLimit < 1) {
throw new Error('parallelismLimit must be at least 1');
}

let operationsInProgress: number = 1;
let index: number = 0;

function onOperationCompletion(): void {
operationsInProgress--;
if (operationsInProgress === 0 && index >= array.length) {
resolve();
}

while (operationsInProgress < parallelismLimit) {
if (index < array.length) {
operationsInProgress++;
fn(array[index++])
.then(() => onOperationCompletion())
.catch(reject);
} else {
break;
}
}
}

onOperationCompletion();
});
// Defer to the implementation in node-core-library
return CoreAsync.forEachAsync(array, fn, { concurrency: parallelismLimit });
}

public static runWatcherWithErrorHandling(fn: () => Promise<void>, scopedLogger: ScopedLogger): void {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"packageName": "@rushstack/heft",
"comment": "Move forEachLimitAsync implementation out of heft",
"type": "patch"
}
],
"packageName": "@rushstack/heft",
"email": "[email protected]"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add a new API \"Async\" with some utilities for working with promises",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library",
"email": "[email protected]"
}
12 changes: 12 additions & 0 deletions common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ export class AnsiEscape {
static removeCodes(text: string): string;
}

// @beta
export class Async {
static forEachAsync<TEntry>(array: TEntry[], callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static mapAsync<TEntry, TRetVal>(array: TEntry[], callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static sleep(ms: number): Promise<void>;
}

// @public
export type Brand<T, BrandTag extends string> = T & {
__brand: BrandTag;
Expand Down Expand Up @@ -260,6 +267,11 @@ export interface IAnsiEscapeConvertForTestsOptions {
encodeNewlines?: boolean;
}

// @beta
export interface IAsyncParallelismOptions {
concurrency?: number;
}

// @beta (undocumented)
export interface IColorableSequence {
// (undocumented)
Expand Down
127 changes: 127 additions & 0 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
// See LICENSE in the project root for license information.

/**
* Options for controlling the parallelism of asynchronous operations.
*
* @remarks
* Used with {@link Async.mapAsync} and {@link Async.forEachAsync}.
*
* @beta
*/
export interface IAsyncParallelismOptions {
/**
* Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync}
* to limit the maximum number of concurrent promises to the specified number.
*/
concurrency?: number;
}

/**
* Utilities for parallel asynchronous operations, for use with the system `Promise` APIs.
*
* @beta
*/
export class Async {
/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array. Returns an array containing the results.
*
* @remarks
* This API is similar to the system `Array#map`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param array - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
* @returns an array containing the result for each callback, in the same order
* as the original input `array`
*/
public static async mapAsync<TEntry, TRetVal>(
array: TEntry[],
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options?: IAsyncParallelismOptions | undefined
): Promise<TRetVal[]> {
const result: TRetVal[] = [];

await Async.forEachAsync(
array,
async (item: TEntry, arrayIndex: number): Promise<void> => {
result[arrayIndex] = await callback(item, arrayIndex);
},
options
);

return result;
}

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param array - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry>(
array: TEntry[],
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
await new Promise((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 1;
let arrayIndex: number = 0;

function onOperationCompletion(): void {
operationsInProgress--;
if (operationsInProgress === 0 && arrayIndex >= array.length) {
resolve();
}

while (operationsInProgress < concurrency) {
if (arrayIndex < array.length) {
operationsInProgress++;
try {
Promise.resolve(callback(array[arrayIndex], arrayIndex++))
.then(() => onOperationCompletion())
.catch(reject);
} catch (error) {
reject(error);
}
} else {
break;
}
}
}

onOperationCompletion();
});
}

/**
* Return a promise that resolves after the specified number of milliseconds.
*/
public static async sleep(ms: number): Promise<void> {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
}
12 changes: 2 additions & 10 deletions libraries/node-core-library/src/LockFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import * as path from 'path';
import * as child_process from 'child_process';
import { setTimeout } from 'timers';
import { FileSystem } from './FileSystem';
import { FileWriter } from './FileWriter';
import { Async } from './Async';

/**
* http://man7.org/linux/man-pages/man5/proc.5.html
Expand Down Expand Up @@ -231,21 +231,13 @@ export class LockFile {
throw new Error(`Exceeded maximum wait time to acquire lock for resource "${resourceName}"`);
}

await LockFile._sleepForMs(interval);
await Async.sleep(interval);
return retryLoop();
};

return retryLoop();
}

private static _sleepForMs(timeout: number): Promise<void> {
return new Promise<void>((resolve: () => void, reject: () => void) => {
setTimeout(() => {
resolve();
}, timeout);
});
}

/**
* Attempts to acquire the lock on a Linux or OSX machine
*/
Expand Down
1 change: 1 addition & 0 deletions libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

export { AlreadyReportedError } from './AlreadyReportedError';
export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape';
export { Async, IAsyncParallelismOptions } from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Expand Down
110 changes: 110 additions & 0 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
// See LICENSE in the project root for license information.

import { Async } from '../Async';

describe('Async', () => {
describe('mapAsync', () => {
it('returns the same result as built-in Promise.all', async () => {
const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];
const fn: (item: number) => Promise<string> = async (item) => `result ${item}`;

expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn)));
});

it('passes an index parameter to the callback function', async () => {
const array: number[] = [1, 2, 3];
const fn: (item: number, index: number) => Promise<string> = jest.fn(async (item) => `result ${item}`);

await Async.mapAsync(array, fn);
expect(fn).toHaveBeenNthCalledWith(1, 1, 0);
expect(fn).toHaveBeenNthCalledWith(2, 2, 1);
expect(fn).toHaveBeenNthCalledWith(3, 3, 2);
});

it('returns the same result as built-in Promise.all', async () => {
const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];
const fn: (item: number) => Promise<string> = async (item) => `result ${item}`;

expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn)));
});

it('if concurrency is set, ensures no more than N operations occur in parallel', async () => {
let running: number = 0;
let maxRunning: number = 0;

const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];

const fn: (item: number) => Promise<string> = async (item) => {
running++;
await Async.sleep(1);
maxRunning = Math.max(maxRunning, running);
running--;
return `result ${item}`;
};

expect(await Async.mapAsync(array, fn, { concurrency: 3 })).toEqual([
'result 1',
'result 2',
'result 3',
'result 4',
'result 5',
'result 6',
'result 7',
'result 8'
]);
expect(maxRunning).toEqual(3);
});
});

describe('forEachAsync', () => {
it('if concurrency is set, ensures no more than N operations occur in parallel', async () => {
let running: number = 0;
let maxRunning: number = 0;

const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];

const fn: (item: number) => Promise<void> = jest.fn(async (item) => {
running++;
await Async.sleep(1);
maxRunning = Math.max(maxRunning, running);
running--;
});

await Async.forEachAsync(array, fn, { concurrency: 3 });
expect(fn).toHaveBeenCalledTimes(8);
expect(maxRunning).toEqual(3);
});

it('rejects if any operation rejects', async () => {
const array: number[] = [1, 2, 3];

const fn: (item: number) => Promise<void> = jest.fn(async (item) => {
await Async.sleep(1);
if (item === 3) throw new Error('Something broke');
});

await expect(() => Async.forEachAsync(array, fn, { concurrency: 3 })).rejects.toThrowError(
'Something broke'
);
expect(fn).toHaveBeenCalledTimes(3);
});

it('rejects if any operation synchronously throws', async () => {
const array: number[] = [1, 2, 3];

// The compiler is (rightly) very concerned about us claiming that this synchronous
// function is going to return a promise. This situation is not very likely in a
// TypeScript project, but it's such a common problem in JavaScript projects that
// it's worth doing an explicit test.
const fn: (item: number) => Promise<void> = (jest.fn((item) => {
if (item === 3) throw new Error('Something broke');
}) as unknown) as (item: number) => Promise<void>;

await expect(() => Async.forEachAsync(array, fn, { concurrency: 3 })).rejects.toThrowError(
'Something broke'
);
expect(fn).toHaveBeenCalledTimes(3);
});
});
});

0 comments on commit c2c27e2

Please sign in to comment.