diff --git a/README.md b/README.md index e6697458..6d11241c 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,10 @@ Creates a new SQS consumer. - `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). - `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`). - `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10. + +* `concurrency` - _Number_ - The number of messages (or batches if `handleMessageBatch` is set) to process concurrently. This **can** be higher than the `batchSize` limit. +* `bufferMessages` - _Boolean_ - When enabled, maintains a buffer (up to the `batchSize`) of messages in an internal queue. When this option is enabled, the consumer does not need to wait for an entire batch of messages to be processed before moving on to the next one. Enabled by default when `concurrency` is set. + - `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. - `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. - `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). diff --git a/package-lock.json b/package-lock.json index c2712f69..8f19457b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,16 +1,17 @@ { "name": "sqs-consumer", - "version": "5.7.0", + "version": "5.8.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "sqs-consumer", - "version": "5.7.0", + "version": "5.8.0", "license": "Apache-2.0", "dependencies": { "aws-sdk": "^2.1271.0", - "debug": "^4.3.4" + "debug": "^4.3.4", + "fastq": "^1.14.0" }, "devDependencies": { "@types/chai": "^4.3.4", @@ -2005,7 +2006,6 @@ "version": "1.14.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.14.0.tgz", "integrity": "sha512-eR2D+V9/ExcbF9ls441yIuN6TI2ED1Y2ZcA5BmMtJsOkWOFRJQ0Jt0g1UwqXJJVAb+V+umH5Dfr8oh4EVP7VVg==", - "dev": true, "dependencies": { "reusify": "^1.0.4" } @@ -3743,7 +3743,6 @@ "version": "1.0.4", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==", - "dev": true, "engines": { "iojs": ">=1.0.0", "node": ">=0.10.0" @@ -5910,7 +5909,6 @@ "version": "1.14.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.14.0.tgz", "integrity": "sha512-eR2D+V9/ExcbF9ls441yIuN6TI2ED1Y2ZcA5BmMtJsOkWOFRJQ0Jt0g1UwqXJJVAb+V+umH5Dfr8oh4EVP7VVg==", - "dev": true, "requires": { "reusify": "^1.0.4" } @@ -7190,8 +7188,7 @@ "reusify": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", - "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==", - "dev": true + "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==" }, "rimraf": { "version": "3.0.2", diff --git a/package.json b/package.json index c6790ac2..e4f0d76c 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,8 @@ }, "dependencies": { "aws-sdk": "^2.1271.0", - "debug": "^4.3.4" + "debug": "^4.3.4", + "fastq": "^1.14.0" }, "peerDependencies": { "aws-sdk": "^2.1271.0" diff --git a/src/consumer.ts b/src/consumer.ts index 3df8a0ed..d3b070ca 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -4,6 +4,7 @@ import Debug from 'debug'; import { EventEmitter } from 'events'; import { autoBind } from './bind'; import { SQSError, TimeoutError } from './errors'; +import fastq from 'fastq'; const debug = Debug('sqs-consumer'); @@ -90,6 +91,8 @@ export interface ConsumerOptions { messageAttributeNames?: string[]; stopped?: boolean; batchSize?: number; + concurrency?: number; + bufferMessages?: boolean; visibilityTimeout?: number; waitTimeSeconds?: number; authenticationErrorTimeout?: number; @@ -115,6 +118,13 @@ interface Events { stopped: []; } +enum POLLING_STATUS { + ACTIVE, + WAITING, + INACTIVE, + READY +} + export class Consumer extends EventEmitter { private queueUrl: string; private handleMessage: (message: SQSMessage) => Promise; @@ -124,6 +134,8 @@ export class Consumer extends EventEmitter { private messageAttributeNames: string[]; private stopped: boolean; private batchSize: number; + private concurrency: number; + private bufferMessages: boolean; private visibilityTimeout: number; private waitTimeSeconds: number; private authenticationErrorTimeout: number; @@ -131,6 +143,8 @@ export class Consumer extends EventEmitter { private terminateVisibilityTimeout: boolean; private heartbeatInterval: number; private sqs: SQS; + private workQueue: fastq.queueAsPromised; + private pollingStatus: POLLING_STATUS; private shouldDeleteMessages: boolean; constructor(options: ConsumerOptions) { @@ -144,7 +158,10 @@ export class Consumer extends EventEmitter { this.messageAttributeNames = options.messageAttributeNames || []; this.stopped = true; this.batchSize = options.batchSize || 1; - this.visibilityTimeout = options.visibilityTimeout; + this.concurrency = + options.concurrency || (this.handleMessageBatch ? 1 : this.batchSize); + (this.bufferMessages = options.bufferMessages ?? !!options.concurrency), + (this.visibilityTimeout = options.visibilityTimeout); this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false; this.heartbeatInterval = options.heartbeatInterval; @@ -152,7 +169,11 @@ export class Consumer extends EventEmitter { this.authenticationErrorTimeout = options.authenticationErrorTimeout ?? 10000; this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0; + this.pollingStatus = POLLING_STATUS.INACTIVE; this.shouldDeleteMessages = options.shouldDeleteMessages ?? true; + this.workQueue = this.handleMessageBatch + ? fastq.promise(this.executeBatchHandler.bind(this), this.concurrency) + : fastq.promise(this.executeHandler.bind(this), this.concurrency); this.sqs = options.sqs || @@ -233,7 +254,9 @@ export class Consumer extends EventEmitter { return this.changeVisibilityTimeout(message, this.visibilityTimeout); }); } - await this.executeHandler(message); + debug('pushed'); + await this.workQueue.push(message); + debug('done'); await this.deleteMessage(message); this.emit('message_processed', message); } catch (err) { @@ -244,6 +267,7 @@ export class Consumer extends EventEmitter { } } finally { clearInterval(heartbeat); + this.queuePoll(); } } @@ -333,10 +357,35 @@ export class Consumer extends EventEmitter { private poll(): void { if (this.stopped) { + this.pollingStatus === POLLING_STATUS.INACTIVE; this.emit('stopped'); return; } + if (this.pollingStatus === POLLING_STATUS.ACTIVE) { + debug('sqs polling already in progress'); + return; + } + + if (!this.bufferMessages && (this.workQueue as any).running() > 0) { + debug('work queue is not yet empty. not polling'); + this.pollingStatus = POLLING_STATUS.READY; + return; + } + + if (this.workQueue.length() > 0) { + debug('unstarted work in queue. not polling'); + this.pollingStatus = POLLING_STATUS.READY; + return; + } + + if ((this.workQueue as any).running() >= this.concurrency) { + debug('work queue at capacity, no need to poll'); + this.pollingStatus = POLLING_STATUS.READY; + return; + } + + this.pollingStatus = POLLING_STATUS.ACTIVE; debug('Polling for messages'); const receiveParams = { QueueUrl: this.queueUrl, @@ -349,7 +398,6 @@ export class Consumer extends EventEmitter { let currentPollingTimeout = this.pollingWaitTimeMs; this.receiveMessage(receiveParams) - .then(this.handleSqsResponse) .catch((err) => { this.emit('error', err); if (isConnectionError(err)) { @@ -358,14 +406,27 @@ export class Consumer extends EventEmitter { } return; }) - .then(() => { - setTimeout(this.poll, currentPollingTimeout); + .then((message) => { + this.queuePoll(currentPollingTimeout); + if (message) return this.handleSqsResponse(message); }) .catch((err) => { this.emit('error', err); + }) + .finally(() => { + if (this.pollingStatus === POLLING_STATUS.ACTIVE) { + this.pollingStatus = POLLING_STATUS.INACTIVE; + } }); } + private queuePoll(timeout?: number) { + if (this.pollingStatus !== POLLING_STATUS.WAITING) { + this.pollingStatus = POLLING_STATUS.WAITING; + setTimeout(this.poll, timeout ?? this.pollingWaitTimeMs); + } + } + private async processMessageBatch(messages: SQSMessage[]): Promise { messages.forEach((message) => { this.emit('message_received', message); @@ -381,7 +442,7 @@ export class Consumer extends EventEmitter { ); }); } - await this.executeBatchHandler(messages); + await this.workQueue.push(messages); await this.deleteMessageBatch(messages); messages.forEach((message) => { this.emit('message_processed', message); @@ -394,6 +455,7 @@ export class Consumer extends EventEmitter { } } finally { clearInterval(heartbeat); + this.queuePoll(); } } diff --git a/test/consumer.test.ts b/test/consumer.test.ts index b71b923e..2d5ace7c 100644 --- a/test/consumer.test.ts +++ b/test/consumer.test.ts @@ -352,7 +352,7 @@ describe('Consumer', () => { handleMessage, sqs, authenticationErrorTimeout: 20, - pollingWaitTimeMs: 100 + pollingWaitTimeMs: POLLING_TIMEOUT }); consumer.start(); @@ -712,15 +712,34 @@ describe('Consumer', () => { { Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 40 } ] }); - sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, { - QueueUrl: 'some-queue-url', - Entries: [ - { Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 40 }, - { Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 40 }, - { Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 40 } + sandbox.assert.calledOnce(clearIntervalSpy); + }); + + it('can process more messages than the batch limit', async () => { + sqs.receiveMessage = stubResolve({ + Messages: [ + { MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }, + { MessageId: '2', ReceiptHandle: 'receipt-handle-2', Body: 'body-2' }, + { MessageId: '3', ReceiptHandle: 'receipt-handle-3', Body: 'body-3' } ] }); - sandbox.assert.calledOnce(clearIntervalSpy); + (handleMessage = sinon + .stub() + .callsFake(() => new Promise((resolve) => setTimeout(resolve, 100)))), + (consumer = new Consumer({ + queueUrl: 'some-queue-url', + region: 'some-region', + handleMessage, + batchSize: 3, + concurrency: 6, + sqs + })); + + consumer.start(); + await clock.tickAsync(100); + consumer.stop(); + + sandbox.assert.callCount(handleMessage, 6); }); it('emit error when changing visibility timeout fails', async () => { @@ -823,6 +842,7 @@ describe('Consumer', () => { consumer.start(); consumer.stop(); + await clock.runAllAsync(); consumer.start(); consumer.stop(); await clock.runAllAsync(); diff --git a/tsconfig.json b/tsconfig.json index 18bad39c..2cd52b17 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,7 +7,8 @@ "moduleResolution": "node", "allowJs": false, "noUnusedLocals": true, - "declaration": true + "declaration": true, + "esModuleInterop": true }, "include": ["src/**/*"], "exclude": ["node_modules", "dist"]