Skip to content

Commit

Permalink
Merge pull request #30 from nasa/CUMULUS-1896
Browse files Browse the repository at this point in the history
Cumulus 1896
  • Loading branch information
Jkovarik authored May 5, 2020
2 parents be6d470 + 5108094 commit f9c78b0
Show file tree
Hide file tree
Showing 8 changed files with 3,049 additions and 3,369 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"root": true,
"extends": "airbnb",
"extends": "airbnb-base",
"plugins": ["eslint-plugin-jsdoc"],
"parser": "babel-eslint",
"env": {
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).

## [v.1.6.0]

## BREAKING CHANGES

- **CUMULUS-1896** - Updates to the [cumulus-message-adapter-js](https://github.com/nasa/cumulus-message-adapter-js) library and Cumulus core required an update to this image to utilize async handlers. See [Node.js Lambda documentation](https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html) for more information on async handlers. Users utilizing this module should update their lambdas to utilize an async handler style.


## [v1.5.1]

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions bin/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ const cliOptions = createCliOptions([
{
name: 'heartbeat',
default: null,
help: 'interval in milliseconds between sending heartbeat messages to the state machine. ' +
'default is null, which disables the heartbeat'
help: 'interval in milliseconds between sending heartbeat messages to the state machine. '
+ 'default is null, which disables the heartbeat'
},
{
name: 'help',
Expand Down
82 changes: 34 additions & 48 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ function tryToDownloadFile(url, destinationFilename) {
}

// eslint-disable-next-line require-jsdoc
const getLogSenderFromLambdaId = (lambdaId) =>
`cumulus-ecs-task/${getFunctionName(lambdaId)}`;
const getLogSenderFromLambdaId = (lambdaId) => `cumulus-ecs-task/${getFunctionName(lambdaId)}`;

/**
* Download a URL and save it to a file. If an ETIMEDOUT error is received,
Expand All @@ -69,24 +68,19 @@ const getLogSenderFromLambdaId = (lambdaId) =>
* @returns {Promise<undefined>} resolves when file has been downloaded
*/
function downloadFile(url, destinationFilename) {
return pRetry(() =>
tryToDownloadFile(url, destinationFilename)
.catch((err) => {
if (err.code === 'ETIMEDOUT') {
throw err;
}

throw new pRetry.AbortError(err);
}
)
);
return pRetry(() => tryToDownloadFile(url, destinationFilename).catch((err) => {
if (err.code === 'ETIMEDOUT') {
throw err;
}
throw new pRetry.AbortError(err);
}));
}

/**
* Downloads an array of layers from AWS
*
* @param {Array<Object>} layers - list of layer config objects to download
* @param {Array<string>} layersDir - path to download the files to, generally '/opt'
* @param {Array<Object>} layers - list of layer config objects to download
* @param {Array<string>} layersDir - path to download the files to, generally '/opt'
* @returns {Promise<Array>} - returns an array of promises that resolve to a
* filepath strings to downloaded layer .zips
*/
Expand All @@ -96,7 +90,7 @@ async function downloadLayers(layers, layersDir) {
const filePath = `${layersDir}/${getFunctionName(layer.LayerArn)}.zip`;
return downloadFile(layer.Content.Location, filePath).then(() => filePath);
});
return await Promise.all(layerDownloadPromises);
return Promise.all(layerDownloadPromises);
}

/**
Expand Down Expand Up @@ -179,7 +173,7 @@ async function installLambdaFunction(lambdaArn, workDir, taskDir, layerDir) {

setCumulusMessageAdapterPath(taskDir, layerDir);

const task = require(`${taskDir}/${resp.moduleFileName}`); //eslint-disable-line global-require
const task = require(`${taskDir}/${resp.moduleFileName}`); //eslint-disable-line import/no-dynamic-require,global-require
return task[resp.moduleFunctionName];
}

Expand Down Expand Up @@ -264,17 +258,8 @@ async function getActivityTask(activityArn) {
* @param {Function} handler - the lambda function to execute
* @returns {Promise} the lambda functions response
**/
function handleResponse(event, handler) {
const context = { via: 'ECS' };

return new Promise((resolve, reject) => {
handler(event, context, (err, output) => {
if (err) {
return reject(err);
}
return resolve(output);
});
});
async function handleResponse(event, handler) {
return handler(event, { via: 'ECS' });
}

/**
Expand Down Expand Up @@ -325,17 +310,16 @@ async function runTask(options) {
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string');

const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory;
const lambdaArn = options.lambdaArn;
const event = options.lambdaInput;
const taskDir = options.taskDirectory;
const workDir = options.workDirectory;
const {
lambdaArn, lambdaInput, taskDirectory, workDirectory
} = options;

log.sender = getLogSenderFromLambdaId(lambdaArn);

log.info('Downloading the Lambda function');
try {
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir);
const output = await handleResponse(event, handler);
const handler = await installLambdaFunction(lambdaArn, workDirectory, taskDirectory, layersDir);
const output = await handleResponse(lambdaInput, handler);
log.info('task executed successfully');
return output;
}
Expand All @@ -351,7 +335,7 @@ async function runTask(options) {
*
* @param {Object} options - options object
* @param {string} options.lambdaArn - the arn of the lambda handler
* @param {string} options.sqsUrl - the url to the sqs queue
* @param {string} options.sqsUrl - the url to the sqs queue
* @param {integer} options.heartbeat - number of milliseconds between heartbeat messages.
* defaults to null, which deactivates heartbeats
* @param {string} options.taskDirectory - the directory to put the unzipped lambda zip
Expand All @@ -369,10 +353,9 @@ async function runServiceFromSQS(options) {

const sqs = new AWS.SQS({ apiVersion: '2016-11-23' });

const lambdaArn = options.lambdaArn;
const sqsUrl = options.sqsUrl;
const taskDir = options.taskDirectory;
const workDir = options.workDirectory;
const {
lambdaArn, sqsUrl, taskDirecotry, workDirectory
} = options;
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory;

const runForever = isBoolean(options.runForever) ? options.runForever : true;
Expand All @@ -381,14 +364,15 @@ async function runServiceFromSQS(options) {


log.info('Downloading the Lambda function');
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir);
const handler = await installLambdaFunction(lambdaArn, workDirectory, taskDirecotry, layersDir);

let sigTermReceived = false;
process.on('SIGTERM', () => {
log.info('Received SIGTERM, will stop polling for new work');
sigTermReceived = true;
});

/* eslint-disable no-await-in-loop*/
let counter = 1;
do {
try {
Expand All @@ -399,7 +383,7 @@ async function runServiceFromSQS(options) {
}).promise();
const messages = resp.Messages;
if (messages) {
const promises = messages.map(async (message) => {
const promises = messages.map(async(message) => {
if (message && message.Body) {
const receipt = message.ReceiptHandle;
log.info('received message from queue, executing the task');
Expand All @@ -426,6 +410,7 @@ async function runServiceFromSQS(options) {

log.info('Exiting');
}
/* eslint-enable no-await-in-loop*/

/**
* Start the Lambda handler as a service by polling a SF activity queue
Expand Down Expand Up @@ -454,26 +439,26 @@ async function runServiceFromActivity(options) {
assert(Number.isInteger(options.heartbeat), 'options.heartbeat must be an integer');
}

const lambdaArn = options.lambdaArn;
const activityArn = options.activityArn;
const taskDir = options.taskDirectory;
const workDir = options.workDirectory;
const heartbeatInterval = options.heartbeat;
const {
lambdaArn, activityArn, taskDirectory, workDirectory, heartbeat
} = options;
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory;

const runForever = isBoolean(options.runForever) ? options.runForever : true;

log.sender = getLogSenderFromLambdaId(lambdaArn);

log.info('Downloading the Lambda function');
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir);
const handler = await installLambdaFunction(lambdaArn, workDirectory, taskDirectory, layersDir);

let sigTermReceived = false;
process.on('SIGTERM', () => {
log.info('Received SIGTERM, will stop polling for new work');
sigTermReceived = true;
});

/* eslint-disable no-await-in-loop*/

let counter = 1;
do {
log.info(`[${counter}] Getting tasks from ${activityArn}`);
Expand All @@ -485,7 +470,7 @@ async function runServiceFromActivity(options) {
activity.event,
activity.token,
handler,
heartbeatInterval
heartbeat
);
}
}
Expand All @@ -500,6 +485,7 @@ async function runServiceFromActivity(options) {

log.info('Exiting');
}
/* eslint-enable no-await-in-loop*/

module.exports = {
runServiceFromActivity,
Expand Down
Loading

0 comments on commit f9c78b0

Please sign in to comment.