diff --git a/src/consumer.ts b/src/consumer.ts index 44f24d1..170424d 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -315,6 +315,7 @@ export class Consumer extends TypedEventEmitter { err, `SQS receive message failed: ${err.message}`, this.extendedAWSErrors, + this.queueUrl, ); } } @@ -479,6 +480,8 @@ export class Consumer extends TypedEventEmitter { err, `Error changing visibility timeout: ${err.message}`, this.extendedAWSErrors, + this.queueUrl, + message, ), message, ); @@ -514,6 +517,8 @@ export class Consumer extends TypedEventEmitter { err, `Error changing visibility timeout: ${err.message}`, this.extendedAWSErrors, + this.queueUrl, + messages, ), messages, ); @@ -549,12 +554,14 @@ export class Consumer extends TypedEventEmitter { throw toTimeoutError( err, `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`, + message, ); } if (err instanceof Error) { throw toStandardError( err, `Unexpected message handler failure: ${err.message}`, + message, ); } throw err; @@ -581,6 +588,7 @@ export class Consumer extends TypedEventEmitter { throw toStandardError( err, `Unexpected message handler failure: ${err.message}`, + messages, ); } throw err; @@ -616,6 +624,8 @@ export class Consumer extends TypedEventEmitter { err, `SQS delete message failed: ${err.message}`, this.extendedAWSErrors, + this.queueUrl, + message, ); } } @@ -654,6 +664,8 @@ export class Consumer extends TypedEventEmitter { err, `SQS delete message failed: ${err.message}`, this.extendedAWSErrors, + this.queueUrl, + messages, ); } } diff --git a/src/errors.ts b/src/errors.ts index 5fbcb85..d196933 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,3 +1,5 @@ +import { Message } from "@aws-sdk/client-sqs"; + import { AWSError } from "./types.js"; class SQSError extends Error { @@ -9,6 +11,8 @@ class SQSError extends Error { fault: AWSError["$fault"]; response?: AWSError["$response"]; metadata?: AWSError["$metadata"]; + queueUrl?: string; + messageIds?: string[]; constructor(message: string) { super(message); @@ -17,6 +21,7 @@ class SQSError extends Error { } class TimeoutError extends Error { + messageIds: string[]; cause: Error; time: Date; @@ -24,10 +29,12 @@ class TimeoutError extends Error { super(message); this.message = message; this.name = "TimeoutError"; + this.messageIds = []; } } class StandardError extends Error { + messageIds: string[]; cause: Error; time: Date; @@ -35,6 +42,7 @@ class StandardError extends Error { super(message); this.message = message; this.name = "StandardError"; + this.messageIds = []; } } @@ -64,6 +72,17 @@ function isConnectionError(err: Error): boolean { return false; } +/** + * Gets the message IDs from the message. + * @param message The message that was received from SQS. + */ +function getMessageIds(message: Message | Message[]): string[] { + if (Array.isArray(message)) { + return message.map((m) => m.MessageId); + } + return [message.MessageId]; +} + /** * Formats an AWSError the the SQSError type. * @param err The error object that was received. @@ -73,6 +92,8 @@ function toSQSError( err: AWSError, message: string, extendedAWSErrors: boolean, + queueUrl?: string, + sqsMessage?: Message | Message[], ): SQSError { const sqsError = new SQSError(message); sqsError.code = err.name; @@ -87,6 +108,14 @@ function toSQSError( sqsError.metadata = err.$metadata; } + if (queueUrl) { + sqsError.queueUrl = queueUrl; + } + + if (sqsMessage) { + sqsError.messageIds = getMessageIds(sqsMessage); + } + return sqsError; } @@ -94,11 +123,17 @@ function toSQSError( * Formats an Error to the StandardError type. * @param err The error object that was received. * @param message The message to send with the error. + * @param sqsMessage The message that was received from SQS. */ -function toStandardError(err: Error, message: string): StandardError { +function toStandardError( + err: Error, + message: string, + sqsMessage: Message | Message[], +): StandardError { const error = new StandardError(message); error.cause = err; error.time = new Date(); + error.messageIds = getMessageIds(sqsMessage); return error; } @@ -107,11 +142,17 @@ function toStandardError(err: Error, message: string): StandardError { * Formats an Error to the TimeoutError type. * @param err The error object that was received. * @param message The message to send with the error. + * @param sqsMessage The message that was received from SQS. */ -function toTimeoutError(err: TimeoutError, message: string): TimeoutError { +function toTimeoutError( + err: TimeoutError, + message: string, + sqsMessage: Message | Message[], +): TimeoutError { const error = new TimeoutError(message); error.cause = err; error.time = new Date(); + error.messageIds = getMessageIds(sqsMessage); return error; } diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 974277c..72caf07 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -466,6 +466,7 @@ describe("Consumer", () => { "Unexpected message handler failure: Processing error", ); assert.equal(message.MessageId, "123"); + assert.deepEqual((err as any).messageIds, ["123"]); }); it("fires an `error` event when an `SQSError` occurs processing a message", async () => { @@ -1674,6 +1675,8 @@ describe("Consumer", () => { assert.ok(err); assert.equal(err.message, "Error changing visibility timeout: failed"); + assert.equal(err.queueUrl, QUEUE_URL); + assert.deepEqual(err.messageIds, ["1"]); }); it("emit error when changing visibility timeout fails for batch handler functions", async () => { @@ -1706,6 +1709,156 @@ describe("Consumer", () => { assert.ok(err); assert.equal(err.message, "Error changing visibility timeout: failed"); + assert.equal(err.queueUrl, QUEUE_URL); + assert.deepEqual(err.messageIds, ["1", "2"]); + }); + + it("includes messageIds in timeout errors", async () => { + const handleMessageTimeout = 500; + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: () => + new Promise((resolve) => setTimeout(resolve, 1000)), + handleMessageTimeout, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + }); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "timeout_error"), + clock.tickAsync(handleMessageTimeout), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal( + err.message, + `Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`, + ); + assert.deepEqual(err.messageIds, ["123"]); + }); + + it("includes messageIds in batch processing errors", async () => { + sqs.send.withArgs(mockReceiveMessage).resolves({ + Messages: [ + { MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" }, + { MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" }, + ], + }); + + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessageBatch: () => { + throw new Error("Batch processing error"); + }, + batchSize: 2, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + }); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "error"), + clock.tickAsync(100), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal( + err.message, + "Unexpected message handler failure: Batch processing error", + ); + assert.deepEqual(err.messageIds, ["1", "2"]); + }); + + it("includes queueUrl and messageIds in SQS errors when deleting message", async () => { + const deleteErr = new Error("Delete error"); + deleteErr.name = "SQSError"; + + handleMessage.resolves(null); + sqs.send.withArgs(mockDeleteMessage).rejects(deleteErr); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "error"), + clock.tickAsync(100), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal(err.message, "SQS delete message failed: Delete error"); + assert.equal(err.queueUrl, QUEUE_URL); + assert.deepEqual(err.messageIds, ["123"]); + }); + + it("includes queueUrl and messageIds in SQS errors when changing visibility timeout", async () => { + sqs.send.withArgs(mockReceiveMessage).resolves({ + Messages: [ + { MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" }, + ], + }); + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: () => + new Promise((resolve) => setTimeout(resolve, 75000)), + sqs, + visibilityTimeout: 40, + heartbeatInterval: 30, + }); + + const receiveErr = new MockSQSError("failed"); + sqs.send.withArgs(mockChangeMessageVisibility).rejects(receiveErr); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "error"), + clock.tickAsync(75000), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal(err.message, "Error changing visibility timeout: failed"); + assert.equal(err.queueUrl, QUEUE_URL); + assert.deepEqual(err.messageIds, ["1"]); + }); + + it("includes queueUrl and messageIds in batch SQS errors", async () => { + sqs.send.withArgs(mockReceiveMessage).resolves({ + Messages: [ + { MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" }, + { MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" }, + ], + }); + + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessageBatch: () => + new Promise((resolve) => setTimeout(resolve, 75000)), + sqs, + batchSize: 2, + visibilityTimeout: 40, + heartbeatInterval: 30, + }); + + const receiveErr = new MockSQSError("failed"); + sqs.send.withArgs(mockChangeMessageVisibilityBatch).rejects(receiveErr); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "error"), + clock.tickAsync(75000), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal(err.message, "Error changing visibility timeout: failed"); + assert.equal(err.queueUrl, QUEUE_URL); + assert.deepEqual(err.messageIds, ["1", "2"]); }); });