diff --git a/docs/utilities/kafka.md b/docs/utilities/kafka.md index 18e81ddb056..a3463e4e950 100644 --- a/docs/utilities/kafka.md +++ b/docs/utilities/kafka.md @@ -41,7 +41,7 @@ flowchart LR **SchemaConfig class** Contains parameters that tell Powertools how to interpret message data, including the format type (JSON, Avro, Protocol Buffers) and optional schema definitions needed for binary formats. -**Output Serializer** A Pydantic model, Python dataclass, or any custom class that helps structure data for your business logic. +**Output Serializer** A Pydantic model, Python dataclass, or any custom function that helps structure data for your business logic. **Schema Registry** Is a centralized service that stores and validates schemas, ensuring producers and consumers maintain compatibility when message formats evolve over time. @@ -64,16 +64,20 @@ Lambda processes Kafka messages as discrete events rather than continuous stream Install the Powertools for AWS Lambda package with the appropriate extras for your use case: -```bash -# Basic installation - includes JSON support -pip install aws-lambda-powertools +=== "JSON" + ```bash + pip install aws-lambda-powertools + ``` -# For processing Avro messages -pip install 'aws-lambda-powertools[kafka-consumer-avro]' +=== "Avro" + ```bash + pip install 'aws-lambda-powertools[kafka-consumer-avro]' + ``` -# For working with Protocol Buffers -pip install 'aws-lambda-powertools[kafka-consumer-protobuf]' -``` +=== "Protobuf" + ```bash + pip install 'aws-lambda-powertools[kafka-consumer-protobuf]' + ``` ### Required resources @@ -82,26 +86,7 @@ To use the Kafka consumer utility, you need an AWS Lambda function configured wi === "getting_started_with_msk.yaml" ```yaml - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Resources: - KafkaConsumerFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - Runtime: python3.13 - Timeout: 30 - Events: - MSKEvent: - Type: MSK - Properties: - StartingPosition: LATEST - Stream: !GetAtt MyMSKCluster.Arn - Topics: - - my-topic-1 - - my-topic-2 - Policies: - - AWSLambdaMSKExecutionRole + --8<-- "examples/kafka/consumer/sam/getting_started_with_msk.yaml" ``` ### Using ESM with Schema Registry @@ -110,6 +95,38 @@ The Event Source Mapping configuration determines which mode is used. With `JSON Powertools for AWS supports both Schema Registry integration modes in your Event Source Mapping configuration. +For simplicity, we will use a simple schema containing `name` and `age` in all our examples. You can also copy the payload example with the expected Kafka event to test your code. + +=== "JSON" + ```json + --8<-- "examples/kafka/consumer/schemas/user.json" + ``` + +=== "Payload JSON" + ```json + --8<-- "examples/kafka/consumer/events/kafka_event_json.json" + ``` + +=== "Avro Schema" + ```json + --8<-- "examples/kafka/consumer/schemas/user.avsc" + ``` + +=== "Payload AVRO" + ```json + --8<-- "examples/kafka/consumer/events/kafka_event_avro.json" + ``` + +=== "Protobuf Schema" + ```protobuf + --8<-- "examples/kafka/consumer/schemas/user.proto" + ``` + +=== "Payload Protobuf" + ```json + --8<-- "examples/kafka/consumer/events/kafka_event_protobuf.json" + ``` + ### Processing Kafka events The Kafka consumer utility transforms raw Lambda Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format. 
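The tabs that follow include the full snippet for each format. As a quick orientation, a minimal JSON consumer follows this shape — a sketch only, using the sample `name`/`age` schema introduced above:

```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# JSON needs no schema definition; each record value is deserialized into a dict
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        user = record.value  # dict with "name" and "age" from the sample schema
        logger.info(f"Processing user: {user['name']}")

    return {"statusCode": 200}
```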
@@ -119,35 +136,41 @@ The Kafka consumer utility transforms raw Lambda Kafka events into an intuitive === "Avro Messages" - ```python title="getting_started_with_avro.py" + ```python hl_lines="2 21-24 27" --8<-- "examples/kafka/consumer/src/getting_started_with_avro.py" ``` === "Protocol Buffers" - ```python title="getting_started_with_protobuf.py" + ```python hl_lines="2 6 11-14 17" --8<-- "examples/kafka/consumer/src/getting_started_with_protobuf.py" ``` === "JSON Messages" - ```python title="getting_started_with_json.py" + ```python hl_lines="2 8 11" --8<-- "examples/kafka/consumer/src/getting_started_with_json.py" ``` -### Deserializing keys and values +### Deserializing key and value -The `@kafka_consumer` decorator can deserialize both keys and values independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message. +The `@kafka_consumer` decorator can deserialize both key and value fields independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message. === "Key and Value Deserialization" - ```python title="working_with_key_and_value.py" + ```python hl_lines="2 31-36 39" --8<-- "examples/kafka/consumer/src/working_with_key_and_value.py" ``` -=== "Value-Only Deserialization" +=== "Key-only Deserialization" + + ```python hl_lines="2 19-22 25" + --8<-- "examples/kafka/consumer/src/working_with_key_only.py" + ``` + +=== "Value-only Deserialization" - ```python title="working_with_value_only.py" + ```python hl_lines="2 21-24 27" --8<-- "examples/kafka/consumer/src/working_with_value_only.py" ``` @@ -160,13 +183,13 @@ When working with primitive data types (strings, integers, etc.) rather than str === "Primitive key" - ```python title="working_with_primitive_key.py" + ```python hl_lines="2 8 11" --8<-- "examples/kafka/consumer/src/working_with_primitive_key.py" ``` === "Primitive key and value" - ```python title="working_with_primitive_key_and_value.py" + ```python hl_lines="2 8" --8<-- "examples/kafka/consumer/src/working_with_primitive_key_and_value.py" ``` @@ -209,54 +232,10 @@ Choose the serialization format that best fits your needs: Each Kafka record contains important metadata that you can access alongside the deserialized message content. This metadata helps with message processing, troubleshooting, and implementing advanced patterns like exactly-once processing. 
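When your Event Source Mapping integrates with a Schema Registry, the schema identifier and data format travel with each record and are exposed as `value_schema_metadata` and `key_schema_metadata` (listed in the properties table below). A minimal sketch of inspecting them, assuming Schema Registry integration is enabled on the ESM:

```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # Schema metadata (schemaId, dataFormat) is only propagated when the
        # Event Source Mapping is configured with Schema Registry integration
        logger.info(f"Value schema metadata: {record.value_schema_metadata}")
        logger.info(f"Key schema metadata: {record.key_schema_metadata}")

    return {"statusCode": 200}
```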
-=== "Working with Record Metadata" +=== "Accessing record metadata" - ```python - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.consumer_records import ConsumerRecords - from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig - - # Define Avro schema - avro_schema = """ - { - "type": "record", - "name": "Customer", - "fields": [ - {"name": "customer_id", "type": "string"}, - {"name": "name", "type": "string"}, - {"name": "email", "type": "string"}, - {"name": "order_total", "type": "double"} - ] - } - """ - - schema_config = SchemaConfig( - value_schema_type="AVRO", - value_schema=avro_schema - ) - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event: ConsumerRecords, context): - for record in event.records: - # Log record coordinates for tracing - print(f"Processing message from topic '{record.topic}'") - print(f" Partition: {record.partition}, Offset: {record.offset}") - print(f" Produced at: {record.timestamp}") - - # Process message headers - if record.headers: - for header in record.headers: - print(f" Header: {header['key']} = {header['value']}") - - # Access the Avro deserialized message content - customer = record.value - print(f"Processing order for: {customer['name']}") - print(f"Order total: ${customer['order_total']}") - - # For debugging, you can access the original raw data - # print(f"Raw message: {record.raw_value}") - - return {"statusCode": 200} + ```python hl_lines="2 27 30" + --8<-- "examples/kafka/consumer/src/access_event_metadata.py" ``` #### Available metadata properties @@ -273,6 +252,8 @@ Each Kafka record contains important metadata that you can access alongside the | `value` | Deserialized message content | The actual business data | | `original_value` | Base64-encoded original message value | Debugging or custom deserialization | | `original_key` | Base64-encoded original message key | Debugging or custom deserialization | +| `value_schema_metadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry | +| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry | ### Custom output serializers @@ -281,256 +262,43 @@ Transform deserialized data into your preferred object types using output serial ???+ tip "Choosing the right output serializer" - **Pydantic models** offer robust data validation at runtime and excellent IDE support - **Dataclasses** provide lightweight type hints with better performance than Pydantic - - **Custom classes** give complete flexibility for complex transformations and business logic + - **Custom functions** give complete flexibility for complex transformations and business logic -=== "Pydantic Models" +=== "Pydantic models" - ```python - from pydantic import BaseModel, Field, EmailStr - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig - - # Define Pydantic model for strong validation - class Customer(BaseModel): - id: str - name: str - email: EmailStr - tier: str = Field(pattern=r'^(standard|premium|enterprise)$') - loyalty_points: int = Field(ge=0) - - def is_premium(self) -> bool: - return self.tier in ("premium", "enterprise") - - # Configure with Avro schema and Pydantic output - schema_config = SchemaConfig( - value_schema_type="JSON", - 
value_output_serializer=Customer - ) - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - for record in event.records: - # record.value is now a validated Customer instance - customer = record.value - - # Access model properties and methods - if customer.is_premium(): - print(f"Processing premium customer: {customer.name}") - apply_premium_benefits(customer) - - return {"statusCode": 200} + ```python hl_lines="1 10-13 17 24" + --8<-- "examples/kafka/consumer/src/serializing_output_with_pydantic.py" ``` -=== "Python Dataclasses" +=== "Dataclasses" - ```python - from dataclasses import dataclass - from datetime import datetime - from typing import List, Optional - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig - - # Define dataclasses for type hints and structure - @dataclass - class OrderItem: - product_id: str - quantity: int - unit_price: float - - @dataclass - class Order: - order_id: str - customer_id: str - items: List[OrderItem] - created_at: datetime - shipped_at: Optional[datetime] = None - - @property - def total(self) -> float: - return sum(item.quantity * item.unit_price for item in self.items) - - # Helper function to convert timestamps to datetime objects - def order_converter(data): - # Convert timestamps to datetime objects - data['created_at'] = datetime.fromtimestamp(data['created_at']/1000) - if data.get('shipped_at'): - data['shipped_at'] = datetime.fromtimestamp(data['shipped_at']/1000) - - # Convert order items - data['items'] = [OrderItem(**item) for item in data['items']] - return Order(**data) - - schema_config = SchemaConfig( - value_schema_type="JSON", - value_output_serializer=order_converter - ) - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - for record in event.records: - # record.value is now an Order object - order = record.value - - print(f"Processing order {order.order_id} with {len(order.items)} items") - print(f"Order total: ${order.total:.2f}") - - return {"statusCode": 200} + ```python hl_lines="1 10-14 18 25" + --8<-- "examples/kafka/consumer/src/serializing_output_with_dataclass.py" ``` -=== "Custom Class" +=== "Custom function" - ```python - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig - - # Custom class with business logic - class EnrichmentProcessor: - def __init__(self, data): - self.user_id = data['user_id'] - self.name = data['name'] - self.preferences = data.get('preferences', {}) - self._raw_data = data # Keep original data - self._enriched = False - self._recommendations = None - - def enrich(self, recommendation_service): - """Enrich user data with recommendations""" - if not self._enriched: - self._recommendations = recommendation_service.get_for_user(self.user_id) - self._enriched = True - return self - - @property - def recommendations(self): - if not self._enriched: - raise ValueError("Must call enrich() before accessing recommendations") - return self._recommendations - - # Configure with custom processor - schema_config = SchemaConfig( - value_schema_type="JSON", - value_output_serializer=EnrichmentProcessor - ) - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - # Initialize services - recommendation_service = RecommendationService() - - for record in event.records: - # record.value is now an EnrichmentProcessor - processor = 
record.value - - # Use the processor's methods for business logic - enriched = processor.enrich(recommendation_service) - - # Access computed properties - print(f"User: {enriched.name}") - print(f"Top recommendation: {enriched.recommendations[0]['title']}") - - return {"statusCode": 200} + ```python hl_lines="8-11 15" + --8<-- "examples/kafka/consumer/src/serializing_output_with_custom_function.py" ``` ### Error handling Handle errors gracefully when processing Kafka messages to ensure your application maintains resilience and provides clear diagnostic information. The Kafka consumer utility provides specific exception types to help you identify and handle deserialization issues effectively. +!!! info + Fields like `value`, `key`, and `headers` are decoded lazily, meaning they are only deserialized when accessed. This allows you to handle deserialization errors at the point of access rather than when the record is first processed. + === "Basic Error Handling" - ```python - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.exceptions import KafkaConsumerDeserializationError - from aws_lambda_powertools import Logger - - logger = Logger() - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - successful_records = 0 - failed_records = 0 - - for record in event.records: - try: - # Process each record individually to isolate failures - process_customer_data(record.value) - successful_records += 1 - - except KafkaConsumerDeserializationError as e: - failed_records += 1 - logger.error( - "Failed to deserialize Kafka message", - extra={ - "topic": record.topic, - "partition": record.partition, - "offset": record.offset, - "error": str(e) - } - ) - # Optionally send to DLQ or error topic - - except Exception as e: - failed_records += 1 - logger.error( - "Error processing Kafka message", - extra={ - "error": str(e), - "topic": record.topic - } - ) - - return { - "statusCode": 200, - "body": f"Processed {successful_records} records successfully, {failed_records} failed" - } + ```python hl_lines="3 28" + --8<-- "examples/kafka/consumer/src/working_with_record_error_handling.py" ``` === "Handling Schema Errors" - ```python - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.exceptions import ( - KafkaConsumerDeserializationError, - KafkaConsumerAvroSchemaParserError - ) - from aws_lambda_powertools import Logger, Metrics - from aws_lambda_powertools.metrics import MetricUnit - - logger = Logger() - metrics = Metrics() - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - metrics.add_metric(name="TotalRecords", unit=MetricUnit.Count, value=len(event.records)) - - for record in event.records: - try: - order = record.value - process_order(order) - metrics.add_metric(name="ProcessedRecords", unit=MetricUnit.Count, value=1) - - except KafkaConsumerAvroSchemaParserError as e: - logger.critical( - "Invalid Avro schema configuration", - extra={"error": str(e)} - ) - metrics.add_metric(name="SchemaErrors", unit=MetricUnit.Count, value=1) - # This requires fixing the schema - might want to raise to stop processing - raise - - except KafkaConsumerDeserializationError as e: - logger.warning( - "Message format doesn't match schema", - extra={ - "topic": record.topic, - "error": str(e), - "raw_data_sample": str(record.raw_value)[:100] + "..." 
if len(record.raw_value) > 100 else record.raw_value - } - ) - metrics.add_metric(name="DeserializationErrors", unit=MetricUnit.Count, value=1) - # Send to dead-letter queue for analysis - send_to_dlq(record) - - return {"statusCode": 200, "metrics": metrics.serialize_metric_set()} + ```python hl_lines="4-7 36 42" + --8<-- "examples/kafka/consumer/src/working_with_schema_errors.py" ``` #### Exception types @@ -541,81 +309,18 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati | `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema | | `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) | | `KafkaConsumerOutputSerializerError` | Raised when output serializer fails | Error in custom serializer function, incompatible data, or validation failures in Pydantic models | +| `KafkaConsumerDeserializationFormatMismatch` | Raised when SchemaConfig format is wrong | When integrating with Schema Registry, the data format is propagated, so Powertools for AWS catches this error if the format is different from the configured one. | ### Integrating with Idempotency -When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The idempotency utility prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once. +When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The [idempotency utility](idempotency.md){target="_blank"} prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once. -The Idempotency utility automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics. +The [idempotency utility](idempotency.md){target="_blank"} automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics. 
=== "Idempotent Kafka Processing" - ```python - from aws_lambda_powertools import Logger - from aws_lambda_powertools.utilities.idempotency import ( - idempotent_function, - DynamoDBPersistenceLayer, - IdempotencyConfig - ) - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig - from aws_lambda_powertools.utilities.kafka.consumer_records import ConsumerRecords - - # Configure persistence layer for idempotency - persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") - logger = Logger() - - # Configure Kafka schema - avro_schema = """ - { - "type": "record", - "name": "Payment", - "fields": [ - {"name": "payment_id", "type": "string"}, - {"name": "customer_id", "type": "string"}, - {"name": "amount", "type": "double"}, - {"name": "status", "type": "string"} - ] - } - """ - - schema_config = SchemaConfig( - value_schema_type="AVRO", - value_schema=avro_schema - ) - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event: ConsumerRecords, context): - for record in event.records: - # Process each message with idempotency protection - process_payment( - payment=record.value, - topic=record.topic, - partition=record.partition, - offset=record.offset - ) - - return {"statusCode": 200} - - @idempotent_function( - data_keyword_argument="payment", - config=IdempotencyConfig( - event_key_jmespath="topic & '-' & partition & '-' & offset" - ), - persistence_store=persistence_layer - ) - def process_payment(payment, topic, partition, offset): - """Process a payment exactly once""" - logger.info(f"Processing payment {payment['payment_id']} from {topic}-{partition}-{offset}") - - # Execute payment logic - payment_service.process( - payment_id=payment['payment_id'], - customer_id=payment['customer_id'], - amount=payment['amount'] - ) - - return {"success": True, "payment_id": payment['payment_id']} + ```python hl_lines="2 7 9 39-42" + --8<-- "examples/kafka/consumer/src/working_with_idempotency.py" ``` TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once. 
@@ -629,33 +334,7 @@ When processing large Kafka messages in Lambda, be mindful of memory limitations === "Handling Large Messages" ```python - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools import Logger - - logger = Logger() - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - for record in event.records: - # Example: Handle large product catalog updates differently - if record.topic == "product-catalog" and len(record.raw_value) > 3_000_000: - logger.info(f"Detected large product catalog update ({len(record.raw_value)} bytes)") - - # Example: Extract S3 reference from message - catalog_ref = record.value.get("s3_reference") - logger.info(f"Processing catalog from S3: {catalog_ref}") - - # Process via S3 reference instead of direct message content - result = process_catalog_from_s3( - bucket=catalog_ref["bucket"], - key=catalog_ref["key"] - ) - logger.info(f"Processed {result['product_count']} products from S3") - else: - # Regular processing for standard-sized messages - process_standard_message(record.value) - - return {"statusCode": 200} + --8<-- "examples/kafka/consumer/src/working_with_large_messages.py" ``` For large messages, consider these proven approaches: @@ -670,27 +349,7 @@ The number of Kafka records processed per Lambda invocation is controlled by you === "Batch size configuration" ```yaml - Resources: - OrderProcessingFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - Runtime: python3.9 - Events: - KafkaEvent: - Type: MSK - Properties: - Stream: !GetAtt OrdersMSKCluster.Arn - Topics: - - order-events - - payment-events - # Configuration for optimal throughput/latency balance - BatchSize: 100 - MaximumBatchingWindowInSeconds: 5 - StartingPosition: LATEST - # Enable partial batch success reporting - FunctionResponseTypes: - - ReportBatchItemFailures + --8<-- "examples/kafka/consumer/sam/adjust_batch_size_configuration.yaml" ``` Different workloads benefit from different batch configurations: @@ -705,47 +364,8 @@ When using binary serialization formats across multiple programming languages, e === "Using Java naming convention" - ```python - # Example: Processing Java-produced Avro messages in Python - from aws_lambda_powertools.utilities.kafka import kafka_consumer - from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig - - # Define schema that matches Java producer - avro_schema = """ - { - "namespace": "com.example.orders", - "type": "record", - "name": "OrderEvent", - "fields": [ - {"name": "orderId", "type": "string"}, - {"name": "customerId", "type": "string"}, - {"name": "totalAmount", "type": "double"}, - {"name": "orderDate", "type": "long", "logicalType": "timestamp-millis"} - ] - } - """ - - # Configure schema with field name normalization for Python style - class OrderProcessor: - def __init__(self, data): - # Convert Java camelCase to Python snake_case - self.order_id = data["orderId"] - self.customer_id = data["customerId"] - self.total_amount = data["totalAmount"] - # Convert Java timestamp to Python datetime - self.order_date = datetime.fromtimestamp(data["orderDate"]/1000) - - schema_config = SchemaConfig( - value_schema_type="AVRO", - value_schema=avro_schema, - value_output_serializer=OrderProcessor - ) - - @kafka_consumer(schema_config=schema_config) - def lambda_handler(event, context): - for record in event.records: - order = record.value # OrderProcessor instance - print(f"Processing order {order.order_id} from 
{order.order_date}") + ```python hl_lines="25-31" + --8<-- "examples/kafka/consumer/src/using_java_naming_convention.py" ``` Common cross-language challenges to address: @@ -771,7 +391,7 @@ For binary messages that fail to deserialize, examine the raw encoded data: # For troubleshooting purposes only import base64 -raw_bytes = base64.b64decode(record.raw_value) +raw_bytes = base64.b64decode(record.original_value) print(f"Message size: {len(raw_bytes)} bytes") print(f"First 50 bytes (hex): {raw_bytes[:50].hex()}") ``` @@ -908,66 +528,11 @@ Testing Kafka consumer functions is straightforward with pytest. You can create === "Testing your code" ```python - import pytest - import base64 - import json - from your_module import lambda_handler - - def test_process_json_message(): - """Test processing a simple JSON message""" - # Create a test Kafka event with JSON data - test_event = { - "eventSource": "aws:kafka", - "records": { - "orders-topic": [ - { - "topic": "orders-topic", - "partition": 0, - "offset": 15, - "timestamp": 1545084650987, - "timestampType": "CREATE_TIME", - "key": None, - "value": base64.b64encode(json.dumps({"order_id": "12345", "amount": 99.95}).encode()).decode(), - } - ] - } - } - - # Invoke the Lambda handler - response = lambda_handler(test_event, {}) - - # Verify the response - assert response["statusCode"] == 200 - assert response.get("processed") == 1 - - - def test_process_multiple_records(): - """Test processing multiple records in a batch""" - # Create a test event with multiple records - test_event = { - "eventSource": "aws:kafka", - "records": { - "customers-topic": [ - { - "topic": "customers-topic", - "partition": 0, - "offset": 10, - "value": base64.b64encode(json.dumps({"customer_id": "A1", "name": "Alice"}).encode()).decode(), - }, - { - "topic": "customers-topic", - "partition": 0, - "offset": 11, - "value": base64.b64encode(json.dumps({"customer_id": "B2", "name": "Bob"}).encode()).decode(), - } - ] - } - } - - # Invoke the Lambda handler - response = lambda_handler(test_event, {}) - - # Verify the response - assert response["statusCode"] == 200 - assert response.get("processed") == 2 + --8<-- "examples/kafka/consumer/src/testing_your_code.py" + ``` + +=== "Lambda handler" + + ```python + --8<-- "examples/kafka/consumer/src/lambda_handler_test.py" ``` diff --git a/examples/kafka/consumer/events/kafka_event_avro.json b/examples/kafka/consumer/events/kafka_event_avro.json new file mode 100644 index 00000000000..4bba0680cc1 --- /dev/null +++ b/examples/kafka/consumer/events/kafka_event_avro.json @@ -0,0 +1,21 @@ +{ + "eventSource":"aws:kafka", + "eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3", + "bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098", + "records":{ + "python-with-avro-doc-3":[ + { + "topic":"python-with-avro-doc", + "partition":3, + "offset":0, + "timestamp":1750547105187, + "timestampType":"CREATE_TIME", + "key":"MTIz", + "value":"AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK", + "headers":[ + + ] + } + ] + } +} diff --git a/examples/kafka/consumer/events/kafka_event_json.json b/examples/kafka/consumer/events/kafka_event_json.json new file mode 100644 index 00000000000..3d8bc5a50da --- /dev/null +++ b/examples/kafka/consumer/events/kafka_event_json.json @@ -0,0 +1,21 @@ +{ + "eventSource":"aws:kafka", + "eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3", + 
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098", + "records":{ + "python-with-avro-doc-5":[ + { + "topic":"python-with-avro-doc", + "partition":5, + "offset":0, + "timestamp":1750547462087, + "timestampType":"CREATE_TIME", + "key":"MTIz", + "value":"eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=", + "headers":[ + + ] + } + ] + } +} diff --git a/examples/kafka/consumer/events/kafka_event_protobuf.json b/examples/kafka/consumer/events/kafka_event_protobuf.json new file mode 100644 index 00000000000..ad2848b314d --- /dev/null +++ b/examples/kafka/consumer/events/kafka_event_protobuf.json @@ -0,0 +1,21 @@ +{ + "eventSource":"aws:kafka", + "eventSourceArn":"arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3", + "bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098", + "records":{ + "python-with-avro-doc-5":[ + { + "topic":"python-with-avro-doc", + "partition":5, + "offset":1, + "timestamp":1750624373324, + "timestampType":"CREATE_TIME", + "key":"MTIz", + "value":"Cgpwb3dlcnRvb2xzEAU=", + "headers":[ + + ] + } + ] + } +} diff --git a/examples/kafka/consumer/sam/adjust_batch_size_configuration.yaml b/examples/kafka/consumer/sam/adjust_batch_size_configuration.yaml new file mode 100644 index 00000000000..f471f25f712 --- /dev/null +++ b/examples/kafka/consumer/sam/adjust_batch_size_configuration.yaml @@ -0,0 +1,22 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Resources: + KafkaConsumerFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.13 + Timeout: 30 + Events: + MSKEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac" + Topics: + - my-topic-1 + BatchSize: 100 + MaximumBatchingWindowInSeconds: 5 + Policies: + - AWSLambdaMSKExecutionRole + diff --git a/examples/kafka/consumer/sam/getting_started_with_msk.yaml b/examples/kafka/consumer/sam/getting_started_with_msk.yaml new file mode 100644 index 00000000000..47a7cc59693 --- /dev/null +++ b/examples/kafka/consumer/sam/getting_started_with_msk.yaml @@ -0,0 +1,19 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Resources: + KafkaConsumerFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.13 + Timeout: 30 + Events: + MSKEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac" + Topics: + - my-topic-1 + Policies: + - AWSLambdaMSKExecutionRole diff --git a/examples/kafka/consumer/schemas/user.avsc b/examples/kafka/consumer/schemas/user.avsc new file mode 100644 index 00000000000..8415948b210 --- /dev/null +++ b/examples/kafka/consumer/schemas/user.avsc @@ -0,0 +1,9 @@ +{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] +} diff --git a/examples/kafka/consumer/schemas/user.json b/examples/kafka/consumer/schemas/user.json new file mode 100644 index 00000000000..e22c5bf33a4 --- /dev/null +++ b/examples/kafka/consumer/schemas/user.json @@ -0,0 +1,4 @@ +{ + "name": "...", + "age": "..." 
+} diff --git a/examples/kafka/consumer/schemas/user.proto b/examples/kafka/consumer/schemas/user.proto new file mode 100644 index 00000000000..5b795393e7d --- /dev/null +++ b/examples/kafka/consumer/schemas/user.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package com.example; + +message User { + string name = 1; + int32 age = 2; +} diff --git a/examples/kafka/consumer/src/access_event_metadata.py b/examples/kafka/consumer/src/access_event_metadata.py new file mode 100644 index 00000000000..c576e3da774 --- /dev/null +++ b/examples/kafka/consumer/src/access_event_metadata.py @@ -0,0 +1,44 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + +# Define Avro schema +avro_schema = """ +{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] +} +""" + +schema_config = SchemaConfig( + value_schema_type="AVRO", + value_schema=avro_schema, +) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + for record in event.records: + # Log record coordinates for tracing + logger.info(f"Processing message from topic '{record.topic}'") + logger.info(f"Partition: {record.partition}, Offset: {record.offset}") + logger.info(f"Produced at: {record.timestamp}") + + # Process message headers + logger.info(f"Headers: {record.headers}") + + # Access the Avro deserialized message content + value = record.value + logger.info(f"Deserialized value: {value['name']}") + + # For debugging, you can access the original raw data + logger.info(f"Raw message: {record.original_value}") + + return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/lambda_handler_test.py b/examples/kafka/consumer/src/lambda_handler_test.py new file mode 100644 index 00000000000..df73fca34b5 --- /dev/null +++ b/examples/kafka/consumer/src/lambda_handler_test.py @@ -0,0 +1,9 @@ +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +schema_config = SchemaConfig(value_schema_type="JSON") + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + return {"statusCode": 200, "processed": len(list(event.records))} diff --git a/examples/kafka/consumer/src/serializing_output_with_custom_function.py b/examples/kafka/consumer/src/serializing_output_with_custom_function.py new file mode 100644 index 00000000000..31fd55672ff --- /dev/null +++ b/examples/kafka/consumer/src/serializing_output_with_custom_function.py @@ -0,0 +1,26 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + + +# Define custom serializer +def custom_serializer(data: dict): + del data["age"] # Remove age key just for example + return data + + +# Configure with Avro schema and function serializer +schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=custom_serializer) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + for record in event.records: + # record.value now only contains the key "name" + 
value = record.value + + logger.info(f"Name: '{value['name']}'") + + return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/serializing_output_with_dataclass.py b/examples/kafka/consumer/src/serializing_output_with_dataclass.py new file mode 100644 index 00000000000..2f840c7a119 --- /dev/null +++ b/examples/kafka/consumer/src/serializing_output_with_dataclass.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + + +# Define Dataclass model +@dataclass +class User: + name: str + age: int + + +# Configure with Avro schema and Dataclass output +schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=User) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + for record in event.records: + # record.value is now a User instance + value: User = record.value + + logger.info(f"Name: '{value.name}'") + logger.info(f"Age: '{value.age}'") + + return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/serializing_output_with_pydantic.py b/examples/kafka/consumer/src/serializing_output_with_pydantic.py new file mode 100644 index 00000000000..c1c7c97c6e3 --- /dev/null +++ b/examples/kafka/consumer/src/serializing_output_with_pydantic.py @@ -0,0 +1,29 @@ +from pydantic import BaseModel + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + + +# Define Pydantic model for strong validation +class User(BaseModel): + name: str + age: int + + +# Configure with Avro schema and Pydantic output +schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=User) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + for record in event.records: + # record.value is now a User instance + value: User = record.value + + logger.info(f"Name: '{value.name}'") + logger.info(f"Age: '{value.age}'") + + return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/testing_your_code.py b/examples/kafka/consumer/src/testing_your_code.py new file mode 100644 index 00000000000..1c1eee513a0 --- /dev/null +++ b/examples/kafka/consumer/src/testing_your_code.py @@ -0,0 +1,63 @@ +import base64 +import json + +from lambda_handler_test import lambda_handler + + +def test_process_json_message(): + """Test processing a simple JSON message""" + # Create a test Kafka event with JSON data + test_event = { + "eventSource": "aws:kafka", + "records": { + "orders-topic": [ + { + "topic": "orders-topic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": None, + "value": base64.b64encode(json.dumps({"order_id": "12345", "amount": 99.95}).encode()).decode(), + }, + ], + }, + } + + # Invoke the Lambda handler + response = lambda_handler(test_event, {}) + + # Verify the response + assert response["statusCode"] == 200 + assert response.get("processed") == 1 + + +def test_process_multiple_records(): + """Test processing multiple records in a batch""" + # Create a test event with multiple records + test_event = { + "eventSource": "aws:kafka", + "records": { + "customers-topic": [ + { + "topic": 
"customers-topic", + "partition": 0, + "offset": 10, + "value": base64.b64encode(json.dumps({"customer_id": "A1", "name": "Alice"}).encode()).decode(), + }, + { + "topic": "customers-topic", + "partition": 0, + "offset": 11, + "value": base64.b64encode(json.dumps({"customer_id": "B2", "name": "Bob"}).encode()).decode(), + }, + ], + }, + } + + # Invoke the Lambda handler + response = lambda_handler(test_event, {}) + + # Verify the response + assert response["statusCode"] == 200 + assert response.get("processed") == 2 diff --git a/examples/kafka/consumer/src/using_java_naming_convention.py b/examples/kafka/consumer/src/using_java_naming_convention.py new file mode 100644 index 00000000000..a7a02ed4cd3 --- /dev/null +++ b/examples/kafka/consumer/src/using_java_naming_convention.py @@ -0,0 +1,45 @@ +from datetime import datetime + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + +# Define schema that matches Java producer +avro_schema = """ +{ + "namespace": "com.example.orders", + "type": "record", + "name": "OrderEvent", + "fields": [ + {"name": "orderId", "type": "string"}, + {"name": "customerId", "type": "string"}, + {"name": "totalAmount", "type": "double"}, + {"name": "orderDate", "type": "long", "logicalType": "timestamp-millis"} + ] +} +""" + + +# Configure schema with field name normalization for Python style +def normalize_field_name(data: dict): + data["order_id"] = data["orderId"] + data["customer_id"] = data["customerId"] + data["total_amount"] = data["totalAmount"] + data["order_date"] = datetime.fromtimestamp(data["orderDate"] / 1000) + return data + + +schema_config = SchemaConfig( + value_schema_type="AVRO", + value_schema=avro_schema, + value_output_serializer=normalize_field_name, +) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + for record in event.records: + order = record.value # OrderProcessor instance + logger.info(f"Processing order {order['order_id']}") diff --git a/examples/kafka/consumer/src/working_with_idempotency.py b/examples/kafka/consumer/src/working_with_idempotency.py new file mode 100644 index 00000000000..608c887d7e9 --- /dev/null +++ b/examples/kafka/consumer/src/working_with_idempotency.py @@ -0,0 +1,49 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +# Configure persistence layer for idempotency +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") +logger = Logger() +idempotency_config = IdempotencyConfig() + +# Configure Kafka schema +avro_schema = """ +{ + "type": "record", + "name": "Payment", + "fields": [ + {"name": "payment_id", "type": "string"}, + {"name": "customer_id", "type": "string"}, + {"name": "amount", "type": "double"}, + {"name": "status", "type": "string"} + ] +} +""" + +schema_config = SchemaConfig(value_schema_type="AVRO", value_schema=avro_schema) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + idempotency_config.register_lambda_context(context) + + for record in event.records: + # Process each message with 
idempotency protection + process_payment(payment=record.value, topic=record.topic, partition=record.partition, offset=record.offset) + + return {"statusCode": 200} + + +@idempotent_function( + data_keyword_argument="payment", + persistence_store=persistence_layer, +) +def process_payment(payment, topic, partition, offset): + """Process a payment exactly once""" + logger.info(f"Processing payment {payment['payment_id']} from {topic}-{partition}-{offset}") + + # Execute payment logic + + return {"success": True, "payment_id": payment["payment_id"]} diff --git a/examples/kafka/consumer/src/working_with_key_and_value.py b/examples/kafka/consumer/src/working_with_key_and_value.py index d6a1a9e02ce..e86a2636625 100644 --- a/examples/kafka/consumer/src/working_with_key_and_value.py +++ b/examples/kafka/consumer/src/working_with_key_and_value.py @@ -10,7 +10,7 @@ "type": "record", "name": "ProductKey", "fields": [ - {"name": "product_id", "type": "string"} + {"name": "region_name", "type": "string"} ] } """ @@ -18,11 +18,11 @@ value_schema = """ { "type": "record", - "name": "ProductInfo", + "name": "User", + "namespace": "com.example", "fields": [ {"name": "name", "type": "string"}, - {"name": "price", "type": "double"}, - {"name": "in_stock", "type": "boolean"} + {"name": "age", "type": "int"} ] } """ @@ -43,7 +43,7 @@ def lambda_handler(event: ConsumerRecords, context: LambdaContext): key = record.key value = record.value - logger.info(f"Processing key: {key['product_id']}") + logger.info(f"Processing key: {key['region_name']}") logger.info(f"Processing value: {value['name']}") return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/working_with_key_only.py b/examples/kafka/consumer/src/working_with_key_only.py new file mode 100644 index 00000000000..9de98d2f92a --- /dev/null +++ b/examples/kafka/consumer/src/working_with_key_only.py @@ -0,0 +1,33 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + +# Define schemas for key +key_schema = """ +{ + "type": "record", + "name": "ProductKey", + "fields": [ + {"name": "region_name", "type": "string"} + ] +} +""" + +# Configure key schema +schema_config = SchemaConfig( + key_schema_type="AVRO", + key_schema=key_schema, +) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + for record in event.records: + # Access deserialized key + key = record.key + + logger.info(f"Processing key: {key}") + + return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/working_with_large_messages.py b/examples/kafka/consumer/src/working_with_large_messages.py new file mode 100644 index 00000000000..d607019acd9 --- /dev/null +++ b/examples/kafka/consumer/src/working_with_large_messages.py @@ -0,0 +1,38 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + +schema_config = SchemaConfig(value_schema_type="JSON") + + +def process_standard_message(message): + # Simulate processing logic + logger.info(f"Processing standard message: {message}") + + +def process_catalog_from_s3(bucket, key): + # Simulate processing logic + return {"product_count": 1000} + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: 
ConsumerRecords, context: LambdaContext): + for record in event.records: + # Example: Handle large product catalog updates differently + if "large-product-update" in record.headers: + logger.info("Detected large product catalog update") + + # Example: Extract S3 reference from message + catalog_ref = record.value.get("s3_reference") + logger.info(f"Processing catalog from S3: {catalog_ref}") + + # Process via S3 reference instead of direct message content + result = process_catalog_from_s3(bucket=catalog_ref["bucket"], key=catalog_ref["key"]) + logger.info(f"Processed {result['product_count']} products from S3") + else: + # Regular processing for standard-sized messages + process_standard_message(record.value) + + return {"statusCode": 200} diff --git a/examples/kafka/consumer/src/working_with_record_error_handling.py b/examples/kafka/consumer/src/working_with_record_error_handling.py new file mode 100644 index 00000000000..bb96d63e1c2 --- /dev/null +++ b/examples/kafka/consumer/src/working_with_record_error_handling.py @@ -0,0 +1,40 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.kafka.exceptions import KafkaConsumerDeserializationError +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + +schema_config = SchemaConfig(value_schema_type="JSON") + + +def process_customer_data(customer_data: dict): + # Simulate processing logic + if customer_data.get("name") == "error": + raise ValueError("Simulated processing error") + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + successful_records = 0 + failed_records = 0 + + for record in event.records: + try: + # Process each record individually to isolate failures + process_customer_data(record.value) + successful_records += 1 + + except KafkaConsumerDeserializationError as e: + failed_records += 1 + logger.error( + "Failed to deserialize Kafka message", + extra={"topic": record.topic, "partition": record.partition, "offset": record.offset, "error": str(e)}, + ) + # Optionally send to DLQ or error topic + + except Exception as e: + failed_records += 1 + logger.error("Error processing Kafka message", extra={"error": str(e), "topic": record.topic}) + + return {"statusCode": 200, "body": f"Processed {successful_records} records successfully, {failed_records} failed"} diff --git a/examples/kafka/consumer/src/working_with_schema_errors.py b/examples/kafka/consumer/src/working_with_schema_errors.py new file mode 100644 index 00000000000..bc8eec6737d --- /dev/null +++ b/examples/kafka/consumer/src/working_with_schema_errors.py @@ -0,0 +1,48 @@ +from aws_lambda_powertools import Logger, Metrics +from aws_lambda_powertools.metrics import MetricUnit +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.kafka.exceptions import ( + KafkaConsumerAvroSchemaParserError, + KafkaConsumerDeserializationError, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() +metrics = Metrics() + +schema_config = SchemaConfig(value_schema_type="JSON") + + +def process_order(order): + # Simulate processing logic + return "OK" + + +def send_to_dlq(record): + # Simulate sending to DLQ + logger.error("Sending to DLQ", record=record) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: 
LambdaContext): + metrics.add_metric(name="TotalRecords", unit=MetricUnit.Count, value=len(list(event.records))) + + for record in event.records: + try: + order = record.value + process_order(order) + metrics.add_metric(name="ProcessedRecords", unit=MetricUnit.Count, value=1) + + except KafkaConsumerAvroSchemaParserError as exc: + logger.error("Invalid Avro schema configuration", error=str(exc)) + metrics.add_metric(name="SchemaErrors", unit=MetricUnit.Count, value=1) + # This requires fixing the schema - might want to raise to stop processing + raise + + except KafkaConsumerDeserializationError as exc: + logger.warning("Message format doesn't match schema", topic=record.topic, error=str(exc)) + metrics.add_metric(name="DeserializationErrors", unit=MetricUnit.Count, value=1) + # Send to dead-letter queue for analysis + send_to_dlq(record) + + return {"statusCode": 200, "metrics": metrics.serialize_metric_set()} diff --git a/examples/kafka/consumer/src/working_with_value_only.py b/examples/kafka/consumer/src/working_with_value_only.py index 54907400d78..4acfa99f37e 100644 --- a/examples/kafka/consumer/src/working_with_value_only.py +++ b/examples/kafka/consumer/src/working_with_value_only.py @@ -4,19 +4,32 @@ logger = Logger() -# Configure only value schema -schema_config = SchemaConfig(value_schema_type="JSON") +# Define schemas for value +value_schema = """ +{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] +} +""" + +# Configure value schema +schema_config = SchemaConfig( + value_schema_type="AVRO", + value_schema=value_schema, +) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context: LambdaContext): for record in event.records: - # Key remains as string (if present) - if record.key is not None: - logger.info(f"Message key: {record.key}") - - # Value is deserialized as JSON + # Access deserialized value value = record.value - logger.info(f"Order #{value['order_id']} - Total: ${value['total']}") + + logger.info(f"Processing value: {value['name']}") return {"statusCode": 200}