Skip to content

Commit

Permalink
feat: support rate limiting in processQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
dylandepass committed Feb 1, 2025
1 parent dccc0b5 commit e4ad90d
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 10 deletions.
126 changes: 122 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/helix-shared-process-queue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@
"homepage": "https://github.com/adobe/helix-shared#readme",
"publishConfig": {
"access": "public"
},
"devDependencies": {
"sinon": "19.0.2"
}
}
20 changes: 17 additions & 3 deletions packages/helix-shared-process-queue/src/process-queue.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ export declare type ProcessQueueHandler<
* entries during processing. It returns the `results` array which is either populated by the
* queue handler function directly or with the return values of the handler functions.
*
* Optionally, rate limiting can be applied using a Token Bucket algorithm to throttle the rate at
* which tasks are processed.
*
* @param queue A list of entries to be processed
* @param fn A handler function
* @param {number} [maxConcurrent = 8] Concurrency level
* @param {number} [maxConcurrent=8] Concurrency level
* @param {RateLimitOptions} [rateLimitOptions] Optional rate limit options for throttling processing.
* @returns the results
*/
export default function processQueue<
Expand All @@ -63,7 +67,17 @@ export default function processQueue<
>(
queue: TQueue,
fn: THandler,
maxConcurrent?: number
maxConcurrent?: number,
rateLimitOptions?: RateLimitOptions | null
): Promise<TReturn[]>;


/**
 * Rate limiting options for processQueue.
 *
 * Configures a token bucket: at most `limit` tasks are started within any
 * window of `interval` milliseconds.
 *
 * @property {number} limit Maximum number of items processed within the interval
 * @property {number} interval Time window in milliseconds after which the bucket refills
 */
export declare type RateLimitOptions = {
  limit: number;
  interval: number;
};
70 changes: 67 additions & 3 deletions packages/helix-shared-process-queue/src/process-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,81 @@ function* dequeue(queue) {
}

/**
 * Creates a token bucket rate limiter.
 *
 * The token bucket algorithm controls the rate of operations by maintaining a "bucket"
 * that holds a fixed number of tokens. Each operation that wishes to proceed must first
 * acquire a token. The bucket is initially filled with `limit` tokens and is refilled
 * back to that level after every `interval` milliseconds.
 *
 * The returned async function, `waitForToken`, implements this logic:
 * - It checks if enough time has elapsed since the last refill. If so, it refills the bucket
 * - If a token is available, it decrements the token count and returns immediately
 * - If no tokens are available, it sleeps until the next scheduled refill and retries
 *
 * This guarantees that no more than `limit` operations are performed within any
 * given `interval`, thus enforcing the specified rate limit.
 *
 * @param {number} limit Maximum tokens (operations) allowed per interval; must be positive
 * @param {number} interval Time period in ms after which the token bucket is refilled; must be positive
 * @returns {Function} An async function that waits until a token is available
 * @throws {Error} If `limit` or `interval` is not a positive number
 */
function createRateLimiter(limit, interval) {
  // a non-positive limit or interval would make waitForToken wait forever
  if (!(limit > 0) || !(interval > 0)) {
    throw new Error('rate limiter requires a positive limit and interval');
  }

  let tokens = limit;
  let lastRefill = Date.now();

  return async function waitForToken() {
    // eslint-disable-next-line no-constant-condition
    while (true) {
      const now = Date.now();

      // Refill tokens if the interval has passed
      if (now - lastRefill >= interval) {
        tokens = limit;
        lastRefill = now;
      }

      // If a token is available, consume one and exit
      if (tokens > 0) {
        tokens -= 1;
        return;
      }

      // Else, sleep until the next refill is due (minimum 1ms) instead of
      // polling at a fixed 50ms, which added up to 50ms of avoidable latency
      const delay = Math.max(lastRefill + interval - now, 1);
      // eslint-disable-next-line no-await-in-loop, no-promise-executor-return
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  };
}

/**
* Processes the given queue concurrently, optionally enforcing rate limits via a Token Bucket.
* The handler function may add more items to the queue as needed.
*
* @param {Iterable|Array} queue A list of tasks
* @param {ProcessQueueHandler} fn A handler function `fn(task:any, queue:array, results:array)`
 * @param {number} [maxConcurrent=8] Concurrency level
* @param {RateLimitOptions} [rateLimitOptions=null] Optional rate limiting options
* @returns {Promise<Array>} the results
*/
export default async function processQueue(queue, fn, maxConcurrent = 8) {
export default async function processQueue(queue, fn, maxConcurrent = 8, rateLimitOptions = null) {
if (typeof queue !== 'object') {
throw Error('invalid queue argument: iterable expected');
}

// noop by default
let waitForToken = async () => {};

// If rate limiting options are provided, define a token bucket limiter.
if (
rateLimitOptions
&& rateLimitOptions.limit != null
&& rateLimitOptions.interval != null
) {
const { limit, interval } = rateLimitOptions;
waitForToken = createRateLimiter(limit, interval);
}

const running = [];
const results = [];

Expand Down Expand Up @@ -65,6 +127,8 @@ export default async function processQueue(queue, fn, maxConcurrent = 8) {
}

for await (const value of iter) {
await waitForToken();

while (running.length >= maxConcurrent) {
// eslint-disable-next-line no-await-in-loop
await Promise.race(running);
Expand Down
Loading

0 comments on commit e4ad90d

Please sign in to comment.