Skip to content

Commit

Permalink
feat: add default publish timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
luddd3 committed Aug 31, 2021
1 parent dee380d commit 6826be2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 39 deletions.
51 changes: 29 additions & 22 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ export interface CreateChannelOpts {
* These will be encoded automatically before being sent.
*/
json?: boolean;
/**
* Default publish timeout in ms. Messages not published within the given time are rejected with a timeout error.
*/
publishTimeout?: number;
}

interface PublishMessage {
Expand Down Expand Up @@ -135,6 +139,11 @@ export default class ChannelWrapper extends EventEmitter {
*/
private _channelHasRoom = true;

/**
* Default publish timeout
*/
private _publishTimeout?: number;

public name?: string;

addListener(event: string, listener: (...args: any[]) => void): this;
Expand Down Expand Up @@ -276,6 +285,7 @@ export default class ChannelWrapper extends EventEmitter {
return pb.addCallback(
done,
new Promise<boolean>((resolve, reject) => {
const { timeout, ...opts } = options || {};
this._enqueueMessage(
{
type: 'publish',
Expand All @@ -284,9 +294,10 @@ export default class ChannelWrapper extends EventEmitter {
content,
resolve,
reject,
options: opts,
isTimedout: false,
},
options
timeout || this._publishTimeout
);
this._startWorker();
})
Expand All @@ -311,44 +322,39 @@ export default class ChannelWrapper extends EventEmitter {
return pb.addCallback(
done,
new Promise<boolean>((resolve, reject) => {
const { timeout, ...opts } = options || {};
this._enqueueMessage(
{
type: 'sendToQueue',
queue,
content,
resolve,
reject,
options: opts,
isTimedout: false,
},
options
timeout || this._publishTimeout
);
this._startWorker();
})
);
}

private _enqueueMessage(message: Message, options?: PublishOptions) {
if (options) {
if (options.timeout) {
const { timeout, ...opts } = options;
message.timeout = setTimeout(() => {
let idx = this._messages.indexOf(message);
private _enqueueMessage(message: Message, timeout?: number) {
if (timeout) {
message.timeout = setTimeout(() => {
let idx = this._messages.indexOf(message);
if (idx !== -1) {
this._messages.splice(idx, 1);
} else {
idx = this._unconfirmedMessages.indexOf(message);
if (idx !== -1) {
this._messages.splice(idx, 1);
} else {
idx = this._unconfirmedMessages.indexOf(message);
if (idx !== -1) {
this._unconfirmedMessages.splice(idx, 1);
}
this._unconfirmedMessages.splice(idx, 1);
}

message.isTimedout = true;
message.reject(new Error('timeout'));
}, timeout);
message.options = opts;
} else {
message.options = options;
}
}
message.isTimedout = true;
message.reject(new Error('timeout'));
}, timeout);
}
this._messages.push(message);
}
Expand All @@ -374,6 +380,7 @@ export default class ChannelWrapper extends EventEmitter {
this._connectionManager = connectionManager;
this.name = options.name;

this._publishTimeout = options.publishTimeout;
this._json = options.json ?? false;

// Array of setup functions to call.
Expand Down
36 changes: 19 additions & 17 deletions test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ describe('ChannelWrapper', function () {
'exchange',
'routingKey',
Buffer.from('argleblargle'),
undefined,
{},
]);
expect(channelWrapper.queueLength(), 'queue length').to.equal(0);
});
Expand Down Expand Up @@ -392,27 +392,29 @@ describe('ChannelWrapper', function () {
).to.equal(0);
});

it('should timeout published messages', async function () {
it('should timeout published message', async function () {
const channelWrapper = new ChannelWrapper(connectionManager);

const startTime = Date.now();
const errors = await Promise.all([
channelWrapper
.publish('exchange', 'routingKey', 'argleblargle', {
messageId: 'foo',
timeout: 100,
})
.catch((err) => err),
channelWrapper
.sendToQueue('queue', 'argleblargle', {
messageId: 'foo',
timeout: 100,
})
.catch((err) => err),
]);
const error = await channelWrapper
.publish('exchange', 'routingKey', 'argleblargle', {
timeout: 100,
})
.catch((err) => err);
const duration = Date.now() - startTime;
expect(error.message).to.equal('timeout');
expect(duration).to.be.approximately(100, 10);
});

it('should use default timeout for published messages', async function () {
const channelWrapper = new ChannelWrapper(connectionManager, { publishTimeout: 100 });

expect(errors.map((e) => e.message)).to.deep.equal(['timeout', 'timeout']);
const startTime = Date.now();
const error = await channelWrapper
.publish('exchange', 'routingKey', 'argleblargle')
.catch((err) => err);
const duration = Date.now() - startTime;
expect(error.message).to.equal('timeout');
expect(duration).to.be.approximately(100, 10);
});

Expand Down

0 comments on commit 6826be2

Please sign in to comment.