Skip to content

Commit

Permalink
feat: adding the ability to execute the consumer concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Dec 31, 2023
1 parent ae5d837 commit 3768e93
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 6 deletions.
9 changes: 9 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* The current polling status of the consumer.
*/
export enum POLLING_STATUS {
ACTIVE,
WAITING,
INACTIVE,
READY
}
58 changes: 52 additions & 6 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '@aws-sdk/client-sqs';

import { ConsumerOptions, StopOptions, UpdatableOptions } from './types';
import { POLLING_STATUS } from './constants';
import { TypedEventEmitter } from './emitter';
import { autoBind } from './bind';
import {
Expand Down Expand Up @@ -49,6 +50,9 @@ export class Consumer extends TypedEventEmitter {
private shouldDeleteMessages: boolean;
private alwaysAcknowledge: boolean;
private batchSize: number;
private concurrency: number;
private concurrentExecutions: number;
private pollingStatus: POLLING_STATUS;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
private waitTimeSeconds: number;
Expand All @@ -69,6 +73,7 @@ export class Consumer extends TypedEventEmitter {
this.attributeNames = options.attributeNames || [];
this.messageAttributeNames = options.messageAttributeNames || [];
this.batchSize = options.batchSize || 1;
this.concurrency = options.concurrency ?? 1;
this.visibilityTimeout = options.visibilityTimeout;
this.terminateVisibilityTimeout =
options.terminateVisibilityTimeout || false;
Expand Down Expand Up @@ -146,7 +151,8 @@ export class Consumer extends TypedEventEmitter {
}

/**
* Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not.
* Returns the current polling state of the consumer: `true` if it is
* actively polling, `false` if it is not.
*/
public get isRunning(): boolean {
return !this.stopped;
Expand Down Expand Up @@ -185,20 +191,50 @@ export class Consumer extends TypedEventEmitter {
}
}

/**
* Queue a poll to be executed after a timeout
* @param timeout The timeout to wait before polling
* @returns The timeout id
*/
private queuePoll(timeout?: number) {
if (this.pollingStatus !== POLLING_STATUS.WAITING) {
this.pollingStatus = POLLING_STATUS.WAITING;
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, timeout);
}
}

/**
* Poll for new messages from SQS
*/
private poll(): void {
if (this.stopped) {
this.pollingStatus = POLLING_STATUS.INACTIVE;
logger.debug('cancelling_poll', {
detail: 'Poll was called while consumer was stopped, cancelling poll...'
});
return;
}

let currentPollingTimeout = this.pollingWaitTimeMs;

const isConcurrencyReached = this.concurrentExecutions >= this.concurrency;

if (isConcurrencyReached) {
logger.debug('reached_concurrency_limit', {
detail:
'The concurrency limit has been reached. Pausing before retrying.'
});
this.pollingStatus = POLLING_STATUS.READY;
this.queuePoll(currentPollingTimeout);
return;
}

logger.debug('polling');
this.pollingStatus = POLLING_STATUS.ACTIVE;

let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage({
QueueUrl: this.queueUrl,
AttributeNames: this.attributeNames,
Expand All @@ -207,6 +243,10 @@ export class Consumer extends TypedEventEmitter {
WaitTimeSeconds: this.waitTimeSeconds,
VisibilityTimeout: this.visibilityTimeout
})
.then((response) => {
this.queuePoll(currentPollingTimeout);
return response;
})
.then(this.handleSqsResponse)
.catch((err) => {
this.emitError(err);
Expand All @@ -220,13 +260,15 @@ export class Consumer extends TypedEventEmitter {
return;
})
.then(() => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, currentPollingTimeout);
this.queuePoll(currentPollingTimeout);
})
.catch((err) => {
this.emitError(err);
})
.finally(() => {
if (this.pollingStatus === POLLING_STATUS.ACTIVE) {
this.pollingStatus = POLLING_STATUS.INACTIVE;
}
});
}

Expand Down Expand Up @@ -270,12 +312,16 @@ export class Consumer extends TypedEventEmitter {
});
}, 1000);

this.concurrentExecutions += 1;

if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
await Promise.all(response.Messages.map(this.processMessage));
}

this.concurrentExecutions -= 1;

clearInterval(handlerProcessingDebugger);

this.emit('response_processed');
Expand Down
5 changes: 5 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export interface ConsumerOptions {
* @defaultvalue `1`
*/
batchSize?: number;
/**
* The number of messages (or batches if `handleMessageBatch` is set) to
* process concurrently.
*/
concurrency?: number;
/**
* The duration (in seconds) that the received messages are hidden from subsequent
* retrieve requests after being retrieved by a ReceiveMessage request.
Expand Down
8 changes: 8 additions & 0 deletions src/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ function validateOption(
throw new Error('batchSize must be between 1 and 10.');
}
break;
case 'concurrency':
if (value < 1) {
throw new Error('concurrency must be greater than 0.');
}
break;
case 'heartbeatInterval':
if (
!allOptions.visibilityTimeout ||
Expand Down Expand Up @@ -75,6 +80,9 @@ function assertOptions(options: ConsumerOptions): void {
if (options.batchSize) {
validateOption('batchSize', options.batchSize, options);
}
if (options.concurrency) {
validateOption('concurrency', options.concurrency, options);
}
if (options.heartbeatInterval) {
validateOption('heartbeatInterval', options.heartbeatInterval, options);
}
Expand Down
35 changes: 35 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ describe('Consumer', () => {
}, 'batchSize must be between 1 and 10.');
});

it('requires the concurrency option to be greater than 0', () => {
assert.throws(() => {
new Consumer({
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,
concurrency: 0
});
}, 'concurrency must be greater than 0.');
});

it('requires visibilityTimeout to be set with heartbeatInterval', () => {
assert.throws(() => {
new Consumer({
Expand Down Expand Up @@ -1571,6 +1582,30 @@ describe('Consumer', () => {
sandbox.assert.notCalled(optionUpdatedListener);
});

it('updates the concurrency option and emits an event', () => {
const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);

consumer.updateOption('concurrency', 4);

assert.equal(consumer.concurrency, 4);

sandbox.assert.calledWithMatch(optionUpdatedListener, 'concurrency', 4);
});

it('does not update the concurrency if the value is less than 1', () => {
const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);

assert.throws(() => {
consumer.updateOption('concurrency', 0);
}, 'concurrency must be greater than 0.');

assert.equal(consumer.concurrency, 1);

sandbox.assert.notCalled(optionUpdatedListener);
});

it('updates the waitTimeSeconds option and emits an event', () => {
const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);
Expand Down

0 comments on commit 3768e93

Please sign in to comment.