diff --git a/package.json b/package.json index c7315ad..f9e46b9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "squiss-ts", - "version": "4.0.3", + "version": "4.0.4", "description": "High-volume SQS poller", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/Message.ts b/src/Message.ts index eeb3495..4cfe2c1 100644 --- a/src/Message.ts +++ b/src/Message.ts @@ -27,6 +27,7 @@ interface SNSBody { } interface IMessageEvents { + error: Error; delQueued: void; handled: void; released: void; @@ -150,8 +151,9 @@ export class Message extends (EventEmitter as new() => MessageEmitter) { if (!this._handled) { this._handled = true; return this._squiss.releaseMessage(this) - .catch(() => { + .catch((e) => { this._handled = false; + this.emit('error', e); }); } return Promise.resolve(); diff --git a/src/Squiss.ts b/src/Squiss.ts index b860296..2ddfcc7 100644 --- a/src/Squiss.ts +++ b/src/Squiss.ts @@ -207,11 +207,12 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) { public releaseMessage(msg: Message): Promise { this.handledMessage(msg); - return this.changeMessageVisibility(msg, 0).then((res) => { - msg.emit('released'); - this.emit('released', msg); - return res; - }); + return this.changeMessageVisibility(msg, 0) + .then((res) => { + msg.emit('released'); + this.emit('released', msg); + return res; + }); } public purgeQueue(): Promise { @@ -366,6 +367,10 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) { message.parse() .then(() => { this.emit('message', message); + }) + .catch((e: Error) => { + this.emit('error', e); + message.release(); }); }); } diff --git a/src/test/src/Message.spec.ts b/src/test/src/Message.spec.ts index 208c8bd..0c160d2 100644 --- a/src/test/src/Message.spec.ts +++ b/src/test/src/Message.spec.ts @@ -5,6 +5,8 @@ import {SquissStub} from '../stubs/SquissStub'; import {SQS, S3} from 'aws-sdk'; import {Blobs, S3Stub} from '../stubs/S3Stub'; import delay from 'delay'; +// @ts-ignore +import * as sinon from 'sinon'; const wait = (ms?: number) => delay(ms === undefined ? 20 : ms); @@ -393,15 +395,21 @@ describe('Message', () => { bodyFormat: 'json', squiss: { releaseMessage: (toDel: Message) => { - return Promise.reject(); + return Promise.reject(new Error('failed to release')); }, } as any as Squiss, s3Retriever: getS3Stub(), s3Retain: false, }); + const errorSpy = sinon.spy(); + msg.on('error', errorSpy); return msg.release() .then(() => { msg.isHandled().should.eql(false); + return wait().then(() => { + errorSpy.should.be.calledOnce(); + errorSpy.should.be.calledWith(sinon.match.instanceOf(Error)); + }); }); }); it('calls Squiss.handledMessage on keep', (done) => { diff --git a/src/test/src/index.spec.ts b/src/test/src/index.spec.ts index a06f7cf..a6efa66 100644 --- a/src/test/src/index.spec.ts +++ b/src/test/src/index.spec.ts @@ -330,6 +330,22 @@ describe('index', () => { ]); }); }); + it('emits error on message parse error', () => { + const msgSpy = sinon.spy(); + const errorSpy = sinon.spy(); + inst = new SquissPatched({bodyFormat: 'json', queueUrl: 'foo', maxInFlight: 15, + receiveBatchSize: 10} as ISquissOptions); + inst!.sqs = new SQSStub(0, 0) as any as SQS; + inst!.on('message', msgSpy); + inst!.on('error', errorSpy); + inst!.start(); + inst!.sendMessage('{sdfsdf'); + return wait().then(() => { + msgSpy.should.not.be.called(); + errorSpy.should.be.calledOnce(); + errorSpy.should.be.calledWith(sinon.match.instanceOf(Error)); + }); + }); it('emits queueEmpty event with no messages', () => { const msgSpy = sinon.spy(); const qeSpy = sinon.spy();