Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(custom-resources): handle Inactive lambda functions #20922

Closed
wants to merge 18 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { IsCompleteResponse, OnEventResponse } from '../types';
import * as cfnResponse from './cfn-response';
import * as consts from './consts';
import { invokeFunction, startExecution } from './outbound';
import { invokeFunction, startExecution, getFunction } from './outbound';
import { getEnv, log } from './util';

// use consts for handler names to compiler-enforce the coupling with construction code.
Expand All @@ -13,6 +13,9 @@ export = {
[consts.FRAMEWORK_ON_TIMEOUT_HANDLER_NAME]: onTimeout,
};

const BASE_SLEEP = 10_000;
const MAX_TOTAL_SLEEP = 620_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apparently already forgot about our conversation on the duplicate PR because I was about to be like, why this number? 😂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rereading that, however, made me realize that the ordering on these checks might be somewhat off. If the lambda. is called while it is in inactive or pending state, the invocation will simply fail so a new invocation will need to be called instead of giving time for the current invocation to pass. Basically, I think that invokeUserFunction may need to have the following workflow:

Within a while loop `await invokeFunction` -> `await getFunctionResponse` -> 
    if `Configuration. LastUpdateStatus.failure && Configuration.State === inactive or pending` -> retry
    else if -> `Configuration. LastUpdateStatus.failure` i.e. it's status is active or failed -> get and throw error error (break)
    else -> return payload (break)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above could be a do-while loop so that you don't have to use breaks but obviously the actual implementation is up to you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, regarding this number, do we know in general how long it usually takes a function to return to active? That data might be good in determining this number.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we do not at this time. I should have lambdas that will become inactive in the next week or so (someone from lambda told me offline that this is ~30 days), so I can check once I have an inactive function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original assessment was only partially right. I was under the impression that the Lamba function failed, not errored, but upon further digging, I see that it throws an exception. See outbound.ts for my revised suggestion here.


