From b37cbb3e3fa3e31f5de90aa0ecb2067536f7432a Mon Sep 17 00:00:00 2001 From: Elliot Nelson Date: Fri, 30 Apr 2021 09:11:37 -0400 Subject: [PATCH 1/5] [node-core-library] Move forEachLimitAsync from heft to node-core-library --- apps/heft/src/utilities/Async.ts | 31 +------- .../node-core-async_2021-04-30-11-01.json | 11 +++ .../node-core-async_2021-04-30-11-01.json | 11 +++ common/reviews/api/node-core-library.api.md | 7 ++ libraries/node-core-library/src/Async.ts | 78 +++++++++++++++++++ libraries/node-core-library/src/index.ts | 1 + .../node-core-library/src/test/Async.test.ts | 62 +++++++++++++++ 7 files changed, 173 insertions(+), 28 deletions(-) create mode 100644 common/changes/@rushstack/heft/node-core-async_2021-04-30-11-01.json create mode 100644 common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json create mode 100644 libraries/node-core-library/src/Async.ts create mode 100644 libraries/node-core-library/src/test/Async.test.ts diff --git a/apps/heft/src/utilities/Async.ts b/apps/heft/src/utilities/Async.ts index 654aeb8dbf4..7dabe1ce3fc 100644 --- a/apps/heft/src/utilities/Async.ts +++ b/apps/heft/src/utilities/Async.ts @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. +import { Async as CoreAsync } from '@rushstack/node-core-library'; import { ScopedLogger } from '../pluginFramework/logging/ScopedLogger'; export class Async { @@ -9,34 +10,8 @@ export class Async { parallelismLimit: number, fn: (entry: TEntry) => Promise ): Promise { - return new Promise((resolve: () => void, reject: (error: Error) => void) => { - if (parallelismLimit < 1) { - throw new Error('parallelismLimit must be at least 1'); - } - - let operationsInProgress: number = 1; - let index: number = 0; - - function onOperationCompletion(): void { - operationsInProgress--; - if (operationsInProgress === 0 && index >= array.length) { - resolve(); - } - - while (operationsInProgress < parallelismLimit) { - if (index < array.length) { - operationsInProgress++; - fn(array[index++]) - .then(() => onOperationCompletion()) - .catch(reject); - } else { - break; - } - } - } - - onOperationCompletion(); - }); + // Defer to the implementation in node-core-library + return CoreAsync.forEachLimitAsync(array, parallelismLimit, fn); } public static runWatcherWithErrorHandling(fn: () => Promise, scopedLogger: ScopedLogger): void { diff --git a/common/changes/@rushstack/heft/node-core-async_2021-04-30-11-01.json b/common/changes/@rushstack/heft/node-core-async_2021-04-30-11-01.json new file mode 100644 index 00000000000..93f2f8d01d4 --- /dev/null +++ b/common/changes/@rushstack/heft/node-core-async_2021-04-30-11-01.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "packageName": "@rushstack/heft", + "comment": "Move forEachLimitAsync implementation out of heft", + "type": "patch" + } + ], + "packageName": "@rushstack/heft", + "email": "elliot-nelson@users.noreply.github.com" +} \ No newline at end of file diff --git a/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json b/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json new file mode 100644 index 00000000000..e9aef547d85 --- /dev/null +++ b/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Move forEachLimitAsync implementation into node-core-library", + "type": "minor" + } + ], + "packageName": "@rushstack/node-core-library", + "email": "elliot-nelson@users.noreply.github.com" +} \ No newline at end of file diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 1a69a249206..841c2e39756 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -27,6 +27,13 @@ export class AnsiEscape { static removeCodes(text: string): string; } +// @public +export class Async { + static forEachLimitAsync(array: TEntry[], parallelismLimit: number, fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise)): Promise; + static mapLimitAsync(array: TEntry[], parallelismLimit: number, fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise)): Promise; + static sleep(ms: number): Promise; +} + // @public export type Brand = T & { __brand: BrandTag; diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts new file mode 100644 index 00000000000..8b53a08aa5a --- /dev/null +++ b/libraries/node-core-library/src/Async.ts @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. +// See LICENSE in the project root for license information. + +/** + * Utilities for parallel asynchronous operations, to augment built-in Promise capability. + * @public + */ +export class Async { + /** + * Take an input array and map it through an asynchronous function, with a maximum number + * of parallel operations provided by the `parallelismLimit` parameter. + */ + public static async mapLimitAsync( + array: TEntry[], + parallelismLimit: number, + fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise) + ): Promise { + const result: TRetVal[] = []; + + await Async.forEachLimitAsync( + array, + parallelismLimit, + async (item: TEntry, index: number): Promise => { + result[index] = await fn(item, index); + } + ); + + return result; + } + + /** + * Take an input array and loop through it, calling an asynchronous function, with a maximum number + * of parallel operations provided by the `parallelismLimit` parameter. + */ + public static async forEachLimitAsync( + array: TEntry[], + parallelismLimit: number, + fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise) + ): Promise { + return new Promise((resolve: () => void, reject: (error: Error) => void) => { + if (parallelismLimit < 1) { + throw new Error('parallelismLimit must be at least 1'); + } + + let operationsInProgress: number = 1; + let index: number = 0; + + function onOperationCompletion(): void { + operationsInProgress--; + if (operationsInProgress === 0 && index >= array.length) { + resolve(); + } + + while (operationsInProgress < parallelismLimit) { + if (index < array.length) { + operationsInProgress++; + fn(array[index], index++) + .then(() => onOperationCompletion()) + .catch(reject); + } else { + break; + } + } + } + + onOperationCompletion(); + }); + } + + /** + * Return a promise that resolves after the specified number of milliseconds. + */ + public static async sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } +} diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index 275095b52c9..e267abe39bd 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -9,6 +9,7 @@ export { AlreadyReportedError } from './AlreadyReportedError'; export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape'; +export { Async } from './Async'; export { Brand } from './PrimitiveTypes'; export { FileConstants, FolderConstants } from './Constants'; export { Enum } from './Enum'; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts new file mode 100644 index 00000000000..3069a0e790d --- /dev/null +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. +// See LICENSE in the project root for license information. + +import { Async } from '../Async'; + +describe('Async', () => { + describe('mapLimitAsync', () => { + it('returns the same result as built-in Promise.all', async () => { + const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; + const fn: (item: number) => Promise = async (item) => `result ${item}`; + + expect(await Async.mapLimitAsync(array, 1, fn)).toEqual(await Promise.all(array.map(fn))); + }); + + it('ensures no more than N operations occur in parallel', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; + + const fn: (item: number) => Promise = async (item) => { + running++; + await Async.sleep(1); + maxRunning = Math.max(maxRunning, running); + running--; + return `result ${item}`; + }; + + expect(await Async.mapLimitAsync(array, 3, fn)).toEqual([ + 'result 1', + 'result 2', + 'result 3', + 'result 4', + 'result 5', + 'result 6', + 'result 7', + 'result 8' + ]); + expect(maxRunning).toEqual(3); + }); + }); + + describe('forEachLimitAsync', () => { + it('ensures no more than N operations occur in parallel', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; + + const fn: (item: number) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(1); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachLimitAsync(array, 3, fn); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + }); +}); From c8251a22ff0860dfb64741dbe2956f239753f9c6 Mon Sep 17 00:00:00 2001 From: Elliot Nelson Date: Fri, 30 Apr 2021 09:11:58 -0400 Subject: [PATCH 2/5] [node-core-library] Replace setTimeout with Async.sleep --- libraries/node-core-library/src/LockFile.ts | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/libraries/node-core-library/src/LockFile.ts b/libraries/node-core-library/src/LockFile.ts index adacd3376bf..3e7d3734e93 100644 --- a/libraries/node-core-library/src/LockFile.ts +++ b/libraries/node-core-library/src/LockFile.ts @@ -3,9 +3,9 @@ import * as path from 'path'; import * as child_process from 'child_process'; -import { setTimeout } from 'timers'; import { FileSystem } from './FileSystem'; import { FileWriter } from './FileWriter'; +import { Async } from './Async'; /** * http://man7.org/linux/man-pages/man5/proc.5.html @@ -231,21 +231,13 @@ export class LockFile { throw new Error(`Exceeded maximum wait time to acquire lock for resource "${resourceName}"`); } - await LockFile._sleepForMs(interval); + await Async.sleep(interval); return retryLoop(); }; return retryLoop(); } - private static _sleepForMs(timeout: number): Promise { - return new Promise((resolve: () => void, reject: () => void) => { - setTimeout(() => { - resolve(); - }, timeout); - }); - } - /** * Attempts to acquire the lock on a Linux or OSX machine */ From 6e54d2a0b3289a09ae4ac70b756b92f176b2d85c Mon Sep 17 00:00:00 2001 From: Elliot Nelson Date: Sat, 1 May 2021 12:26:08 -0400 Subject: [PATCH 3/5] Update API for async functions --- apps/heft/src/utilities/Async.ts | 2 +- common/reviews/api/node-core-library.api.md | 11 ++- libraries/node-core-library/src/Async.ts | 69 ++++++++++++------- libraries/node-core-library/src/index.ts | 2 +- .../node-core-library/src/test/Async.test.ts | 62 +++++++++++++++-- 5 files changed, 109 insertions(+), 37 deletions(-) diff --git a/apps/heft/src/utilities/Async.ts b/apps/heft/src/utilities/Async.ts index 7dabe1ce3fc..5e5d383096b 100644 --- a/apps/heft/src/utilities/Async.ts +++ b/apps/heft/src/utilities/Async.ts @@ -11,7 +11,7 @@ export class Async { fn: (entry: TEntry) => Promise ): Promise { // Defer to the implementation in node-core-library - return CoreAsync.forEachLimitAsync(array, parallelismLimit, fn); + return CoreAsync.forEachAsync(array, fn, { concurrency: parallelismLimit }); } public static runWatcherWithErrorHandling(fn: () => Promise, scopedLogger: ScopedLogger): void { diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 841c2e39756..b0dc96581ad 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -27,10 +27,10 @@ export class AnsiEscape { static removeCodes(text: string): string; } -// @public +// @beta export class Async { - static forEachLimitAsync(array: TEntry[], parallelismLimit: number, fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise)): Promise; - static mapLimitAsync(array: TEntry[], parallelismLimit: number, fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise)): Promise; + static forEachAsync(array: TEntry[], fn: (entry: TEntry, index: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static mapAsync(array: TEntry[], fn: (entry: TEntry, index: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static sleep(ms: number): Promise; } @@ -267,6 +267,11 @@ export interface IAnsiEscapeConvertForTestsOptions { encodeNewlines?: boolean; } +// @beta +export interface IAsyncParallelismOptions { + concurrency?: number; +} + // @beta (undocumented) export interface IColorableSequence { // (undocumented) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 8b53a08aa5a..12bad187e85 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -2,46 +2,61 @@ // See LICENSE in the project root for license information. /** - * Utilities for parallel asynchronous operations, to augment built-in Promise capability. - * @public + * Options for controlling the parallelism of asynchronous operations. + * @beta + */ +export interface IAsyncParallelismOptions { + /** + * If provided, asynchronous operations like `mapAsync` and `forEachAsync` will limit the + * number of concurrent operations to the specified number. + */ + concurrency?: number; +} + +/** + * Utilities for parallel asynchronous operations, to augment built-in Promises capability. + * @beta */ export class Async { /** - * Take an input array and map it through an asynchronous function, with a maximum number - * of parallel operations provided by the `parallelismLimit` parameter. + * Given an input array and an asynchronous callback function, execute the callback + * function for every element in the array and return a promise for an array containing + * the results. + * + * Behaves like an asynchronous version of built-in `Array#map`. */ - public static async mapLimitAsync( + public static async mapAsync( array: TEntry[], - parallelismLimit: number, - fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise) + fn: (entry: TEntry, index: number) => Promise, + options?: IAsyncParallelismOptions | undefined ): Promise { const result: TRetVal[] = []; - await Async.forEachLimitAsync( + await Async.forEachAsync( array, - parallelismLimit, async (item: TEntry, index: number): Promise => { result[index] = await fn(item, index); - } + }, + options ); return result; } /** - * Take an input array and loop through it, calling an asynchronous function, with a maximum number - * of parallel operations provided by the `parallelismLimit` parameter. + * Given an input array and an asynchronous callback function, execute the callback + * function for every element in the array and return a void promise. + * + * Behaves like an asynchronous version of built-in `Array#forEach`. */ - public static async forEachLimitAsync( + public static async forEachAsync( array: TEntry[], - parallelismLimit: number, - fn: ((entry: TEntry) => Promise) | ((entry: TEntry, index: number) => Promise) + fn: (entry: TEntry, index: number) => Promise, + options?: IAsyncParallelismOptions | undefined ): Promise { - return new Promise((resolve: () => void, reject: (error: Error) => void) => { - if (parallelismLimit < 1) { - throw new Error('parallelismLimit must be at least 1'); - } - + await new Promise((resolve: () => void, reject: (error: Error) => void) => { + const concurrency: number = + options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; let operationsInProgress: number = 1; let index: number = 0; @@ -51,12 +66,16 @@ export class Async { resolve(); } - while (operationsInProgress < parallelismLimit) { + while (operationsInProgress < concurrency) { if (index < array.length) { operationsInProgress++; - fn(array[index], index++) - .then(() => onOperationCompletion()) - .catch(reject); + try { + Promise.resolve(fn(array[index], index++)) + .then(() => onOperationCompletion()) + .catch(reject); + } catch (error) { + reject(error); + } } else { break; } @@ -71,7 +90,7 @@ export class Async { * Return a promise that resolves after the specified number of milliseconds. */ public static async sleep(ms: number): Promise { - return new Promise((resolve) => { + await new Promise((resolve) => { setTimeout(resolve, ms); }); } diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index e267abe39bd..317e43ea4b0 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -9,7 +9,7 @@ export { AlreadyReportedError } from './AlreadyReportedError'; export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape'; -export { Async } from './Async'; +export { Async, IAsyncParallelismOptions } from './Async'; export { Brand } from './PrimitiveTypes'; export { FileConstants, FolderConstants } from './Constants'; export { Enum } from './Enum'; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 3069a0e790d..c0b5ca136d2 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -4,15 +4,32 @@ import { Async } from '../Async'; describe('Async', () => { - describe('mapLimitAsync', () => { + describe('mapAsync', () => { it('returns the same result as built-in Promise.all', async () => { const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; const fn: (item: number) => Promise = async (item) => `result ${item}`; - expect(await Async.mapLimitAsync(array, 1, fn)).toEqual(await Promise.all(array.map(fn))); + expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn))); }); - it('ensures no more than N operations occur in parallel', async () => { + it('passes an index parameter to the callback function', async () => { + const array: number[] = [1, 2, 3]; + const fn: (item: number, index: number) => Promise = jest.fn(async (item) => `result ${item}`); + + await Async.mapAsync(array, fn); + expect(fn).toHaveBeenNthCalledWith(1, 1, 0); + expect(fn).toHaveBeenNthCalledWith(2, 2, 1); + expect(fn).toHaveBeenNthCalledWith(3, 3, 2); + }); + + it('returns the same result as built-in Promise.all', async () => { + const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; + const fn: (item: number) => Promise = async (item) => `result ${item}`; + + expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn))); + }); + + it('if concurrency is set, ensures no more than N operations occur in parallel', async () => { let running: number = 0; let maxRunning: number = 0; @@ -26,7 +43,7 @@ describe('Async', () => { return `result ${item}`; }; - expect(await Async.mapLimitAsync(array, 3, fn)).toEqual([ + expect(await Async.mapAsync(array, fn, { concurrency: 3 })).toEqual([ 'result 1', 'result 2', 'result 3', @@ -40,8 +57,8 @@ describe('Async', () => { }); }); - describe('forEachLimitAsync', () => { - it('ensures no more than N operations occur in parallel', async () => { + describe('forEachAsync', () => { + it('if concurrency is set, ensures no more than N operations occur in parallel', async () => { let running: number = 0; let maxRunning: number = 0; @@ -54,9 +71,40 @@ describe('Async', () => { running--; }); - await Async.forEachLimitAsync(array, 3, fn); + await Async.forEachAsync(array, fn, { concurrency: 3 }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(3); }); + + it('rejects if any operation rejects', async () => { + const array: number[] = [1, 2, 3]; + + const fn: (item: number) => Promise = jest.fn(async (item) => { + await Async.sleep(1); + if (item === 3) throw new Error('Something broke'); + }); + + await expect(() => Async.forEachAsync(array, fn, { concurrency: 3 })).rejects.toThrowError( + 'Something broke' + ); + expect(fn).toHaveBeenCalledTimes(3); + }); + + it('rejects if any operation synchronously throws', async () => { + const array: number[] = [1, 2, 3]; + + // The compiler is (rightly) very concerned about us claiming that this synchronous + // function is going to return a promise. This situation is not very likely in a + // TypeScript project, but it's such a common problem in JavaScript projects that + // it's worth doing an explicit test. + const fn: (item: number) => Promise = (jest.fn((item) => { + if (item === 3) throw new Error('Something broke'); + }) as unknown) as (item: number) => Promise; + + await expect(() => Async.forEachAsync(array, fn, { concurrency: 3 })).rejects.toThrowError( + 'Something broke' + ); + expect(fn).toHaveBeenCalledTimes(3); + }); }); }); From 5fe71ba69f887af16e4df7ecd2e3c8b36ce1bda7 Mon Sep 17 00:00:00 2001 From: Elliot Nelson Date: Sat, 1 May 2021 15:09:11 -0400 Subject: [PATCH 4/5] Update common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json Co-authored-by: Pete Gonzalez <4673363+octogonz@users.noreply.github.com> --- .../node-core-library/node-core-async_2021-04-30-11-01.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json b/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json index e9aef547d85..0d601549a66 100644 --- a/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json +++ b/common/changes/@rushstack/node-core-library/node-core-async_2021-04-30-11-01.json @@ -2,10 +2,10 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "Move forEachLimitAsync implementation into node-core-library", + "comment": "Add a new API \"Async\" with some utilities for working with promises", "type": "minor" } ], "packageName": "@rushstack/node-core-library", "email": "elliot-nelson@users.noreply.github.com" -} \ No newline at end of file +} From b3414bfef116e9edeee9e3cd452bc136ab97209d Mon Sep 17 00:00:00 2001 From: Pete Gonzalez <4673363+octogonz@users.noreply.github.com> Date: Sat, 1 May 2021 12:15:35 -0700 Subject: [PATCH 5/5] Update API doc comments --- common/reviews/api/node-core-library.api.md | 4 +- libraries/node-core-library/src/Async.ts | 66 +++++++++++++++------ 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index b0dc96581ad..eeb24aee5d9 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -29,8 +29,8 @@ export class AnsiEscape { // @beta export class Async { - static forEachAsync(array: TEntry[], fn: (entry: TEntry, index: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; - static mapAsync(array: TEntry[], fn: (entry: TEntry, index: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static forEachAsync(array: TEntry[], callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static mapAsync(array: TEntry[], callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static sleep(ms: number): Promise; } diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 12bad187e85..8ec16483582 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -3,39 +3,57 @@ /** * Options for controlling the parallelism of asynchronous operations. + * + * @remarks + * Used with {@link Async.mapAsync} and {@link Async.forEachAsync}. + * * @beta */ export interface IAsyncParallelismOptions { /** - * If provided, asynchronous operations like `mapAsync` and `forEachAsync` will limit the - * number of concurrent operations to the specified number. + * Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync} + * to limit the maximum number of concurrent promises to the specified number. */ concurrency?: number; } /** - * Utilities for parallel asynchronous operations, to augment built-in Promises capability. + * Utilities for parallel asynchronous operations, for use with the system `Promise` APIs. + * * @beta */ export class Async { /** - * Given an input array and an asynchronous callback function, execute the callback - * function for every element in the array and return a promise for an array containing - * the results. + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. Returns an array containing the results. * - * Behaves like an asynchronous version of built-in `Array#map`. + * @remarks + * This API is similar to the system `Array#map`, except that the loop is asynchronous, + * and the maximum number of concurrent promises can be throttled + * using {@link IAsyncParallelismOptions.concurrency}. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. + * + * @param array - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow + * @returns an array containing the result for each callback, in the same order + * as the original input `array` */ public static async mapAsync( array: TEntry[], - fn: (entry: TEntry, index: number) => Promise, + callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { const result: TRetVal[] = []; await Async.forEachAsync( array, - async (item: TEntry, index: number): Promise => { - result[index] = await fn(item, index); + async (item: TEntry, arrayIndex: number): Promise => { + result[arrayIndex] = await callback(item, arrayIndex); }, options ); @@ -44,33 +62,45 @@ export class Async { } /** - * Given an input array and an asynchronous callback function, execute the callback - * function for every element in the array and return a void promise. + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. + * + * @remarks + * This API is similar to the system `Array#forEach`, except that the loop is asynchronous, + * and the maximum number of concurrent promises can be throttled + * using {@link IAsyncParallelismOptions.concurrency}. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. * - * Behaves like an asynchronous version of built-in `Array#forEach`. + * @param array - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow */ public static async forEachAsync( array: TEntry[], - fn: (entry: TEntry, index: number) => Promise, + callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { await new Promise((resolve: () => void, reject: (error: Error) => void) => { const concurrency: number = options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; let operationsInProgress: number = 1; - let index: number = 0; + let arrayIndex: number = 0; function onOperationCompletion(): void { operationsInProgress--; - if (operationsInProgress === 0 && index >= array.length) { + if (operationsInProgress === 0 && arrayIndex >= array.length) { resolve(); } while (operationsInProgress < concurrency) { - if (index < array.length) { + if (arrayIndex < array.length) { operationsInProgress++; try { - Promise.resolve(fn(array[index], index++)) + Promise.resolve(callback(array[arrayIndex], arrayIndex++)) .then(() => onOperationCompletion()) .catch(reject); } catch (error) {