Commit 08c1120

Merge pull request #8 from beam-pyio/feature/prepare-release
Prepare 0.1.0 release
2 parents 9b55196 + f73fcdb commit 08c1120

File tree

5 files changed: +229 −9 lines changed

CHANGELOG.md

+11-2
@@ -2,6 +2,15 @@
 
 <!--next-version-placeholder-->
 
-## v0.1.0 (05/09/2024)
+## v0.1.0 (24/09/2024)
 
-- First release of `dynamodb_pyio`!
+✨NEW
+
+- Add a composite transform (`WriteToDynamoDB`) that writes records to a DynamoDB table with the help of the [`batch_writer`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html) of the boto3 package.
+  - The batch writer automatically handles buffering and sending items in batches, and it resends any unprocessed items as needed.
+- Provide an option that handles duplicate records.
+  - _dedup_pkeys_ - list of keys to be used for deduplicating items in the buffer.
+- Create a dedicated pipeline option (`DynamoDBOptions`) that reads AWS-related values (e.g. `aws_access_key_id`) from pipeline arguments.
+- Implement a metric object that records the total counts.
+- Add unit and integration testing cases. The [moto](https://github.com/getmoto/moto) and [localstack-utils](https://docs.localstack.cloud/user-guide/tools/testing-utils/) packages are used for unit and integration testing respectively.
+- Integrate with GitHub Actions by adding workflows for testing, documentation and release management.
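The deduplication behavior described in the changelog can be sketched in plain Python. The `dedup` helper below is hypothetical (it is not dynamodb_pyio's implementation): it keeps a single record per combination of the listed key values, with later duplicates overwriting earlier ones.

```python
# Hypothetical sketch of dedup_pkeys-style deduplication; not the
# library's actual code.
def dedup(records, dedup_pkeys):
    """Keep one record per combination of the dedup_pkeys values."""
    seen = {}
    for record in records:
        key = tuple(record[k] for k in dedup_pkeys)
        seen[key] = record  # later duplicates overwrite earlier ones
    return list(seen.values())

# 20 identical records collapse to a single item before writing.
records = [{"pk": "1", "sk": 1} for _ in range(20)]
print(len(dedup(records, ["pk", "sk"])))  # 1
```

Deduplicating before the batch is sent avoids DynamoDB rejecting a `BatchWriteItem` request that contains duplicate primary keys.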

README.md

+64-2
@@ -6,17 +6,79 @@
 
 ![pypi](https://img.shields.io/pypi/v/dynamodb_pyio)
 ![python](https://img.shields.io/pypi/pyversions/dynamodb_pyio)
 
-Apache Beam Python I/O connector for Amazon DynamoDB
+[Amazon DynamoDB](https://aws.amazon.com/dynamodb/) is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (`dynamodb_pyio`) aims to integrate with the database service by supporting source and sink connectors. Currently, only the sink connector is available.
 
 ## Installation
 
+The connector can be installed from PyPI.
+
 ```bash
 $ pip install dynamodb_pyio
 ```
 
 ## Usage
 
-- TODO
+### Sink Connector
+
+The main composite transform ([`WriteToDynamoDB`](https://beam-pyio.github.io/dynamodb_pyio/autoapi/dynamodb_pyio/io/index.html#dynamodb_pyio.io.WriteToDynamoDB)) expects a list or tuple _PCollection_ element. If the element is a tuple, the tuple's first element is taken. If the element is not of the accepted types, you can apply the [`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches/) or [`BatchElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements) transform beforehand. The records of the element are then written to a DynamoDB table with the help of the [`batch_writer`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html) of the boto3 package. Note that the batch writer automatically handles buffering and sending items in batches, and it resends any unprocessed items as needed.
+
+The transform also has an option that handles duplicate records.
+
+- _dedup_pkeys_ - list of keys to be used for deduplicating items in the buffer.
+
+#### Sink Connector Example
+
+The transform can process many records, thanks to the _batch writer_.
+
+```python
+import apache_beam as beam
+from dynamodb_pyio.io import WriteToDynamoDB
+
+table_name = "your-table-name"  # replace with an existing table
+records = [{"pk": str(i), "sk": i} for i in range(500)]
+
+with beam.Pipeline() as p:
+    (
+        p
+        | beam.Create([records])
+        | WriteToDynamoDB(table_name=table_name)
+    )
+```
+
+Duplicate records can be handled using the _dedup_pkeys_ option.
+
+```python
+import apache_beam as beam
+from dynamodb_pyio.io import WriteToDynamoDB
+
+table_name = "your-table-name"  # replace with an existing table
+records = [{"pk": str(1), "sk": 1} for _ in range(20)]
+
+with beam.Pipeline() as p:
+    (
+        p
+        | beam.Create([records])
+        | WriteToDynamoDB(table_name=table_name, dedup_pkeys=["pk", "sk"])
+    )
+```
+
+Batches of elements can be controlled further with the `BatchElements` or `GroupIntoBatches` transform.
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.util import BatchElements
+from dynamodb_pyio.io import WriteToDynamoDB
+
+table_name = "your-table-name"  # replace with an existing table
+records = [{"pk": str(i), "sk": i} for i in range(100)]
+
+with beam.Pipeline() as p:
+    (
+        p
+        | beam.Create(records)
+        | BatchElements(min_batch_size=50, max_batch_size=50)
+        | WriteToDynamoDB(table_name=table_name)
+    )
+```
+
+See [Introduction to DynamoDB PyIO Sink Connector](/blog/2024/dynamodb-pyio-intro/) for more examples.
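The element-shape rule from the sink-connector section (a list passes through; for a tuple, the tuple's first element is taken) can be sketched in plain Python. The `normalize_element` helper below is hypothetical, inferred from the README text rather than taken from the library.

```python
# Hypothetical sketch (not dynamodb_pyio's code) of the element-shape rule:
# a list element passes through unchanged; for a tuple element, the tuple's
# first element is taken; anything else is rejected.
def normalize_element(element):
    if isinstance(element, tuple):
        element = element[0]
    if not isinstance(element, list):
        raise TypeError("PCollection element must be a list or a tuple")
    return element

records = [{"pk": "0", "sk": 0}]
print(normalize_element(records) == records)            # True
print(normalize_element((records, "meta")) == records)  # True
```

This is why a `PCollection` of individual dicts needs `BatchElements` or `GroupIntoBatches` first: those transforms turn single records into the list-shaped elements the transform accepts.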
 
 ## Contributing

examples/pipeline.py

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@ (new file)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import decimal
import logging

import boto3
from boto3.dynamodb.types import TypeDeserializer

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from dynamodb_pyio.io import WriteToDynamoDB

TABLE_NAME = "dynamodb-pyio-test"


def get_table(table_name):
    resource = boto3.resource("dynamodb")
    return resource.Table(table_name)


def create_table(table_name):
    client = boto3.client("dynamodb")
    try:
        client.describe_table(TableName=table_name)
        table_exists = True
    except Exception:
        table_exists = False
    if not table_exists:
        print(">> create table...")
        params = {
            "TableName": table_name,
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "N"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }
        client.create_table(**params)
        get_table(table_name).wait_until_exists()


def to_int_if_decimal(v):
    try:
        if isinstance(v, decimal.Decimal):
            return int(v)
        else:
            return v
    except Exception:
        return v


def scan_table(**kwargs):
    client = boto3.client("dynamodb")
    paginator = client.get_paginator("scan")
    page_iterator = paginator.paginate(**kwargs)
    items = []
    for page in page_iterator:
        for document in page["Items"]:
            items.append(
                {
                    k: to_int_if_decimal(TypeDeserializer().deserialize(v))
                    for k, v in document.items()
                }
            )
    return sorted(items, key=lambda d: d["sk"])


def truncate_table(table_name):
    records = scan_table(TableName=table_name)
    table = get_table(table_name)
    with table.batch_writer() as batch:
        for record in records:
            batch.delete_item(Key=record)


def mask_secrets(d: dict):
    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--table_name", default=TABLE_NAME, type=str, help="DynamoDB table name"
    )
    parser.add_argument(
        "--num_records", default=500, type=int, help="Number of records"
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known_args - {known_args}")
    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "CreateElements"
            >> beam.Create(
                [
                    {
                        "pk": str(int(1 if i >= known_args.num_records / 2 else i)),
                        "sk": int(1 if i >= known_args.num_records / 2 else i),
                    }
                    for i in range(known_args.num_records)
                ]
            )
            | "BatchElements" >> BatchElements(min_batch_size=100, max_batch_size=200)
            | "WriteToDynamoDB"
            >> WriteToDynamoDB(
                table_name=known_args.table_name, dedup_pkeys=["pk", "sk"]
            )
        )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")


if __name__ == "__main__":
    create_table(TABLE_NAME)
    print(">> start pipeline...")
    run()
    print(">> check number of records...")
    print(len(scan_table(TableName=TABLE_NAME)))
    print(">> truncate table...")
    truncate_table(TABLE_NAME)
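DynamoDB's `TypeDeserializer` returns number attributes as `decimal.Decimal`, which is why the example pipeline converts them back before sorting. The helper can be exercised standalone (restated here without the boto3-dependent parts of the example):

```python
import decimal

# Standalone restatement of the example's to_int_if_decimal helper:
# DynamoDB's TypeDeserializer yields numbers as decimal.Decimal, and this
# maps them back to plain ints while leaving other values untouched.
def to_int_if_decimal(v):
    try:
        if isinstance(v, decimal.Decimal):
            return int(v)
        return v
    except Exception:
        return v

print(to_int_if_decimal(decimal.Decimal("5")))  # 5 (a plain int)
print(to_int_if_decimal("pk-value"))            # pk-value (unchanged)
```

Without this conversion, the scanned items would compare and print as `Decimal('5')` rather than `5`.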

src/dynamodb_pyio/boto3_client.py

+1-1
@@ -87,7 +87,7 @@ def put_items_batch(self, records: list, table_name: str, dedup_pkeys: list = No
 
         Args:
             records (list): Records to put into an Amazon DynamoDB table.
             table_name (str): Amazon DynamoDB table name.
-            dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
+            dedup_pkeys (list, Optional): List of keys to be used for deduplicating items in buffer.
 
         Raises:
             DynamoDBClientError: DynamoDB client error.

src/dynamodb_pyio/io.py

+4-4
@@ -33,7 +33,7 @@ class _DynamoDBWriteFn(beam.DoFn):
 
     Args:
         table_name (str): Amazon DynamoDB table name.
-        dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
+        dedup_pkeys (list): List of keys to be used for deduplicating records in buffer.
         options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
     """
 
@@ -51,7 +51,7 @@ def __init__(
 
         Args:
             table_name (str): Amazon DynamoDB table name.
-            dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
+            dedup_pkeys (list): List of keys to be used for deduplicating records in buffer.
             options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
         """
         super().__init__()
@@ -80,15 +80,15 @@ class WriteToDynamoDB(beam.PTransform):
 
     Args:
         table_name (str): Amazon DynamoDB table name.
-        dedup_pkeys (list, Optional): List of keys to be used for de-duplicating records in buffer.
+        dedup_pkeys (list, Optional): List of keys to be used for deduplicating records in buffer.
     """
 
     def __init__(self, table_name: str, dedup_pkeys: list = None):
        """Constructor of the transform that puts records into an Amazon DynamoDB table.
 
         Args:
             table_name (str): Amazon DynamoDB table name.
-            dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
+            dedup_pkeys (list, Optional): List of keys to be used for deduplicating items in buffer.
         """
         super().__init__()
         self.table_name = table_name
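For context on why these transforms lean on boto3's `batch_writer`: DynamoDB's `BatchWriteItem` API accepts at most 25 items per request, so a writer must flush its buffer in chunks. A minimal, hypothetical chunking sketch (not dynamodb_pyio's implementation):

```python
# Hypothetical sketch of the chunking a DynamoDB batch writer performs:
# BatchWriteItem accepts at most 25 items per request, so buffered records
# are flushed in chunks of that size.
def chunk(records, size=25):
    return [records[i : i + size] for i in range(0, len(records), size)]

batches = chunk([{"pk": str(i), "sk": i} for i in range(60)])
print([len(b) for b in batches])  # [25, 25, 10]
```

The real batch writer additionally retries any `UnprocessedItems` returned by a request, which is the behavior the changelog and README refer to.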