/**
* The main runtime entrypoint of the async custom resource lambda function.
*
Expand Down Expand Up @@ -96,7 +99,7 @@ async function onTimeout(timeoutEvent: any) {
});
}

async function invokeUserFunction<A extends { ResponseURL: '...' }>(functionArnEnv: string, sanitizedPayload: A, responseUrl: string) {
async function invokeUserFunction<A extends { ResponseURL: '...' }>(functionArnEnv: string, sanitizedPayload: A, responseUrl: string, reinvoke?: boolean): Promise<any> {
const functionArn = getEnv(functionArnEnv);
log(`executing user function ${functionArn} with payload`, sanitizedPayload);

Expand All @@ -112,17 +115,47 @@ async function invokeUserFunction<A extends { ResponseURL: '...' }>(functionArnE

log('user function response:', resp, typeof(resp));

// parse function name from arn
// arn:${Partition}:lambda:${Region}:${Account}:function:${FunctionName}
const arn = functionArn.split(':');
const functionName = arn[arn.length - 1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a circumstance where this lambda might have a version added to the end of it or are our custom resources using unqualified ARNs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user function is under the user's control, so they can create versions as they please. However, getFunction() will happily accept a name with a :version-string at the end as well, so we're safe in either case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here is that if they do provide us with one that has a version at the end, arn[arn.length - 1] will pick up the version number instead of the function name, since it would be arn:${Partition}:lambda:${Region}:${Account}:function:${FunctionName}:${Version}


const jsonPayload = parseJsonPayload(resp.Payload);
if (resp.FunctionError) {
let totalSleep = 0, attempt = 0;
while (totalSleep <= MAX_TOTAL_SLEEP) {
// if the user's lambda has become Inactive, we must retry the invocation until Lambda finishes provisioning resources for it.
const getFunctionResponse = await getFunction({
FunctionName: functionName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can just provide the ARN here instead of trying to parse per GetFunction Documentation.

Copy link
Contributor Author

@comcalvi comcalvi Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but we need the name later anyway for the logs (line 154 in this file with these changes), and imo passing the name is cleaner. If you prefer though we can change this to the ARN

});

if ((getFunctionResponse.Configuration?.State === 'Active' || getFunctionResponse.Configuration?.State === 'Failed') && !reinvoke) {
if (getFunctionResponse.Configuration?.State === 'Active') {
log('user function is in the \'Active\' state, reinvoking it now');
} else if (getFunctionResponse.Configuration?.State === 'Failed') {
log('user function is in the \'Failed\' state with this reason code: ', getFunctionResponse.Configuration.StateReasonCode);
log('user function provided this reason for the error: ', getFunctionResponse.Configuration.StateReason);
log('reinvoking user function to get error trace');
}

// do not reinvoke more than once
return invokeUserFunction(functionArnEnv, sanitizedPayload, responseUrl, true);
}

const currentSleep = Math.floor(BASE_SLEEP * Math.pow(2, attempt) * Math.random());

// don't spend more than 10 minutes and some change waiting
log(`user function is still being initialized by Lambda, sleeping for: ${currentSleep} ms before retry`);
await sleep(currentSleep);

totalSleep += currentSleep;
attempt++;
}

log('user function threw an error:', resp.FunctionError);

const errorMessage = jsonPayload.errorMessage || 'error';

// parse function name from arn
// arn:${Partition}:lambda:${Region}:${Account}:function:${FunctionName}
const arn = functionArn.split(':');
const functionName = arn[arn.length - 1];

// append a reference to the log group.
const message = [
errorMessage,
Expand All @@ -146,6 +179,10 @@ async function invokeUserFunction<A extends { ResponseURL: '...' }>(functionArnE
return jsonPayload;
}

async function sleep(ms: number): Promise<void> {
return new Promise<void>(ok => setTimeout(ok, ms));
}

function parseJsonPayload(payload: any): any {
if (!payload) { return { }; }
const text = payload.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ async function defaultInvokeFunction(req: AWS.Lambda.InvocationRequest): Promise
return lambda.invoke(req).promise();
}

async function defaultGetFunction(req: AWS.Lambda.GetFunctionRequest): Promise<AWS.Lambda.GetFunctionResponse> {
if (!lambda) {
lambda = new AWS.Lambda(awsSdkConfig);
}

return lambda.getFunction(req).promise();
}

Comment on lines +49 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, according to the documentation, we can run into a situation where new lamba functions also haven't finished provisioning before we try to invoke them so I wonder if the right path forward here is to replace `defaultInvokeFunction with the following (roughly, this also contains notes):

async function defaultInvokeFunction(req: AWS.Lambda.InvocationRequest): Promise<AWS.Lambda.InvocationResponse> {
  if (!lambda) {
    lambda = new AWS.Lambda(awsSdkConfig);
  }

  let status = await defaultGetFunction(req);
  if (status.Configuration?.State !== 'Failed') {
    try {
      // If it is inactive, pending, or failed it will throw instead of returning
      return lambda.invoke(req).promise();
    } catch (e) {
      // We only care about inactive or pending for retries
      while (status.Configuration?.State === 'Inactive' || status.Configuration?.State === 'Pending') {
        // Add wait of some sort in here if we want
        // Since we don't know how long the reprovisioning of resources will take, we shouldn't add in a max time.
        status = await defaultGetFunction(req);
      }
    }
  }
  // Now that it's either active or failed, return the terminal response, error or not
  return lambda.invoke(req).promise();
}

This would mean that we don't want to export defaultGetFunction as getFunction since the scope of its use is only in this file. This edit would require no change (I think) to the logic in invokeUserFunction() in framework.ts. Well, except that there's still the issue with the ARN that I commented on. Maybe we just never take in qualified ARNs, though, so maybe that isn't an issue. On the other hand, you could export getFunction and use that to make sure we're getting the correct function name. It's part of Configuration.

What do you think?

export let startExecution = defaultStartExecution;
export let invokeFunction = defaultInvokeFunction;
export let getFunction = defaultGetFunction;
export let httpRequest = defaultHttpRequest;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export let isCompleteImplMock: AWSCDKAsyncCustomResource.IsCompleteHandler | und
export let startStateMachineInput: AWS.StepFunctions.StartExecutionInput | undefined;
export let cfnResponse: AWSLambda.CloudFormationCustomResourceResponse;

let getFunctionInvokeCount: number;

export function setup() {
process.env[consts.WAITER_STATE_MACHINE_ARN_ENV] = MOCK_SFN_ARN;

Expand All @@ -27,6 +29,7 @@ export function setup() {
isCompleteImplMock = undefined;
cfnResponse = {} as any;
startStateMachineInput = undefined;
getFunctionInvokeCount = -1;
}

export async function httpRequestMock(options: https.RequestOptions, body: string) {
Expand Down Expand Up @@ -110,6 +113,17 @@ export async function invokeFunctionMock(req: AWS.Lambda.InvocationRequest): Pro
}
}

export async function invokeInactiveFunctionMock(_req: AWS.Lambda.InvocationRequest): Promise<AWS.Lambda.InvocationResponse> {
const ret = {
foo: 'bar',
};

return {
FunctionError: getFunctionInvokeCount <= 1 ? 'Lambda is initializing your function...' : undefined,
Payload: stringifyPayload ? JSON.stringify(ret) : ret,
};
}

export function prepareForExecution() {
startStateMachineInput = undefined;

Expand All @@ -134,3 +148,25 @@ export async function startExecutionMock(req: AWS.StepFunctions.StartExecutionIn
startDate: new Date(),
};
}

export async function getFunctionMock(_req: AWS.Lambda.GetFunctionRequest): Promise<AWS.Lambda.GetFunctionResponse> {
getFunctionInvokeCount++;
if (getFunctionInvokeCount === 0) {
return {
Configuration: {
State: 'Inactive',
},
};
} else if (getFunctionInvokeCount === 1) {
return {
Configuration: {
State: 'Pending',
},
};
}
return {
Configuration: {
State: 'Active',
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@ const MOCK_ATTRS = { MyAttribute: 'my-mock-attribute' };
outbound.httpRequest = mocks.httpRequestMock;
outbound.invokeFunction = mocks.invokeFunctionMock;
outbound.startExecution = mocks.startExecutionMock;
outbound.getFunction = mocks.getFunctionMock;
(setTimeout as any) = jest.fn((callback, _timeout) => {
return callback();
});

const invokeFunctionSpy = jest.spyOn(outbound, 'invokeFunction');
let invokeFunctionSpy = jest.spyOn(outbound, 'invokeFunction');
const getFunctionSpy = jest.spyOn(outbound, 'getFunction');

beforeEach(() => mocks.setup());
afterEach(() => invokeFunctionSpy.mockClear());
beforeEach(() => {
outbound.invokeFunction = mocks.invokeFunctionMock;
invokeFunctionSpy = jest.spyOn(outbound, 'invokeFunction'); // has to be reset because one test changes outbound.invokeFunction
mocks.setup();
});
afterEach(() => {
invokeFunctionSpy.mockClear();
getFunctionSpy.mockClear();
});

test('async flow: isComplete returns true only after 3 times', async () => {
let isCompleteCalls = 0;
Expand Down Expand Up @@ -346,7 +358,20 @@ describe('if CREATE fails, the subsequent DELETE will be ignored', () => {
// THEN
expectCloudFormationSuccess();
});
});

test('getFunction() is called only when user function is inactive, pending, and active', async () => {
// GIVEN
mocks.onEventImplMock = async () => ({ PhysicalResourceId: MOCK_PHYSICAL_ID });
outbound.invokeFunction = mocks.invokeInactiveFunctionMock;

// WHEN
await simulateEvent({
RequestType: 'Create',
});

// THEN
expect(getFunctionSpy).toHaveBeenCalledTimes(3);
});

describe('ResponseURL is passed to user function', () => {
Expand Down