Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for LambdaFunctions triggered by SQS and EventBridge AWS Services. #569

Merged
merged 16 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions common/src/main/java/com/genexus/db/DynamicExecute.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ private static boolean dynamicExecute2(ModelContext context, int handle, Class c
return true;
}

private final static String METHOD_EXECUTE = "execute"; // El m�todo a ejecutar en la clase
public final static String METHOD_EXECUTE = "execute"; // Method to Execute for entry points in GXWebProcedure.

public static boolean dynamicExecute(ModelContext context, int handle, Class caller, String sPackage, String sPgmName, Object[] params)
{
String pgmName = getDynamicPgmName(caller, sPackage, sPgmName);
Expand All @@ -198,7 +199,7 @@ public static boolean dynamicExecute(ModelContext context, int handle, Class cal

public static boolean dynamicExecute(ModelContext context, int handle, Class caller, String className, Object[] params)
{
Object [] callingParams = new Object[params.length]; // Contiene el verdadero array a pasarle a la clase
Object [] callingParams = new Object[params.length];
boolean [] needToUpdateParams = new boolean[params.length]; // Indica si hay que actualizar el array de parametros(params) al terminar la invocaci�n del m�todo. Solo se deben actualizar los parametros que en destino son 'arrays', que son los que pueden sufrir modificaci�n

// Primero obtengo la clase a ejecutar
Expand Down
7 changes: 7 additions & 0 deletions gxawsserverless/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@
<version>1.2.1</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.11.0</version>
</dependency>


<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.genexus.cloud.serverless;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.genexus.GXProcedure;
import com.genexus.GxUserType;
import com.genexus.ModelContext;
import com.genexus.cloud.serverless.model.EventMessageResponse;
import com.genexus.cloud.serverless.model.EventMessages;
import com.genexus.db.DynamicExecute;
import org.apache.commons.lang.NotImplementedException;

import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;

public class GXProcedureExecutor {
protected Class<GXProcedure> entryPointClass;

private ModelContext modelContext;
private Class<?>[][] supportedMethodSignatures = new Class<?>[5][];
private int methodSignatureIdx = -1;

protected static final String MESSAGE_COLLECTION_INPUT_CLASS_NAME = "com.genexus.genexusserverlessapi.SdtEventMessages";
protected static final String MESSAGE_OUTPUT_COLLECTION_CLASS_NAME = "com.genexus.genexusserverlessapi.SdtEventMessageResponse";

public GXProcedureExecutor(String procedureClassName) throws ClassNotFoundException, NotImplementedException {
entryPointClass = (Class<GXProcedure>) Class.forName(procedureClassName);

supportedMethodSignatures[0] = new Class<?>[]{Class.forName(MESSAGE_COLLECTION_INPUT_CLASS_NAME), Class.forName(MESSAGE_OUTPUT_COLLECTION_CLASS_NAME)};
supportedMethodSignatures[1] = new Class<?>[]{String.class, Class.forName(MESSAGE_OUTPUT_COLLECTION_CLASS_NAME)};
supportedMethodSignatures[2] = new Class<?>[]{String.class};
supportedMethodSignatures[3] = new Class<?>[]{Class.forName(MESSAGE_OUTPUT_COLLECTION_CLASS_NAME)};
supportedMethodSignatures[4] = new Class<?>[]{}; //No inputs, no outputs

Optional<Method> executeMethodOpt = Arrays.stream(this.entryPointClass.getDeclaredMethods()).filter(m -> m.getName() == DynamicExecute.METHOD_EXECUTE).findFirst();

if (!executeMethodOpt.isPresent()) {
throw new NotImplementedException(String.format("EXECUTE Method not implemented on Class '%s'", procedureClassName));
}

Method executeMethod = executeMethodOpt.get();
Class<?>[] parametersTypes = executeMethod.getParameterTypes();

for (int i = 0; i < supportedMethodSignatures.length && methodSignatureIdx < 0; i++) {
if (supportedMethodSignatures[i].length != parametersTypes.length) {
continue;
}
Class<?>[] listParameters = (Class<?>[]) supportedMethodSignatures[i];
boolean isMatch = true;
for (int j = 0; j < listParameters.length && isMatch; j++) {
isMatch = listParameters[j] == parametersTypes[j] || listParameters[j] == parametersTypes[j].getComponentType();
}
if (isMatch) {
methodSignatureIdx = i;
}
}
if (methodSignatureIdx < 0) {
throw new NotImplementedException("Expected signature method did not match");
}

}

public EventMessageResponse execute(ModelContext modelContext, EventMessages msgs, String rawJsonEvent) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, JsonProcessingException {
EventMessageResponse response = new EventMessageResponse();

Object[] parameters;
boolean returnsValue = true;
switch (methodSignatureIdx) {
case 0:
Class<?> inputClass = Class.forName(MESSAGE_COLLECTION_INPUT_CLASS_NAME);
Object msgsInput = inputClass.getConstructor().newInstance();
if (GxUserType.class.isAssignableFrom(inputClass)) {
((GxUserType) msgsInput).fromJSonString(Helper.toJSONString(msgs));
}
parameters = new Object[]{msgsInput, new Object[]{}};
break;
case 1:
parameters = new Object[]{rawJsonEvent, new Object[]{}};
break;
case 2:
parameters = new Object[]{rawJsonEvent};
response.setHandled(true);
returnsValue = false;
break;
case 3:
parameters = new Object[]{new Object[]{}};
break;
default:
parameters = new Object[]{};
returnsValue = false;
response.setHandled(true);
break;
}

Object[] paramOutArray = null;
if (returnsValue) {
parameters[parameters.length - 1] = (Object[]) Array.newInstance(Class.forName(MESSAGE_OUTPUT_COLLECTION_CLASS_NAME), 1);
paramOutArray = (Object[]) parameters[parameters.length - 1];
paramOutArray[0] = Class.forName(MESSAGE_OUTPUT_COLLECTION_CLASS_NAME).getConstructor(int.class, ModelContext.class).newInstance(-1, modelContext);
}

com.genexus.db.DynamicExecute.dynamicExecute(modelContext, -1, entryPointClass, "", entryPointClass.getName(), parameters);

if (paramOutArray != null) {
GxUserType handlerOutput = (GxUserType) paramOutArray[0];
String jsonResponse = handlerOutput.toJSonString(false);
response = new ObjectMapper().readValue(jsonResponse, EventMessageResponse.class);
}
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.genexus.cloud.serverless;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.genexus.cloud.serverless.model.EventMessage;
import com.genexus.cloud.serverless.model.EventMessageProperty;

public class Helper {

public static String toJSONString(Object dtoObject) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(dtoObject);
}
catch (Exception e) { }
return "";
}

public static void addEventMessageProperty(EventMessage msg, String key, String value) {
msg.getMessageProperties().add(new EventMessageProperty(key, value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.amazonaws.serverless.proxy.internal.servlet.AwsProxyHttpServletRequest;
import com.amazonaws.serverless.proxy.internal.servlet.AwsServletContext;
import com.amazonaws.serverless.proxy.model.MultiValuedTreeMap;
import com.genexus.cloud.serverless.aws.handler.AwsGxServletResponse;
import com.genexus.specific.java.Connect;
import com.genexus.specific.java.LogManager;
import com.genexus.webpanels.GXWebObjectStub;
Expand All @@ -35,7 +36,7 @@ public class LambdaHandler implements RequestHandler<AwsProxyRequest, AwsProxyRe
public static JerseyLambdaContainerHandler<AwsProxyRequest, AwsProxyResponse> handler = null;
private static ResourceConfig jerseyApplication = null;
private static final String BASE_REST_PATH = "/rest/";
private static final String GX_APPLICATION_CLASS = "GXApplication";
private static final String GX_APPLICATION_CLASS = "GXApplication";

public LambdaHandler() throws Exception {
if (LambdaHandler.jerseyApplication == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.genexus.cloud.serverless.aws;
package com.genexus.cloud.serverless.aws.handler;

import com.amazonaws.serverless.proxy.model.AwsProxyResponse;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.genexus.cloud.serverless.aws.handler;

import com.genexus.ModelContext;
import com.genexus.cloud.serverless.*;
import com.genexus.cloud.serverless.model.EventMessageResponse;
import com.genexus.cloud.serverless.model.EventMessages;
import com.genexus.diagnostics.core.ILogger;
import com.genexus.specific.java.Connect;
import com.genexus.specific.java.LogManager;
import com.genexus.util.IniFile;


public class LambdaBaseEventHandler {
protected static ILogger logger = null;
protected static Class entryPointClass = null;
private static LambdaFunctionConfiguration functionConfiguration;
private static final String GX_APPLICATION_CLASS = "GXcfg";
private static String packageName = null;
private static GXProcedureExecutor executor;

public LambdaBaseEventHandler() throws Exception {
initialize();

}

public LambdaBaseEventHandler(String className) throws Exception {
functionConfiguration = new LambdaFunctionConfiguration(className);
initialize();

}


private void initialize() throws Exception {
logger = LogManager.initialize(".", LambdaBaseEventHandler.class);
Connect.init();

IniFile config = com.genexus.ConfigFileFinder.getConfigFile(null, "client.cfg", null);
packageName = config.getProperty("Client", "PACKAGE", null);
Class cfgClass;

String cfgClassName = packageName.isEmpty() ? GX_APPLICATION_CLASS : String.format("%s.%s", packageName, GX_APPLICATION_CLASS);
try {
cfgClass = Class.forName(cfgClassName);
com.genexus.Application.init(cfgClass);
} catch (ClassNotFoundException e) {
logger.error(String.format("Failed to initialize GX AppConfig Class: %s", cfgClassName), e);
throw e;
}

logger.debug("Initializing Function configuration");
try {
if (functionConfiguration == null) {
functionConfiguration = LambdaFunctionConfigurationHelper.getFunctionConfiguration();
}
entryPointClass = Class.forName(functionConfiguration.getEntryPointClassName());
} catch (Exception e) {
logger.error(String.format("Failed to initialize Application for className: %s", functionConfiguration.getEntryPointClassName()), e);
throw e;
}

if (entryPointClass == null) {
throw new ClassNotFoundException(String.format("GeneXus Procedure '%s' was not found. Check deployment package ", functionConfiguration.getEntryPointClassName()));
}

executor = new GXProcedureExecutor(functionConfiguration.getEntryPointClassName());
}

protected EventMessageResponse dispatchEvent(EventMessages eventMessages, String lambdaRawMessageBody) throws Exception {
String jsonStringMessages = Helper.toJSONString(eventMessages);

if (logger.isDebugEnabled()) {
logger.debug(String.format("dispatchEventMessages (%s) - serialized messages: %s", functionConfiguration.getEntryPointClassName(), jsonStringMessages));
}

ModelContext modelContext = new ModelContext(entryPointClass);
EventMessageResponse response = null;

try {
response = executor.execute(modelContext, eventMessages, lambdaRawMessageBody);
} catch (Exception e) {
logger.error(String.format("dispatchEventmessages - program '%s' execution error", entryPointClass.getName()), e);
throw e;
}

if (!response.isHandled()) {
logger.info("dispatchEventmessages - messages not handled with success: " + response.getErrorMessage());
} else {
logger.debug("dispatchEventmessages - message handled with success");
}
return response;

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.genexus.cloud.serverless.aws.handler;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.genexus.cloud.serverless.Helper;
import com.genexus.cloud.serverless.exception.FunctionRuntimeException;
import com.genexus.cloud.serverless.model.EventMessage;
import com.genexus.cloud.serverless.model.EventMessageResponse;
import com.genexus.cloud.serverless.model.EventMessageSourceType;
import com.genexus.cloud.serverless.model.EventMessages;
import com.genexus.webpanels.WebUtils;
import json.org.json.JSONObject;

import java.util.Map;

public class LambdaEventBridgeHandler extends LambdaBaseEventHandler implements RequestHandler<Map<String, Object>, String> {

public LambdaEventBridgeHandler() throws Exception {
super();
}

public LambdaEventBridgeHandler(String entryPointClassName) throws Exception {
super(entryPointClassName);
}

@Override
public String handleRequest(Map<String, Object> stringObjectMap, Context context) {
String jsonEventRaw = Helper.toJSONString(stringObjectMap);
if (logger.isDebugEnabled()) {
logger.debug("handleRequest started with event: " + jsonEventRaw);
}

String errorMessage;
EventMessageResponse response;

try {
EventMessages msgs = new EventMessages();
EventMessage msgItem = new EventMessage();
msgItem.setMessageSourceType(EventMessageSourceType.ServiceBusMessage);
if (stringObjectMap.containsKey("time")) {
msgItem.setMessageDate(WebUtils.parseDTimeParm(stringObjectMap.get("time").toString()));
}
msgItem.setMessageId(stringObjectMap.getOrDefault("id", "").toString());
if (stringObjectMap.containsKey("detail")) {
msgItem.setMessageData(new JSONObject(jsonEventRaw).getString("detail"));
}
for (Map.Entry<String, Object> entry : stringObjectMap.entrySet()) {
Helper.addEventMessageProperty(msgItem, entry.getKey(), entry.getValue().toString());
}
msgs.add(msgItem);
response = dispatchEvent(msgs, jsonEventRaw);
} catch (Exception e) {
errorMessage = "HandleRequest execution error";
logger.error(errorMessage, e);
throw new FunctionRuntimeException(errorMessage, e);
}

if (response == null) {
return "";
}

if (!response.isHandled()) {
//Throw exception in order to mark the message as not processed.
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage());
}
return Helper.toJSONString(response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.genexus.cloud.serverless.aws.handler;

import com.fasterxml.jackson.annotation.JsonProperty;

public class LambdaFunctionConfiguration {

@JsonProperty("entryPointClassName")
private String entryPointClassName = null;

public LambdaFunctionConfiguration() {

}
public LambdaFunctionConfiguration(String entryPointClassName) {
this.entryPointClassName = entryPointClassName;
}

public String getEntryPointClassName() {
return entryPointClassName;
}

public void setEntryPointClassName(String entryPointClassName) {
this.entryPointClassName = entryPointClassName;
}

public boolean isValidConfiguration () {
return getEntryPointClassName() != null;
}
}
Loading