Merged
13 changes: 11 additions & 2 deletions CHANGELOG.md
@@ -2,6 +2,15 @@

<!--next-version-placeholder-->

- ## v0.1.0 (05/09/2024)
+ ## v0.1.0 (24/09/2024)

- - First release of `dynamodb_pyio`!
+ ✨NEW

- Add a composite transform (`WriteToDynamoDB`) that writes records to a DynamoDB table with the help of the [`batch_writer`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html) of the boto3 package.
- The batch writer automatically handles buffering and sending items in batches, and it also resends any unprocessed items as needed.
- Provide an option that handles duplicate records.
- _dedup_pkeys_ - List of keys to be used for deduplicating items in the buffer.
- Create a dedicated pipeline option (`DynamoDBOptions`) that reads AWS related values (e.g. `aws_access_key_id`) from pipeline arguments.
- Implement a metric object that records the total counts.
- Add unit and integration testing cases. [moto](https://github.com/getmoto/moto) and [localstack-utils](https://docs.localstack.cloud/user-guide/tools/testing-utils/) are used for unit and integration testing, respectively.
- Integrate with GitHub Actions by adding workflows for testing, documentation and release management.
66 changes: 64 additions & 2 deletions README.md
@@ -6,17 +6,79 @@
![pypi](https://img.shields.io/pypi/v/dynamodb_pyio)
![python](https://img.shields.io/pypi/pyversions/dynamodb_pyio)

- Apache Beam Python I/O connector for Amazon DynamoDB
+ [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (`dynamodb_pyio`) aims to integrate with the database service by supporting source and sink connectors. Currently, only the sink connector is available.

## Installation

The connector can be installed from PyPI.

```bash
$ pip install dynamodb_pyio
```

## Usage

- - TODO
+ ### Sink Connector

It has the main composite transform ([`WriteToDynamoDB`](https://beam-pyio.github.io/dynamodb_pyio/autoapi/dynamodb_pyio/io/index.html#dynamodb_pyio.io.WriteToDynamoDB)), which expects a list or tuple _PCollection_ element. If the element is a tuple, the tuple's first element is taken. If the element is not of the accepted types, you can apply the [`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches/) or [`BatchElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements) transform beforehand. The records of the element are then written to a DynamoDB table with the help of the [`batch_writer`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html) of the boto3 package. Note that the batch writer automatically handles buffering and sending items in batches, and it also resends any unprocessed items as needed.
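For intuition, the element handling described above can be sketched in plain Python. This is an illustrative sketch only, with a hypothetical `to_records` helper; it is not the connector's actual code.

```python
def to_records(element):
    # Sketch of the element handling described above:
    # if the element is a tuple, the tuple's first element is taken;
    # otherwise the element itself must be a list of records.
    if isinstance(element, tuple):
        element = element[0]
    if not isinstance(element, list):
        raise TypeError("WriteToDynamoDB expects a list or tuple element")
    return element
```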

The transform also has an option that handles duplicate records.

- _dedup_pkeys_ - List of keys to be used for deduplicating items in the buffer.
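As a rough illustration, deduplication by _dedup_pkeys_ can be thought of as keeping the first item seen for each unique combination of the listed key values. The sketch below shows the idea under that assumed semantics; it is not the library's implementation, and `dedup_items` is a hypothetical name.

```python
def dedup_items(items, dedup_pkeys):
    # Keep only the first item for each unique combination of the
    # dedup_pkeys values (sketch of the deduplication idea).
    seen, unique = set(), []
    for item in items:
        key = tuple(item[k] for k in dedup_pkeys)
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique
```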

#### Sink Connector Example

The transform can process many records, thanks to the _batch writer_.

```python
import apache_beam as beam
from dynamodb_pyio.io import WriteToDynamoDB

records = [{"pk": str(i), "sk": i} for i in range(500)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create([records])
        | WriteToDynamoDB(table_name="my-dynamodb-table")
    )
```

Duplicate records can be handled using the _dedup_pkeys_ option.

```python
import apache_beam as beam
from dynamodb_pyio.io import WriteToDynamoDB

records = [{"pk": str(1), "sk": 1} for _ in range(20)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create([records])
        | WriteToDynamoDB(table_name="my-dynamodb-table", dedup_pkeys=["pk", "sk"])
    )
```

Batches of elements can be controlled further with the `BatchElements` or `GroupIntoBatches` transform.

```python
import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from dynamodb_pyio.io import WriteToDynamoDB

records = [{"pk": str(i), "sk": i} for i in range(100)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(records)
        | BatchElements(min_batch_size=50, max_batch_size=50)
        | WriteToDynamoDB(table_name="my-dynamodb-table")
    )
```
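For intuition, `BatchElements` with `min_batch_size=50, max_batch_size=50` splits the 100 records above roughly like the pure-Python chunking below. This is a sketch of the batching effect only, not Beam's implementation.

```python
def chunk(records, size):
    # Split records into consecutive batches of at most `size` items.
    return [records[i : i + size] for i in range(0, len(records), size)]

batches = chunk([{"pk": str(i), "sk": i} for i in range(100)], 50)
# two batches of 50 records each
```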

See [Introduction to DynamoDB PyIO Sink Connector](/blog/2024/dynamodb-pyio-intro/) for more examples.

## Contributing

149 changes: 149 additions & 0 deletions examples/pipeline.py
@@ -0,0 +1,149 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import decimal
import logging

import boto3
from boto3.dynamodb.types import TypeDeserializer

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from dynamodb_pyio.io import WriteToDynamoDB

TABLE_NAME = "dynamodb-pyio-test"


def get_table(table_name):
    resource = boto3.resource("dynamodb")
    return resource.Table(table_name)


def create_table(table_name):
    client = boto3.client("dynamodb")
    try:
        client.describe_table(TableName=table_name)
        table_exists = True
    except Exception:
        table_exists = False
    if not table_exists:
        print(">> create table...")
        params = {
            "TableName": table_name,
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "N"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }
        client.create_table(**params)
        get_table(table_name).wait_until_exists()


def to_int_if_decimal(v):
    try:
        if isinstance(v, decimal.Decimal):
            return int(v)
        else:
            return v
    except Exception:
        return v


def scan_table(**kwargs):
    client = boto3.client("dynamodb")
    paginator = client.get_paginator("scan")
    page_iterator = paginator.paginate(**kwargs)
    items = []
    for page in page_iterator:
        for document in page["Items"]:
            items.append(
                {
                    k: to_int_if_decimal(TypeDeserializer().deserialize(v))
                    for k, v in document.items()
                }
            )
    return sorted(items, key=lambda d: d["sk"])


def truncate_table(table_name):
    records = scan_table(TableName=table_name)
    table = get_table(table_name)
    with table.batch_writer() as batch:
        for record in records:
            batch.delete_item(Key=record)


def mask_secrets(d: dict):
    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--table_name", default=TABLE_NAME, type=str, help="DynamoDB table name"
    )
    parser.add_argument(
        "--num_records", default=500, type=int, help="Number of records"
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known_args - {known_args}")
    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "CreateElements"
            >> beam.Create(
                [
                    {
                        "pk": str(int(1 if i >= known_args.num_records / 2 else i)),
                        "sk": int(1 if i >= known_args.num_records / 2 else i),
                    }
                    for i in range(known_args.num_records)
                ]
            )
            | "BatchElements" >> BatchElements(min_batch_size=100, max_batch_size=200)
            | "WriteToDynamoDB"
            >> WriteToDynamoDB(
                table_name=known_args.table_name, dedup_pkeys=["pk", "sk"]
            )
        )

logging.getLogger().setLevel(logging.INFO)
logging.info("Building pipeline ...")


if __name__ == "__main__":
    create_table(TABLE_NAME)
    print(">> start pipeline...")
    run()
    print(">> check number of records...")
    print(len(scan_table(TableName=TABLE_NAME)))
    print(">> truncate table...")
    truncate_table(TABLE_NAME)
2 changes: 1 addition & 1 deletion src/dynamodb_pyio/boto3_client.py
@@ -87,7 +87,7 @@ def put_items_batch(self, records: list, table_name: str, dedup_pkeys: list = None
Args:
records (list): Records to send into an Amazon SQS queue.
table_name (str): Amazon DynamoDB table name.
- dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
+ dedup_pkeys (list, Optional): List of keys to be used for deduplicating items in buffer.

Raises:
DynamoDBClientError: DynamoDB client error.
8 changes: 4 additions & 4 deletions src/dynamodb_pyio/io.py
@@ -33,7 +33,7 @@ class _DynamoDBWriteFn(beam.DoFn):

Args:
table_name (str): Amazon DynamoDB table name.
- dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
+ dedup_pkeys (list): List of keys to be used for deduplicating records in buffer.
options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
"""

@@ -51,7 +51,7 @@ def __init__(

Args:
table_name (str): Amazon DynamoDB table name.
- dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
+ dedup_pkeys (list): List of keys to be used for deduplicating records in buffer.
options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
"""
super().__init__()
@@ -80,15 +80,15 @@ class WriteToDynamoDB(beam.PTransform):

Args:
table_name (str): Amazon DynamoDB table name.
- dedup_pkeys (list, Optional): List of keys to be used for de-duplicating records in buffer.
+ dedup_pkeys (list, Optional): List of keys to be used for deduplicating records in buffer.
"""

def __init__(self, table_name: str, dedup_pkeys: list = None):
"""Constructor of the transform that puts records into an Amazon DynamoDB table.

Args:
table_name (str): Amazon DynamoDB table name.
- dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
+ dedup_pkeys (list, Optional): List of keys to be used for deduplicating items in buffer.
"""
super().__init__()
self.table_name = table_name