New serverless pattern - Serverless Messaging Redrive #2543

Merged 10 commits on Apr 23, 2025. Changes from 2 commits.
58 changes: 58 additions & 0 deletions serverless-message-processing/README.md
@@ -0,0 +1,58 @@
# Serverless Message Processing Pattern

## Overview
An adaptable pattern for message processing using AWS serverless services, featuring error handling and automatic recovery mechanisms.

## Core Components
- API Gateway (message ingestion)
- SQS Queues (main + DLQs)
- Lambda Functions (processing + recovery)

## Basic Flow
1. Messages enter through API Gateway
2. Main queue receives messages
3. Processor Lambda handles messages
4. Failed messages route to DLQs
5. The decision-maker Lambda attempts automated recovery

## Deployment
```
# Build the SAM application
sam build

# Deploy the application
sam deploy --guided
```

## Key Features
- Automatic retry mechanism
- Segregation of recoverable/fatal errors
- Extensible processing logic

## API Reference
### Send Message
```
POST /message
Content-Type: application/json
```
```
{
  "messageType": "TYPE_A|TYPE_B|TYPE_C",
  "payload": {},
  "timestamp": "ISO8601_TIMESTAMP"
}
```
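
For example, once the stack is deployed you can post a test message from Python. This is a minimal sketch: the URL is a placeholder for the API endpoint that `sam deploy` prints in its outputs, and the stage name may differ.

```
import json
import urllib.request

# Placeholder: substitute the API endpoint from the stack outputs
URL = "https://<api-id>.execute-api.<region>.amazonaws.com/prod/message"

body = json.dumps({
    "messageType": "TYPE_A",
    "payload": {},
    "timestamp": "2025-04-23T10:00:00Z"
}).encode("utf-8")

req = urllib.request.Request(URL, data=body,
                             headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())
```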


## Adaptation Points
- Message validation rules
- Processing logic
- Error handling strategies
- Recovery mechanisms
- Monitoring requirements
- API design

## Note
This is a sample pattern. Adapt security, scaling, and processing logic according to your requirements.
60 changes: 60 additions & 0 deletions serverless-message-processing/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"title": "Step Functions to Athena",
"description": "Create a Step Functions workflow to query Amazon Athena.",
"language": "Python",
"level": "200",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions",
"The system automatically handles message validation, applies fixes where possible, and routes messages to appropriate queues based on their fixability.",
"It has built-in error handling and detailed logging, it provides a robust framework for message processing that can be easily extended for specific business needs.",
"This pattern uses AWS Lambda for processing, multiple SQS queues for message routing, and includes 2 dead-letter queue (DLQ) for messages requiring human intervention or for auto-remediation."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-messaging-processing",
"templateURL": "serverless-patterns/serverless-messaging-processing",
"projectFolder": "serverless-messaging-processing",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Amazon SQS Docs",
"link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html"
},
{
"text": "Using dead-letter queues in Amazon SQS",
"link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html"
}
]
},
"deploy": {
"text": [
"sam build",
"sam deploy --guided"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Ilias Ali",
"image": "link-to-your-photo.jpg",
"bio": "I am a Solutions Architect working at AWS based in the UK.",
"linkedin": "ilias-ali-0849991a4"
}
]
}
152 changes: 152 additions & 0 deletions serverless-message-processing/functions/decision_maker/app.py
@@ -0,0 +1,152 @@
import json
import re
import boto3
import os
import logging

# Set up logging
logger = logging.getLogger()


[Review comment] Is there an opportunity to use the PowerTools logging module here instead of Python native?

logger.setLevel(logging.INFO)

# Initialize AWS clients
sqs = boto3.client('sqs')

# Environment variables
MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']

# Email validation pattern
EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
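# For reference: "user@example.com" matches this pattern, while
# "user@@example.con" (extra "@") does not; fix_email below attempts
# to repair such values.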

def fix_email(email):
"""
    Attempt to fix common email format issues.
    Can be adapted to other scenarios, e.g. downstream issues.
"""
# Remove multiple @ symbols, keep the last one
if email.count('@') > 1:
parts = email.split('@')
email = f"{parts[0]}@{parts[-1]}"

# Remove spaces
email = email.strip().replace(' ', '')

# Fix common typos in domain extensions
common_fixes = {
'.con': '.com',
'.vom': '.com',
'.comm': '.com',
'.orgg': '.org',
'.nett': '.net'
}

for wrong, right in common_fixes.items():
if email.endswith(wrong):
email = email[:-len(wrong)] + right

return email
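
# A few hypothetical sanity checks of fix_email (illustration only, not called by the handler):
#   fix_email("user@@example.com")  -> "user@example.com"  (keeps the last "@")
#   fix_email(" user@example.con")  -> "user@example.com"  (strips the space, fixes ".con")
#   fix_email("user@example.nett")  -> "user@example.net"  (fixes ".nett")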

def can_fix_email(message):
"""
Check if the email in the message can be fixed
"""
if 'email' not in message:
return False

email = message['email']
fixed_email = fix_email(email)

return bool(re.match(EMAIL_PATTERN, fixed_email))
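
# e.g. can_fix_email({"email": "user@@example.con"}) -> True (repairable),
#      can_fix_email({"id": 1}) -> False (no email field); hypothetical inputs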


def lambda_handler(event, context):
"""
Processes messages from a DLQ that have already failed to be automatically processed,
and attempts automated remediation and redelivery of the messages back to the main queue.
If no suitable fixes can be applied, messages end up in a fatal DLQ where the typical
approach of human intervention is required.

Flow:
1. Attempt to fix message
2. If fixable -> Main Queue
3. If unfixable -> Fatal DLQ

Extension points:
    1. Add more sophisticated routing logic, including a delay queue
2. Implement custom error handling
3. Add message transformation
4. Implement retry mechanisms
5. Add monitoring and metrics

"""
processed_count = 0

for record in event['Records']:
try:
# Parse the message body
message = json.loads(record['body'])
original_message_id = record.get('messageId', 'unknown')

logger.info(f"Processing failed message: {original_message_id}")




# Option A: Try to fix malformed email

if can_fix_email(message) and not re.match(EMAIL_PATTERN, message['email']):
fixed_email = fix_email(message['email'])
logger.info(f"Fixed email from '{message['email']}' to '{fixed_email}'")

# Update the message with fixed email
message['email'] = fixed_email
message['emailWasFixed'] = True

# Send back to main queue
sqs.send_message(
QueueUrl=MAIN_QUEUE_URL,
MessageBody=json.dumps(message)
)

logger.info(f"Sent fixed message back to main queue: {original_message_id}")

# Option B: Cannot fix - send to fatal DLQ
else:
logger.warning(f"Message cannot be fixed, sending to fatal DLQ: {original_message_id}")

# Add failure reason if not present
if 'failureReason' not in message:
message['failureReason'] = 'Unrecoverable error - could not fix message'

# Send to fatal DLQ
sqs.send_message(
QueueUrl=FATAL_DLQ_URL,
MessageBody=json.dumps(message)
)

processed_count += 1

        except Exception as e:
            # record.get avoids a NameError when parsing fails before
            # original_message_id is assigned
            logger.error(f"Error processing message {record.get('messageId', 'unknown')}: {str(e)}")
# If we can't process the decision, send to fatal DLQ
try:
error_message = {
'originalMessage': record['body'],
'failureReason': f"Decision maker error: {str(e)}",
                    'functionArn': context.invoked_function_arn
}
sqs.send_message(
QueueUrl=FATAL_DLQ_URL,
MessageBody=json.dumps(error_message)
)

except Exception as fatal_e:
logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}")
raise

return {
'statusCode': 200,
'body': json.dumps({
'processedMessages': processed_count
})
}
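
# --- Hypothetical local smoke test: a sketch, not part of the deployed handler. ---
# Assumes MAIN_QUEUE_URL and FATAL_DLQ_URL are set and AWS credentials are
# configured; the message and ARN below are illustrative placeholders.
if __name__ == "__main__":
    sample_event = {
        "Records": [
            {"messageId": "local-test-1",
             "body": json.dumps({"email": "user@@example.con"})}
        ]
    }

    class _Context:  # minimal stand-in for the Lambda context object
        invoked_function_arn = "arn:aws:lambda:local:000000000000:function:decision-maker"

    print(lambda_handler(sample_event, _Context()))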
@@ -0,0 +1 @@
boto3==1.26.137
121 changes: 121 additions & 0 deletions serverless-message-processing/functions/processor/app.py
@@ -0,0 +1,121 @@
import json
import logging


[Review comment] Could you use the PowerTools logging implementation?


# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def validate_message_structure(message):
"""
Validate message structure and required fields.
Args:
message: Dictionary containing message data
Returns:
bool: True if valid message structure, False otherwise
"""
required_fields = ['messageType', 'payload', 'timestamp']
return all(field in message for field in required_fields)
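
# For example (hypothetical values; shape taken from the README's API reference):
#   validate_message_structure({"messageType": "TYPE_A", "payload": {},
#                               "timestamp": "2025-04-23T10:00:00Z"})  # -> True
#   validate_message_structure({"payload": {}})  # -> False (missing fields)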

def process_message(message):
"""
Process the message content.
Args:
message: Dictionary containing message data
Returns:
bool: True if processing successful, False otherwise
"""
try:
# Validate message structure
if not validate_message_structure(message):
logger.error("Message missing required fields")
raise ValueError("Invalid message structure")

message_type = message['messageType']
payload = message['payload']

# Validate message type
valid_types = ['TYPE_A', 'TYPE_B', 'TYPE_C']
if message_type not in valid_types:
logger.error(f"Invalid message type: {message_type}")
raise ValueError(f"Invalid message type: {message_type}")

# Check for downstream system status
if 'systemStatus' in message and message['systemStatus'].lower() == 'unavailable':
logger.error("Target system is unavailable")
raise ValueError("DOWNSTREAM_ERROR: Target system unavailable")

# Process the message based on type
logger.info(f"Processing message type: {message_type}")

# Add type-specific processing logic here
if message_type == 'TYPE_A':
# Process TYPE_A messages
pass
elif message_type == 'TYPE_B':
# Process TYPE_B messages
pass
elif message_type == 'TYPE_C':
# Process TYPE_C messages
pass

return True

except Exception as e:
logger.error(f"Error processing message: {str(e)}")
raise

def lambda_handler(event, context):
"""
Main Lambda handler function.
Args:
event: Lambda event object
context: Lambda context object
Returns:
dict: Response object
"""
logger.info(f"Processing {len(event['Records'])} messages")

processed_count = 0
failed_count = 0
downstream_errors = 0

for record in event['Records']:
try:
# Parse the message body
message = json.loads(record['body'])

# Process the message
if process_message(message):
processed_count += 1
logger.info(f"Successfully processed message: {message.get('messageId', 'unknown')}")
else:
failed_count += 1
logger.warning(f"Message processing returned False: {message.get('messageId', 'unknown')}")

except json.JSONDecodeError as e:
failed_count += 1
logger.error(f"Invalid JSON in message: {str(e)}")
raise

except ValueError as e:
if "DOWNSTREAM_ERROR" in str(e):
downstream_errors += 1
logger.error("Downstream error detected")
raise
failed_count += 1
logger.error(f"Validation error: {str(e)}")
raise

except Exception as e:
failed_count += 1
logger.error(f"Unexpected error processing message: {str(e)}")
raise

return {
'statusCode': 200,
'body': json.dumps({
'processed': processed_count,
'failed': failed_count,
'downstream_errors': downstream_errors
})
}
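
# --- Hypothetical local smoke test: a sketch, not part of the deployed code. ---
# Builds an SQS-style event carrying the message schema from the README; it
# runs without AWS access since the processor here only validates and logs.
if __name__ == "__main__":
    sample_event = {
        "Records": [{
            "body": json.dumps({
                "messageType": "TYPE_A",
                "payload": {"example": True},
                "timestamp": "2025-04-23T10:00:00Z"
            })
        }]
    }
    print(lambda_handler(sample_event, None))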