docs(kafka): refactor kafka documentation #6854

Open · wants to merge 2 commits into base: develop
637 changes: 101 additions & 536 deletions docs/utilities/kafka.md

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions examples/kafka/consumer/events/kafka_event_avro.json
@@ -0,0 +1,21 @@
{
"eventSource":"aws:kafka",
"eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records":{
"python-with-avro-doc-3":[
{
"topic":"python-with-avro-doc",
"partition":3,
"offset":0,
"timestamp":1750547105187,
"timestampType":"CREATE_TIME",
"key":"MTIz",
"value":"AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK",
"headers":[

]
}
]
}
}
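Note: the base64 key above decodes to "123", and the value looks like an AWS Glue Schema Registry payload: a short framing prefix (header version, compression flag, 16-byte schema version UUID) followed by the Avro-encoded record. A minimal sketch for inspecting it locally, assuming that 18-byte prefix and the fastavro library:

import base64
from io import BytesIO

from fastavro import parse_schema, schemaless_reader

# Same schema as schemas/user.avsc
schema = parse_schema({
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}],
})

raw = base64.b64decode("AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK")
# Assumption: skip the 18-byte Glue Schema Registry framing before the Avro record
record = schemaless_reader(BytesIO(raw[18:]), schema)
print(record)  # expected: {'name': 'Powertools', 'age': 5}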
21 changes: 21 additions & 0 deletions examples/kafka/consumer/events/kafka_event_json.json
@@ -0,0 +1,21 @@
{
"eventSource":"aws:kafka",
"eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records":{
"python-with-avro-doc-5":[
{
"topic":"python-with-avro-doc",
"partition":5,
"offset":0,
"timestamp":1750547462087,
"timestampType":"CREATE_TIME",
"key":"MTIz",
"value":"eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=",
"headers":[

]
}
]
}
}
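Note: the value in this JSON event is plain base64-encoded JSON; a quick standard-library sketch to inspect it locally:

import base64
import json

value = "eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0="
print(json.loads(base64.b64decode(value)))  # {'name': 'Powertools', 'age': 5}
print(base64.b64decode("MTIz"))             # key: b'123'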
21 changes: 21 additions & 0 deletions examples/kafka/consumer/events/kafka_event_protobuf.json
@@ -0,0 +1,21 @@
{
"eventSource":"aws:kafka",
"eventSourceArn":"arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records":{
"python-with-avro-doc-5":[
{
"topic":"python-with-avro-doc",
"partition":5,
"offset":1,
"timestamp":1750624373324,
"timestampType":"CREATE_TIME",
"key":"MTIz",
"value":"Cgpwb3dlcnRvb2xzEAU=",
"headers":[

]
}
]
}
}
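Note: the Protobuf value above decodes to a User message matching schemas/user.proto (name "powertools", age 5). A minimal sketch, assuming user_pb2.py was generated from that schema with protoc:

import base64

# Assumes bindings were generated beforehand with: protoc --python_out=. user.proto
from user_pb2 import User

user = User()
user.ParseFromString(base64.b64decode("Cgpwb3dlcnRvb2xzEAU="))
print(user.name, user.age)  # expected: powertools 5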
@@ -0,0 +1,22 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
KafkaConsumerFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
Runtime: python3.13
Timeout: 30
Events:
MSKEvent:
Type: MSK
Properties:
StartingPosition: LATEST
Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac"
Topics:
- my-topic-1
BatchSize: 100
MaximumBatchingWindowInSeconds: 5
Policies:
- AWSLambdaMSKExecutionRole

19 changes: 19 additions & 0 deletions examples/kafka/consumer/sam/getting_started_with_msk.yaml
@@ -0,0 +1,19 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
KafkaConsumerFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
Runtime: python3.13
Timeout: 30
Events:
MSKEvent:
Type: MSK
Properties:
StartingPosition: LATEST
Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac"
Topics:
- my-topic-1
Policies:
- AWSLambdaMSKExecutionRole
9 changes: 9 additions & 0 deletions examples/kafka/consumer/schemas/user.avsc
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
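For local experimentation, a record matching this schema can be serialized with fastavro; producers that publish through a schema registry typically add their own framing around these bytes:

from io import BytesIO

from fastavro import parse_schema, schemaless_writer

schema = parse_schema({
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}],
})

buf = BytesIO()
schemaless_writer(buf, schema, {"name": "Powertools", "age": 5})
print(buf.getvalue())  # raw Avro bytes for the record, before base64 encoding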
4 changes: 4 additions & 0 deletions examples/kafka/consumer/schemas/user.json
@@ -0,0 +1,4 @@
{
"name": "...",
"age": "..."
}
8 changes: 8 additions & 0 deletions examples/kafka/consumer/schemas/user.proto
@@ -0,0 +1,8 @@
syntax = "proto3";

package com.example;

message User {
string name = 1;
int32 age = 2;
}
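To build a test payload for this schema, the usual workflow is to generate Python bindings with protoc and serialize a message; a minimal sketch, assuming protoc is available:

# Generate bindings outside Python first, e.g.: protoc --python_out=. user.proto
from user_pb2 import User

user = User(name="powertools", age=5)
print(user.SerializeToString())  # raw Protobuf bytes; base64-encode to build a test event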
44 changes: 44 additions & 0 deletions examples/kafka/consumer/src/access_event_metadata.py
@@ -0,0 +1,44 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Define Avro schema
avro_schema = """
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""

schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=avro_schema,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
for record in event.records:
# Log record coordinates for tracing
logger.info(f"Processing message from topic '{record.topic}'")
logger.info(f"Partition: {record.partition}, Offset: {record.offset}")
logger.info(f"Produced at: {record.timestamp}")

# Process message headers
logger.info(f"Headers: {record.headers}")

# Access the Avro deserialized message content
value = record.value
logger.info(f"Deserialized value: {value['name']}")

# For debugging, you can access the original raw data
logger.info(f"Raw message: {record.original_value}")

return {"statusCode": 200}
9 changes: 9 additions & 0 deletions examples/kafka/consumer/src/lambda_handler_test.py
@@ -0,0 +1,9 @@
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
return {"statusCode": 200, "processed": len(list(event.records))}
@@ -0,0 +1,26 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


# Define custom serializer
def custom_serializer(data: dict):
del data["age"] # Remove age key just for example
return data


# Configure with JSON schema type and a custom serializer function
schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=custom_serializer)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
for record in event.records:
# record.value now only contains the key "name"
value = record.value

logger.info(f"Name: '{value['name']}'")

return {"statusCode": 200}
@@ -0,0 +1,30 @@
from dataclasses import dataclass

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


# Define Dataclass model
@dataclass
class User:
name: str
age: int


# Configure with JSON schema type and dataclass output
schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=User)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
for record in event.records:
# record.value is now a User instance
value: User = record.value

logger.info(f"Name: '{value.name}'")
logger.info(f"Age: '{value.age}'")

return {"statusCode": 200}
29 changes: 29 additions & 0 deletions examples/kafka/consumer/src/serializing_output_with_pydantic.py
@@ -0,0 +1,29 @@
from pydantic import BaseModel

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


# Define Pydantic model for strong validation
class User(BaseModel):
name: str
age: int


# Configure with JSON schema type and Pydantic output
schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=User)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
for record in event.records:
# record.value is now a User instance
value: User = record.value

logger.info(f"Name: '{value.name}'")
logger.info(f"Age: '{value.age}'")

return {"statusCode": 200}
63 changes: 63 additions & 0 deletions examples/kafka/consumer/src/testing_your_code.py
@@ -0,0 +1,63 @@
import base64
import json

from lambda_handler_test import lambda_handler


def test_process_json_message():
"""Test processing a simple JSON message"""
# Create a test Kafka event with JSON data
test_event = {
"eventSource": "aws:kafka",
"records": {
"orders-topic": [
{
"topic": "orders-topic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": None,
"value": base64.b64encode(json.dumps({"order_id": "12345", "amount": 99.95}).encode()).decode(),
},
],
},
}

# Invoke the Lambda handler
response = lambda_handler(test_event, {})

# Verify the response
assert response["statusCode"] == 200
assert response.get("processed") == 1


def test_process_multiple_records():
"""Test processing multiple records in a batch"""
# Create a test event with multiple records
test_event = {
"eventSource": "aws:kafka",
"records": {
"customers-topic": [
{
"topic": "customers-topic",
"partition": 0,
"offset": 10,
"value": base64.b64encode(json.dumps({"customer_id": "A1", "name": "Alice"}).encode()).decode(),
},
{
"topic": "customers-topic",
"partition": 0,
"offset": 11,
"value": base64.b64encode(json.dumps({"customer_id": "B2", "name": "Bob"}).encode()).decode(),
},
],
},
}

# Invoke the Lambda handler
response = lambda_handler(test_event, {})

# Verify the response
assert response["statusCode"] == 200
assert response.get("processed") == 2
45 changes: 45 additions & 0 deletions examples/kafka/consumer/src/using_java_naming_convention.py
@@ -0,0 +1,45 @@
from datetime import datetime

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Define schema that matches Java producer
avro_schema = """
{
"namespace": "com.example.orders",
"type": "record",
"name": "OrderEvent",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "double"},
{"name": "orderDate", "type": "long", "logicalType": "timestamp-millis"}
]
}
"""


# Configure schema with field name normalization for Python style
def normalize_field_name(data: dict):
data["order_id"] = data["orderId"]
data["customer_id"] = data["customerId"]
data["total_amount"] = data["totalAmount"]
data["order_date"] = datetime.fromtimestamp(data["orderDate"] / 1000)
return data


schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=avro_schema,
value_output_serializer=normalize_field_name,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
for record in event.records:
        order = record.value  # dict with snake_case keys after normalize_field_name
logger.info(f"Processing order {order['order_id']}")