Skip to content

Commit

Permalink
fixes and improvements (#11)
Browse files Browse the repository at this point in the history
* Fix bug with not-awaited emits and endless cycle problem
* Bump dependencies
  • Loading branch information
if0s authored Jul 21, 2021
1 parent e595cf3 commit 0939dae
Show file tree
Hide file tree
Showing 9 changed files with 5,644 additions and 2,604 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
key: dependency-cache-{{ checksum "package.json" }}
- run:
name: Audit Dependencies
command: npm audit --audit-level=high
command: npm audit --audit-level=critical
- run:
name: Installing Dependencies
command: npm install
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ node_modules
coverage
.idea
.env
.nyc_output
.vscode
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 1.7.3 (July 23, 2021)
* Fix bug with not-awaited emits and endless cycle problem
* Bump dependencies

# 1.7.2 (March 25, 2021)

* Update Sailor version to 2.6.24
Expand Down
1 change: 1 addition & 0 deletions component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"title": "Google Pubsub",
"description": "Interact with Google Pub-Sub API",
"buildType": "docker",
"version": "1.7.3",
"env": [
"GOOGLE_APP_ID",
"GOOGLE_APP_SECRET"
Expand Down
6 changes: 3 additions & 3 deletions lib/actions/pub.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ async function processAction(msg, cfg) {
};
this.logger.info('Message sent with ID:', messageId);
const data = messages.newMessageWithBody(body);
self.emit('data', data);
await self.emit('data', data);
} catch (err) {
this.logger.info('Oops! Error occurred');
self.emit('error', err);
await self.emit('error', err);
}

self.logger.info('Finished execution');
self.emit('end');
await self.emit('end');
}

module.exports.process = processAction;
75 changes: 39 additions & 36 deletions lib/triggers/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,47 @@ async function processAction(msg, cfg) {
}
}

for (;;) {
try {
// The subscriber pulls a specified number of messages.
const [response] = await subClient.pull({
subscription: formattedSubscription,
maxMessages: 100,
returnImmediately: true,
});
// Process the messages.
const ackIds = [];
let newMsg;
// eslint-disable-next-line no-restricted-syntax
for (const message of response.receivedMessages) {
self.logger.info(`Message received with ID: ${message.message.messageId}`);
ackIds.push(message.ackId);
newMsg = messages.newMessageWithBody(message.message.data.toString());
newMsg.headers = message.message.attributes || {};
newMsg.id = message.message.messageId;
if (message.message.publishTime) {
newMsg.headers.publishTime = message.message.publishTime;
}
// eslint-disable-next-line no-await-in-loop
await self.emit('data', messages.newMessageWithBody(newMsg));
}
// Acknowledge all of the messages. You could also acknowledge
// these individually, but this is more efficient.
if (ackIds.length) {
const ackRequest = {
subscription: formattedSubscription,
ackIds,
};
self.logger.debug('Acknowledging retrieved messages...');
await subClient.acknowledge(ackRequest);
try {
// The subscriber pulls a specified number of messages.
const [response] = await subClient.pull({
subscription: formattedSubscription,
maxMessages: 100,
returnImmediately: true,
});
self.logger.info(`Total messeges received: ${response.receivedMessages.length}`);
if (response.receivedMessages.length === 0) {
await self.emit('end');
return;
}
// Process the messages.
const ackIds = [];
let newMsg;
// eslint-disable-next-line no-restricted-syntax
for (const message of response.receivedMessages) {
self.logger.info(`Message received with ID: ${message.message.messageId}`);
ackIds.push(message.ackId);
newMsg = messages.newMessageWithBody(message.message.data.toString());
newMsg.headers = message.message.attributes || {};
newMsg.id = message.message.messageId;
if (message.message.publishTime) {
newMsg.headers.publishTime = message.message.publishTime;
}
} catch (err) {
this.logger.info('Oops! Error occurred');
throw err;
// eslint-disable-next-line no-await-in-loop
await self.emit('data', messages.newMessageWithBody(newMsg));
}
// Acknowledge all of the messages. You could also acknowledge
// these individually, but this is more efficient.
if (ackIds.length) {
const ackRequest = {
subscription: formattedSubscription,
ackIds,
};
self.logger.debug('Acknowledging retrieved messages...');
await subClient.acknowledge(ackRequest);
}
} catch (err) {
this.logger.info('Oops! Error occurred');
throw err;
}
}
await synchronousPull();
Expand Down
Loading

0 comments on commit 0939dae

Please sign in to comment.