[I Made This]: Serverless Transactional Messages App #2180
Comments
Thanks for opening your first issue here! We'll come back to you as soon as we can. |
Thank you @san99tiago ! I'll create a PR to add this to our documentation today. Thank you for the nice sample! |
+1 to what Ruben said :) You could make that example even more resilient and easier to read by introducing Powertools Batch feature: https://awslabs.github.io/aws-lambda-powertools-python/2.14.1/utilities/batch/#processing-messages-from-sqs |
Example using Batch and with redundant parts now removed :) This now supports partial failures. You just need to change your Lambda function in CDK to enable If you later want to add another example for Kinesis Data Streams or DynamoDB Streams, you'll only need to change 2-3 lines ;)
################################################################################
# Lambda Function to "simulate" the processing of the messages
################################################################################
# Built-in imports
import time
import json
# External imports
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
tracer = Tracer(service="MessagesAPIService")
logger = Logger(service="MessagesAPIService", log_uncaught_exceptions=True, owner="san99tiago")
processor = BatchProcessor(event_type=EventType.SQS)
@tracer.capture_method
def process_message(record: SQSRecord):
    # Add message id for each log statement so we know which message is being processed
    logger.append_keys(message_id=record.message_id)

    # Batch will call this function for each record and will handle partial failures
    logger.info(record.body)

    # Simulate a "time" processing delay for the messages
    logger.debug("Processing message")
    tracer.put_annotation(key="sqs_id", value=record.message_id)
    time.sleep(4)
    logger.debug("Finished processing message")

    try:
        # Validate "Message" key on input
        # Note: if input does not contain "Message" key, this will raise an error
        # and Batch will report this record as a failure
        message = json.loads(record.body)["Message"]
        logger.info(f"Message: {message}")
        return True
    except Exception:
        # Log the full traceback, then re-raise so the record is marked as failed
        logger.exception("Failed to process message")
        raise


@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> PartialItemFailureResponse:
    logger.info("Starting messages processing")
    tracer.put_metadata(key="details", value="messages processing handler")

    number_of_records = len(event.get("Records", []))
    tracer.put_metadata(key="total_messages", value=number_of_records)

    batch_response = process_partial_response(event=event, record_handler=process_message, processor=processor, context=context)

    logger.info("Finished messages processing")
    return batch_response |
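For readers new to partial failures, here is a minimal standard-library sketch of the response shape that Lambda expects from an SQS-triggered function when partial batch responses are enabled. It is not how Powertools implements Batch; `process_batch`, `require_message_key`, and `sample_event` are hypothetical names used only for illustration, and the sample records carry just the two fields the sketch reads.

```python
import json
from typing import Callable, Dict, List


def process_batch(
    event: dict, record_handler: Callable[[dict], None]
) -> Dict[str, List[Dict[str, str]]]:
    """Run record_handler on every record and collect the ids of the ones that raise."""
    failures = []
    for record in event.get("Records", []):
        try:
            record_handler(record)
        except Exception:
            # Only failed records are reported back; the rest count as processed
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def require_message_key(record: dict) -> None:
    """Hypothetical handler: raises KeyError when the body has no "Message" key."""
    json.loads(record["body"])["Message"]


# Hypothetical, trimmed-down SQS event with one valid and one invalid record
sample_event = {
    "Records": [
        {"messageId": "id-1", "body": json.dumps({"Message": "hello"})},
        {"messageId": "id-2", "body": json.dumps({"Other": "no message key"})},
    ]
}

response = process_batch(sample_event, require_message_key)
print(response)  # {'batchItemFailures': [{'itemIdentifier': 'id-2'}]}
```

Only `id-2` is reported, so SQS would redeliver that message alone instead of the whole batch. The Powertools `process_partial_response` call above builds a response of the same shape for you.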
Awesome @heitorlessa , that makes sense to be changed. I will add those Batch Processing SQS improvements later today in my repo so that it's 100% compliant. Thanks for the feedback @rubenfonseca , @heitorlessa . I appreciate it!! |
@san99tiago great!! Can you please ping us again here when you've done the changes? Then we will be ready to merge the PR :) |
Hi @heitorlessa , @rubenfonseca . I have successfully implemented and tested the suggestions. You can double check them with this PR: Let me know if additional requirements are needed. I appreciate the feedback and you taking the time to read the code! |
Thank you for the quick turnaround @san99tiago ! We're merging the PR now :) It's a nice sample, congratulations! |
This is now released in version 2.15.0! |
Link to your material
https://github.com/san99tiago/aws-cdk-transactional-messages
Description
A well-documented example of a Transactional Messages App that illustrates how to use Lambda PowerTools to process SQS messages in batches (with IaC on top of CDK). It uses the Lambda PowerTools Logger, Tracer, and Data Classes utilities and includes unit tests. The code is simple, but it's really useful for understanding basic concepts and integrating them with IaC deployments using CDK-Python.
Preferred contact
[email protected]
(Optional) Social Network
https://san99tiago.com
(Optional) Additional notes
Overview of example Architecture (with Lambda Function on top of Lambda PowerTools):
Acknowledgment