After updating from an older version to v3.7.0, I ran into an issue where messages stop being sent in certain cases.
I've narrowed it down: the issue first occurs in v3.5.2, where batch sending was introduced.
I send around 4000 messages to the same channel in a loop and await all the sendToQueue(...) calls via Promise.all. After most of them (~3500 or so) are sent successfully, processing just stops and none of the remaining messages are sent.
If I then send more messages to the channel, processing starts again and the remaining messages are sent.
The logic in _publishQueuedMessages(...) appears to have a flaw.
It looks something like this:
private _publishQueuedMessages(workerNumber: number): void {
    const channel = this._channel;
    if (
        !channel ||
        !this._shouldPublish() ||
        !this._working ||
        workerNumber !== this._workerNumber
    ) {
        // Can't publish anything right now...
        this._working = false;
        return;
    }
    try {
        // Send messages in batches of 1000 - don't want to starve the event loop.
        let sendsLeft = MAX_MESSAGES_PER_BATCH;
        while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) {
            // <snip --- send some messages ... --- >
        }
        // If we didn't send all the messages, send some more...
        if (this._channelHasRoom && this._messages.length > 0) {
            setImmediate(() => this._publishQueuedMessages(workerNumber));
        }
        this._working = false;
        /* istanbul ignore next */
    } catch (err) {
        this._working = false;
        this.emit('error', err);
    }
}
I'm guessing that this line is supposed to trigger the sending of a new batch: setImmediate(() => this._publishQueuedMessages(workerNumber));
but this._working = false; runs unconditionally right after it, so when _publishQueuedMessages is called again on the next iteration of the event loop, the !this._working check at the start of the method causes it to return immediately without doing anything.
I've confirmed that doing this instead will make the issue go away:
// If we didn't send all the messages, send some more...
if (this._channelHasRoom && this._messages.length > 0) {
    setImmediate(() => this._publishQueuedMessages(workerNumber));
} else {
    this._working = false;
}
But I'm not sure if this is a good general fix.
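To make the difference easier to see, here is a stripped-down model of the two variants. It is only a sketch and not the library's code: BatchModel, alwaysReset, runScheduled and simulate are names I made up, the channel and worker-number checks are left out, and a plain array of callbacks stands in for setImmediate so the run is deterministic. The counts also differ from my real case (~3500 of 4000), presumably because there every sendToQueue(...) call can restart the worker, which this model does not do.

```typescript
// Simplified model of the batching loop; NOT the library's actual code.
const MAX_MESSAGES_PER_BATCH = 1000;

class BatchModel {
  sent = 0;
  private queued: number;
  private working = false;
  // true  -> reset the flag unconditionally (current behaviour)
  // false -> reset it only when no follow-up batch is scheduled (proposed change)
  private readonly alwaysReset: boolean;
  // Hypothetical stand-in for setImmediate: callbacks run later, in order.
  private readonly scheduled: Array<() => void> = [];

  constructor(queued: number, alwaysReset: boolean) {
    this.queued = queued;
    this.alwaysReset = alwaysReset;
  }

  start(): void {
    this.working = true;
    this.publish();
  }

  private publish(): void {
    if (!this.working) {
      return; // same guard as at the top of _publishQueuedMessages
    }
    let sendsLeft = MAX_MESSAGES_PER_BATCH;
    while (this.queued > 0 && sendsLeft > 0) {
      this.queued--;
      this.sent++;
      sendsLeft--;
    }
    const more = this.queued > 0;
    if (more) {
      this.scheduled.push(() => this.publish());
    }
    if (this.alwaysReset || !more) {
      this.working = false;
    }
  }

  // Run every scheduled callback, including ones added while draining.
  runScheduled(): void {
    let next = this.scheduled.shift();
    while (next !== undefined) {
      next();
      next = this.scheduled.shift();
    }
  }
}

function simulate(alwaysReset: boolean): number {
  const model = new BatchModel(4000, alwaysReset);
  model.start();
  model.runScheduled();
  return model.sent;
}

console.log(simulate(true));  // 1000: the scheduled batch hits the guard and returns
console.log(simulate(false)); // 4000: the flag stays set until the queue is empty
```

In the first run the follow-up callback does get scheduled, but by the time it executes the flag has already been cleared, so nothing after the first batch is sent until something else restarts the worker.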