diff --git a/serverless-message-processing/README.md b/serverless-message-processing/README.md new file mode 100644 index 000000000..fe7962997 --- /dev/null +++ b/serverless-message-processing/README.md @@ -0,0 +1,93 @@ +# Serverless Message Processing Pattern + +## Overview +An adaptable pattern for message processing using AWS serverless services, featuring error handling and automatic recovery mechanisms. + +## Core Components +- Amazon API Gateway (message ingestion) +- Amazon SQS Queues (main + DLQs) +- Lambda Functions (processing + recovery) + + +## Architecture Diagram + +![Architecture Diagram](architecture.jpeg) + + +## Basic Flow +1. Messages enter through API Gateway +2. Main queue receives messages +3. Lambda function polls the main queue using Event Source Mappings (ESMs) and handles the messages. + Read more about how Lambda synchronously processes queue messages in this [documentation.](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) +4. Failed messages route to DLQs -In this case a failed message would be a malformed email, however this can be adapted to other use cases. +5. Decision maker attempts an automated recovery -In this sample, we remediate common email malform issues including whitespace and typos in domain extensions. + +## Deployment +# Build the SAM application + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +2. Change directory to the pattern directory: + ``` + cd serverless-patterns/serverless-message-redrive + ``` + +# Build the SAM application +The ```sam build ``` command prepares an application for subsequent steps in the developer workflow, such as local testing or deploying to AWS. + + ``` +sam build + ``` +# Deploy the application +The ```sam deploy``` command deploys an application to the AWS Cloud using AWS CloudFormation. 
The ```--guided``` option is to have the AWS SAM CLI use prompts to guide you through the deployment. + + ``` +sam deploy --guided + ``` + +## Key Features +- Automatic retry mechanism +- Segregation of recoverable/fatal errors +- Processing logic with the potential for points of adaptation + +## API Reference +# Send Message + +The following is an example API call that you can try with your own endpoint. + + ``` + +curl -X POST \ + 'https://\${endpoint}/prod/message' \ + -H 'Content-Type: application/json' \ + -d '{ + "messageId": "test-456", + "messageType": "TYPE_A", + "payload": { + "email": "user@example.com", + "data": "some data" + }, + "timestamp": "2023-11-22T12:00:00Z" + }' + ``` + + +## Adaptation Points +- Message validation rules +- Processing logic +- Error handling strategies +- Recovery mechanisms +- Monitoring requirements +- API Design + +## Cleanup +1. Delete the SAM template +``` +sam delete +``` +2. Confirm the stack has been deleted +``` +aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus" +``` \ No newline at end of file diff --git a/serverless-message-processing/architecture.jpeg b/serverless-message-processing/architecture.jpeg new file mode 100644 index 000000000..e305cd831 Binary files /dev/null and b/serverless-message-processing/architecture.jpeg differ diff --git a/serverless-message-processing/example-pattern.json b/serverless-message-processing/example-pattern.json new file mode 100644 index 000000000..88c9cbb71 --- /dev/null +++ b/serverless-message-processing/example-pattern.json @@ -0,0 +1,56 @@ +{ + "title": "Serverless Messaging Redrive", + "description": "Automate the redrive and fixing of queue messages using AWS Lambda.", + "language": "Python", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and 
Lambda functions" ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-messaging-processing", + "templateURL": "serverless-patterns/serverless-messaging-processing", + "projectFolder": "serverless-messaging-processing", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon SQS Docs", + "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html" + }, + { + "text": "Using dead-letter queues in Amazon SQS", + "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Ilias Ali", + "image": "https://avatars.githubusercontent.com/zalilias", + "bio": "I am a Solutions Architect working at AWS based in the UK.", + "linkedin": "ilias-ali-0849991a4" + } + ] + } diff --git a/serverless-message-processing/functions/decision_maker/app.py b/serverless-message-processing/functions/decision_maker/app.py new file mode 100644 index 000000000..6e0512a4a --- /dev/null +++ b/serverless-message-processing/functions/decision_maker/app.py @@ -0,0 +1,199 @@ +import json +import re +import boto3 +import os +import logging + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# Initialize AWS clients +sqs = boto3.client('sqs') + +# Environment variables +MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL'] +FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL'] + +def fix_email(email): + """ + Attempt to fix common email format issues + Args: + email: String containing malformed email + Returns: + str: Fixed email or original if unfixable + """ + try: + logger.info(f"Starting email fix attempt for: 
{email}")
+
+        # Remove whitespace
+        email = email.strip()
+
+        # Handle multiple @ symbols
+        if email.count('@') > 1:
+            parts = email.split('@')
+            email = f"{parts[0]}@{parts[-1]}"
+            logger.info(f"Fixed multiple @ symbols. Result: {email}")
+
+        # Common domain typo fixes
+        domain_fixes = {
+            '.con': '.com',
+            '.vom': '.com',
+            '.comm': '.com',
+            '.orgg': '.org',
+            '.nett': '.net',
+            '.ckm': '.com',
+            '.cm': '.com'
+        }
+
+        original_email = email
+        for wrong, right in domain_fixes.items():
+            if email.endswith(wrong):
+                email = email[:-len(wrong)] + right
+                logger.info(f"Fixed domain from {wrong} to {right}. Before: {original_email}, After: {email}")
+                break
+
+        return email
+    except Exception as e:
+        logger.error(f"Error fixing email: {str(e)}")
+        return None
+
+def validate_fixed_email(email):
+    """
+    Validate fixed email format.
+    Args:
+        email: String containing fixed email
+    Returns:
+        bool: True if valid email format, False otherwise
+    """
+    # Bug fix: the pattern previously ended in '\$' (a literal dollar sign),
+    # so no normal email could ever match and every remediated message was
+    # routed to the fatal DLQ. '$' is the end-of-string anchor intended here.
+    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
+    return bool(re.match(email_pattern, email))
+
+def lambda_handler(event, context):
+    """
+    Processes messages from a DLQ that have failed validation,
+    attempting to fix common email format issues.
+    If fixed successfully, messages are sent back to the main queue.
+    If unfixable, messages are sent to a fatal DLQ.
+
+    Flow:
+    1. Extract email from failed message
+    2. Attempt to fix common issues
+    3. If fixed → Main Queue
+    4. If unfixable → Fatal DLQ
+
+    Extension points:
+    1. Add more sophisticated routing logic- including a delay queue
+    2. Implement custom error handling
+    3. Add message transformation
+    4. Implement retry mechanisms
+    5. 
Add monitoring and metrics + + Args: + event: Lambda event object containing SQS messages + context: Lambda context object + Returns: + dict: Processing summary with counts and batchItemFailures + """ + processed_count = 0 + fixed_count = 0 + fatal_count = 0 + failed_message_ids = [] + + logger.info(f"Starting to process batch of {len(event['Records'])} messages") + + for record in event['Records']: + original_message_id = "unknown" + try: + # Parse the failed message + message = json.loads(record['body']) + original_message_id = message.get('messageId', 'unknown') + + # Detailed message content logging + logger.info(f"Processing message content: {json.dumps(message, indent=2)}") + + # Check if message has already been remediated + if 'remediation' in message: + logger.info("Message already remediated, skipping processing") + continue + + # Extract email from payload + if 'payload' in message and 'email' in message['payload']: + original_email = message['payload']['email'] + + # Attempt to fix email + fixed_email = fix_email(original_email) + + if fixed_email and validate_fixed_email(fixed_email): + # Update message with fixed email + message['payload']['email'] = fixed_email + message['remediation'] = { + 'original_email': original_email, + 'fixed_email': fixed_email, + 'timestamp': context.invoked_function_arn + } + + # Send back to main queue + sqs.send_message( + QueueUrl=MAIN_QUEUE_URL, + MessageBody=json.dumps(message) + ) + fixed_count += 1 + else: + # Send to fatal DLQ if unfixable + message['failureReason'] = 'Email could not be remediated' + sqs.send_message( + QueueUrl=FATAL_DLQ_URL, + MessageBody=json.dumps(message) + ) + fatal_count += 1 + else: + # Send to fatal DLQ if message structure is invalid + message['failureReason'] = 'Invalid message structure - missing email in payload' + sqs.send_message( + QueueUrl=FATAL_DLQ_URL, + MessageBody=json.dumps(message) + ) + fatal_count += 1 + + processed_count += 1 + + except Exception as e: + 
logger.error(f"Error processing message {original_message_id}: {str(e)}") + # Add message ID to failed messages list + failed_message_ids.append(record['messageId']) + try: + error_message = { + 'originalMessage': record['body'], + 'failureReason': f"Remediation error: {str(e)}", + 'timestamp': context.invoked_function_arn + } + sqs.send_message( + QueueUrl=FATAL_DLQ_URL, + MessageBody=json.dumps(error_message) + ) + fatal_count += 1 + except Exception as fatal_e: + logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}") + + # Execution summary + logger.info(f""" + === Execution Summary === + Messages Processed: {processed_count} + Messages Fixed: {fixed_count} + Messages Fatal: {fatal_count} + Messages Failed: {len(failed_message_ids)} + ======================== + """) + + # Return both the processing info and the batch failures for SQS + result = { + 'batchItemFailures': [{"itemIdentifier": message_id} for message_id in failed_message_ids], + 'processingInfo': { + 'processed': processed_count, + 'fixed': fixed_count, + 'fatal': fatal_count + } + } + + return result diff --git a/serverless-message-processing/functions/decision_maker/requirements.txt b/serverless-message-processing/functions/decision_maker/requirements.txt new file mode 100644 index 000000000..08939dbfa --- /dev/null +++ b/serverless-message-processing/functions/decision_maker/requirements.txt @@ -0,0 +1 @@ +boto3==1.26.137 diff --git a/serverless-message-processing/functions/processor/app.py b/serverless-message-processing/functions/processor/app.py new file mode 100644 index 000000000..bba95b809 --- /dev/null +++ b/serverless-message-processing/functions/processor/app.py @@ -0,0 +1,116 @@ +import json +import logging +import re + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def validate_email(email): + """ + Validate email format + Args: + email: Email string to validate + Returns: + bool: True if valid email format, False otherwise + """ + 
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'  # '$' anchor rejects trailing characters after the TLD
+    if not bool(re.match(email_pattern, email)):
+        logger.error(f"Invalid email format: {email}")
+        return False
+    return True
+
+def validate_message_structure(message):
+    """
+    Validate message structure and required fields.
+    Args:
+        message: Dictionary containing message data
+    Returns:
+        tuple: (bool, str) - (is_valid, error_message)
+    """
+    required_fields = ['messageType', 'payload', 'timestamp']
+
+    # Check required fields
+    if not all(field in message for field in required_fields):
+        return False, "Missing required fields"
+
+    # Validate message type
+    valid_types = ['TYPE_A', 'TYPE_B', 'TYPE_C']
+    if message['messageType'] not in valid_types:
+        return False, f"Invalid message type: {message['messageType']}"
+
+    # Validate payload structure
+    if 'email' not in message['payload']:
+        return False, "Missing email in payload"
+
+    # Validate email format
+    if not validate_email(message['payload']['email']):
+        return False, f"Invalid email format: {message['payload']['email']}"
+
+    # Check system status if present
+    if 'systemStatus' in message and message['systemStatus'].lower() == 'unavailable':
+        return False, "DOWNSTREAM_ERROR: Target system unavailable"
+
+    return True, ""
+
+def lambda_handler(event, context):
+    """
+    Main Lambda handler function. 
+ Args: + event: Lambda event object + context: Lambda context object + Returns: + dict: Response object + """ + logger.info(f"Processing {len(event['Records'])} messages") + + processed_count = 0 + failed_count = 0 + validation_errors = [] + + for record in event['Records']: + try: + # Parse the message body + message = json.loads(record['body']) + message_id = message.get('messageId', 'unknown') + + # Validate message + is_valid, error_message = validate_message_structure(message) + + if is_valid: + # Process valid message + logger.info(f"Successfully validated message: {message_id}") + processed_count += 1 + + # Add your processing logic here + # process_valid_message(message) + + else: + # Track validation failure + failed_count += 1 + validation_errors.append({ + 'messageId': message_id, + 'error': error_message + }) + logger.warning(f"Validation failed for message {message_id}: {error_message}") + # Message will automatically go to DLQ due to raised exception + raise ValueError(error_message) + + except json.JSONDecodeError as e: + failed_count += 1 + logger.error(f"Invalid JSON in message: {str(e)}") + raise + + except Exception as e: + failed_count += 1 + logger.error(f"Error processing message: {str(e)}") + raise + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'processed': processed_count, + 'failed': failed_count, + 'validation_errors': validation_errors + }) + } diff --git a/serverless-message-processing/functions/processor/requirements.txt b/serverless-message-processing/functions/processor/requirements.txt new file mode 100644 index 000000000..08939dbfa --- /dev/null +++ b/serverless-message-processing/functions/processor/requirements.txt @@ -0,0 +1 @@ +boto3==1.26.137 diff --git a/serverless-message-processing/serverless-messaging-processing.json b/serverless-message-processing/serverless-messaging-processing.json new file mode 100644 index 000000000..3c3b5dbb2 --- /dev/null +++ 
b/serverless-message-processing/serverless-messaging-processing.json @@ -0,0 +1,105 @@ +{ + "title": "Serverless Messaging Redrive", + "description": "Automate the redrive and fixing of queue messages using AWS Lambda.", + "language": "Python", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This sample project demonstrates a serverless solution for processing and fixing malformed messages using Amazon SQS queues and AWS Lambda functions" + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-messaging-processing", + "templateURL": "serverless-patterns/serverless-message-processing", + "projectFolder": "serverless-message-processing", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon SQS Docs", + "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html" + }, + { + "text": "Using dead-letter queues in Amazon SQS", + "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." 
+ ] + }, + "authors": [ + { + "name": "Ilias Ali", + "image": "https://avatars.githubusercontent.com/zalilias", + "bio": "I am a Solutions Architect working at AWS based in the UK.", + "linkedin": "ilias-ali-0849991a4" + } + ], + "patternArch": { + "icon1": { + "x": 10, + "y": 50, + "service": "apigw", + "label": "API Gateway" + }, + "icon2": { + "x": 30, + "y": 50, + "service": "sqs", + "label": "Amazon SQS" + }, + "icon3": { + "x": 52, + "y": 25, + "service": "lambda", + "label": "Processor" + }, + "icon4": { + "x": 70, + "y": 40, + "service": "sqs", + "label": "Amazon SQS DLQ" + }, + "icon5": { + "x": 92, + "y": 65, + "service": "lambda", + "label": "Remediation" + }, + "line1": { + "from": "icon1", + "to": "icon2" + }, + "line2": { + "from": "icon2", + "to": "icon3" + }, + "line3": { + "from": "icon3", + "to": "icon4" + }, + "line4": { + "from": "icon4", + "to": "icon5" + } + } +} diff --git a/serverless-message-processing/template.yaml b/serverless-message-processing/template.yaml new file mode 100644 index 000000000..5f839e530 --- /dev/null +++ b/serverless-message-processing/template.yaml @@ -0,0 +1,224 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Automated Serverless Messaging Redrive (uksb-1tthgi812) (tag:serverless-message-processing) + +Parameters: + ProcessorMemorySize: + Type: Number + Default: 128 + Description: | + Memory allocation for the ProcessMessages Lambda function (MB). + Default of 128 MB is suitable for demo purposes. + For production workloads, consider: + - Using AWS Lambda Power Tuning tool to determine optimal memory settings + - Testing with your specific workload characteristics + - Monitoring performance metrics and costs + MinValue: 128 + MaxValue: 10240 + + DecisionMakerMemorySize: + Type: Number + Default: 128 + Description: | + Memory allocation for the ProcessMessages Lambda function (MB). + Default of 128 MB is suitable for demo purposes. 
+ For production workloads, consider: + - Using AWS Lambda Power Tuning tool to determine optimal memory settings + - Testing with your specific workload characteristics + - Monitoring performance metrics and costs + MinValue: 128 + MaxValue: 10240 + + LambdaTimeout: + Type: Number + Default: 29 + Description: | + Timeout for the Lambda functions (seconds). + Consider increasing this value if: + - Processing takes longer than expected + - External API calls are involved + - Complex data transformations are needed + Note: Maximum value is 900 seconds (15 minutes) + MinValue: 1 + MaxValue: 900 + + + +Resources: + ApiGateway: + Type: AWS::Serverless::Api + Properties: + StageName: prod + DefinitionBody: + swagger: '2.0' + info: + title: SQS API + version: '1.0' + schemes: + - https + paths: + /message: + post: + consumes: + - application/json + produces: + - application/json + responses: + '200': + description: OK + schema: + type: object + x-amazon-apigateway-integration: + credentials: !GetAtt ApiGatewayRole.Arn + type: aws + uri: !Sub "arn:aws:apigateway:${AWS::Region}:sqs:path/${AWS::AccountId}/${MainQueue.QueueName}" + httpMethod: POST + responses: + default: + statusCode: "200" + requestParameters: + integration.request.header.Content-Type: "'application/x-www-form-urlencoded'" + requestTemplates: + application/json: | + Action=SendMessage&MessageBody=$input.body + passthroughBehavior: never + integrationResponses: + '200': + statusCode: '200' + responseTemplates: + application/json: | + { + "message": "Message sent successfully", + "messageId": "$input.path('$.SendMessageResponse.SendMessageResult.MessageId')" + } + + ApiGatewayRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: apigateway.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: ApiGatewaySQSPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: 
sqs:SendMessage + Resource: !GetAtt MainQueue.Arn + + MainQueue: + Type: AWS::SQS::Queue + Properties: + RedrivePolicy: + deadLetterTargetArn: !GetAtt AutomatedDLQ.Arn + maxReceiveCount: 3 + VisibilityTimeout: 30 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 20 + + AutomatedDLQ: + Type: AWS::SQS::Queue + Properties: + MessageRetentionPeriod: 345600 + + + FatalDLQ: + Type: AWS::SQS::Queue + Properties: + MessageRetentionPeriod: 1209600 + + ProcessMessages: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.13 + CodeUri: ./functions/processor/ + MemorySize: !Ref ProcessorMemorySize + Timeout: !Ref LambdaTimeout + + Events: + SQSEvent: + Type: SQS + Properties: + Queue: !GetAtt MainQueue.Arn + BatchSize: 10 + FunctionResponseTypes: + - ReportBatchItemFailures + + Environment: + Variables: + MAIN_QUEUE_URL: !Ref MainQueue + FATAL_DLQ_URL: !Ref FatalDLQ + + DecisionMaker: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.13 + CodeUri: ./functions/decision_maker/ + MemorySize: !Ref DecisionMakerMemorySize + Timeout: !Ref LambdaTimeout + + Events: + SQSEvent: + Type: SQS + Properties: + Queue: !GetAtt AutomatedDLQ.Arn + BatchSize: 10 + FunctionResponseTypes: + - ReportBatchItemFailures + + + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt FatalDLQ.QueueName + - SQSSendMessagePolicy: + QueueName: !GetAtt FatalDLQ.QueueName + - SQSSendMessagePolicy: + QueueName: !GetAtt MainQueue.QueueName + Environment: + Variables: + MAIN_QUEUE_URL: !Ref MainQueue + FATAL_DLQ_URL: !Ref FatalDLQ + + + # CloudWatch Log Groups + ProcessMessagesLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${ProcessMessages}" + RetentionInDays: 14 + + DecisionMakerLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${DecisionMaker}" + RetentionInDays: 14 + + + +Outputs: + ApiEndpoint: + Description: API Gateway endpoint 
URL + Value: !Sub "https://${ApiGateway}.execute-api.${AWS::Region}.amazonaws.com/prod/message" + + MainQueueUrl: + Description: URL of the main SQS queue + Value: !Ref MainQueue + + MainQueueArn: + Description: ARN of the main SQS queue + Value: !GetAtt MainQueue.Arn + + AutomatedDLQUrl: + Description: URL of the automated DLQ + Value: !Ref AutomatedDLQ + + FatalDLQUrl: + Description: URL of the fatal DLQ + Value: !Ref FatalDLQ