Skip to content

Commit

Permalink
failed parse messages is not handled #48 (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr authored Aug 12, 2019
1 parent 5b66c7c commit bf0279b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 8 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "squiss-ts",
"version": "4.0.3",
"version": "4.0.4",
"description": "High-volume SQS poller",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
4 changes: 3 additions & 1 deletion src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ interface SNSBody {
}

interface IMessageEvents {
error: Error;
delQueued: void;
handled: void;
released: void;
Expand Down Expand Up @@ -150,8 +151,9 @@ export class Message extends (EventEmitter as new() => MessageEmitter) {
if (!this._handled) {
this._handled = true;
return this._squiss.releaseMessage(this)
.catch(() => {
.catch((e) => {
this._handled = false;
this.emit('error', e);
});
}
return Promise.resolve();
Expand Down
15 changes: 10 additions & 5 deletions src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,12 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {

public releaseMessage(msg: Message): Promise<void> {
this.handledMessage(msg);
return this.changeMessageVisibility(msg, 0).then((res) => {
msg.emit('released');
this.emit('released', msg);
return res;
});
return this.changeMessageVisibility(msg, 0)
.then((res) => {
msg.emit('released');
this.emit('released', msg);
return res;
});
}

public purgeQueue(): Promise<void> {
Expand Down Expand Up @@ -366,6 +367,10 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
message.parse()
.then(() => {
this.emit('message', message);
})
.catch((e: Error) => {
this.emit('error', e);
message.release();
});
});
}
Expand Down
10 changes: 9 additions & 1 deletion src/test/src/Message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {SquissStub} from '../stubs/SquissStub';
import {SQS, S3} from 'aws-sdk';
import {Blobs, S3Stub} from '../stubs/S3Stub';
import delay from 'delay';
// @ts-ignore
import * as sinon from 'sinon';

const wait = (ms?: number) => delay(ms === undefined ? 20 : ms);

Expand Down Expand Up @@ -393,15 +395,21 @@ describe('Message', () => {
bodyFormat: 'json',
squiss: {
releaseMessage: (toDel: Message) => {
return Promise.reject();
return Promise.reject(new Error('failed to release'));
},
} as any as Squiss,
s3Retriever: getS3Stub(),
s3Retain: false,
});
const errorSpy = sinon.spy();
msg.on('error', errorSpy);
return msg.release()
.then(() => {
msg.isHandled().should.eql(false);
return wait().then(() => {
errorSpy.should.be.calledOnce();
errorSpy.should.be.calledWith(sinon.match.instanceOf(Error));
});
});
});
it('calls Squiss.handledMessage on keep', (done) => {
Expand Down
16 changes: 16 additions & 0 deletions src/test/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,22 @@ describe('index', () => {
]);
});
});
it('emits error on message parse error', () => {
const msgSpy = sinon.spy();
const errorSpy = sinon.spy();
inst = new SquissPatched({bodyFormat: 'json', queueUrl: 'foo', maxInFlight: 15,
receiveBatchSize: 10} as ISquissOptions);
inst!.sqs = new SQSStub(0, 0) as any as SQS;
inst!.on('message', msgSpy);
inst!.on('error', errorSpy);
inst!.start();
inst!.sendMessage('{sdfsdf');
return wait().then(() => {
msgSpy.should.not.be.called();
errorSpy.should.be.calledOnce();
errorSpy.should.be.calledWith(sinon.match.instanceOf(Error));
});
});
it('emits queueEmpty event with no messages', () => {
const msgSpy = sinon.spy();
const qeSpy = sinon.spy();
Expand Down

0 comments on commit bf0279b

Please sign in to comment.