Skip to content

Commit 8a63e4d

Browse files
committed
add IO transform and unit tests
1 parent fb3d98d commit 8a63e4d

File tree

4 files changed

+348
-31
lines changed

4 files changed

+348
-31
lines changed

src/dynamodb_pyio/boto3_client.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def put_items_batch(self, records: list, table_name: str, dedup_pkeys: list = No
8686
8787
Args:
8888
records (list): Records to send into an Amazon SQS queue.
89-
table_name (str): DynamoDB table name.
89+
table_name (str): Amazon DynamoDB table name.
9090
dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
9191
9292
Raises:
@@ -105,7 +105,3 @@ def put_items_batch(self, records: list, table_name: str, dedup_pkeys: list = No
105105
batch.put_item(Item=record)
106106
except Exception as e:
107107
raise DynamoDBClientError(str(e), get_http_error_code(e))
108-
109-
def close(self):
110-
"""Closes underlying endpoint connections."""
111-
self.client.close()

src/dynamodb_pyio/io.py

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,101 @@
1-
def my_fn():
2-
return 1
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import typing
20+
import apache_beam as beam
21+
from apache_beam import metrics
22+
from apache_beam.pvalue import PCollection
23+
24+
from dynamodb_pyio.boto3_client import DynamoDBClient
25+
from dynamodb_pyio.options import DynamoDBOptions
26+
27+
28+
__all__ = ["WriteToDynamoDB"]
29+
30+
31+
class _DynamoDBWriteFn(beam.DoFn):
32+
"""Create the connector can send messages in batch to an Amazon DynamoDB table.
33+
34+
Args:
35+
table_name (str): Amazon DynamoDB table name.
36+
dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
37+
options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
38+
"""
39+
40+
total_elements_count = metrics.Metrics.counter(
41+
"_DynamoDBWriteFn", "total_elements_count"
42+
)
43+
44+
def __init__(
45+
self,
46+
table_name: str,
47+
dedup_pkeys: list,
48+
options: typing.Union[DynamoDBOptions, dict],
49+
):
50+
"""Constructor of _DynamoDBWriteFn
51+
52+
Args:
53+
table_name (str): Amazon DynamoDB table name.
54+
dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer.
55+
options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client.
56+
"""
57+
super().__init__()
58+
self.table_name = table_name
59+
self.dedup_pkeys = dedup_pkeys
60+
self.options = options
61+
62+
def start_bundle(self):
63+
self.client = DynamoDBClient(self.options)
64+
65+
def process(self, element):
66+
if isinstance(element, tuple):
67+
element = element[1]
68+
self.client.put_items_batch(element, self.table_name, self.dedup_pkeys)
69+
self.total_elements_count.inc(len(element))
70+
logging.info(f"total {len(element)} elements processed...")
71+
72+
73+
class WriteToDynamoDB(beam.PTransform):
74+
"""A transform that puts records to an Amazon DynamoDB table.
75+
76+
Takes an input PCollection and put them in batch using the boto3 package.
77+
For more information, visit the `Boto3 Documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html>`__.
78+
79+
Note that, if the PCollection element is a tuple (i.e. keyed stream), only the value is used to put records in batch.
80+
81+
Args:
82+
table_name (str): Amazon DynamoDB table name.
83+
dedup_pkeys (list, Optional): List of keys to be used for de-duplicating records in buffer.
84+
"""
85+
86+
def __init__(self, table_name: str, dedup_pkeys: list = None):
87+
"""Constructor of the transform that puts records into an Amazon DynamoDB table.
88+
89+
Args:
90+
table_name (str): Amazon DynamoDB table name.
91+
dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer.
92+
"""
93+
super().__init__()
94+
self.table_name = table_name
95+
self.dedup_pkeys = dedup_pkeys
96+
97+
def expand(self, pcoll: PCollection):
98+
options = pcoll.pipeline.options.view_as(DynamoDBOptions)
99+
return pcoll | beam.ParDo(
100+
_DynamoDBWriteFn(self.table_name, self.dedup_pkeys, options)
101+
)

tests/boto3_client_test.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def scan_table(**kwargs):
6565
for k, v in document.items()
6666
}
6767
)
68-
return items
68+
return sorted(items, key=lambda d: d["sk"])
6969

7070

7171
@mock_aws
@@ -95,11 +95,25 @@ def setUp(self):
9595
create_table(params)
9696

9797
def test_put_items_batch(self):
98-
records = [{"pk": str(i), "sk": i} for i in range(3)]
98+
records = [{"pk": str(i), "sk": i} for i in range(20)]
9999
self.dynamodb_client.put_items_batch(records, self.table_name)
100+
self.assertListEqual(records, scan_table(TableName=self.table_name))
100101

102+
def test_put_items_batch_with_large_items(self):
103+
# batch writer automatically handles buffering and sending items in batches
104+
records = [{"pk": str(i), "sk": i} for i in range(5000)]
105+
self.dynamodb_client.put_items_batch(records, self.table_name)
101106
self.assertListEqual(records, scan_table(TableName=self.table_name))
102107

108+
def test_put_items_batch_with_non_existing_table(self):
109+
records = [{"pk": 1, "sk": str(1)} for i in range(20)]
110+
self.assertRaises(
111+
DynamoDBClientError,
112+
self.dynamodb_client.put_items_batch,
113+
records,
114+
"non-existing-table",
115+
)
116+
103117
def test_put_items_batch_with_unsupported_record_type(self):
104118
# records should be a list
105119
records = {}
@@ -110,37 +124,28 @@ def test_put_items_batch_with_unsupported_record_type(self):
110124
self.table_name,
111125
)
112126

113-
def test_put_items_batch_duplicate_records_without_dedup_keys(self):
114-
records = [{"pk": str(1), "sk": 1} for i in range(3)]
127+
def test_put_items_batch_with_wrong_data_types(self):
128+
# pk and sk should be string and number respectively
129+
records = [{"pk": 1, "sk": str(1)} for i in range(20)]
115130
self.assertRaises(
116131
DynamoDBClientError,
117132
self.dynamodb_client.put_items_batch,
118133
records,
119134
self.table_name,
120135
)
121136

122-
def test_put_items_batch_duplicate_records_with_dedup_keys(self):
123-
records = [{"pk": str(1), "sk": 1} for i in range(3)]
124-
self.dynamodb_client.put_items_batch(
125-
records, self.table_name, dedup_pkeys=["pk", "sk"]
126-
)
127-
self.assertListEqual(records[:1], scan_table(TableName=self.table_name))
128-
129-
def test_put_items_batch_with_wrong_data_types(self):
130-
# pk and sk should be string and number respectively
131-
records = [{"pk": 1, "sk": str(1)} for i in range(3)]
137+
def test_put_items_batch_duplicate_records_without_dedup_keys(self):
138+
records = [{"pk": str(1), "sk": 1} for i in range(20)]
132139
self.assertRaises(
133140
DynamoDBClientError,
134141
self.dynamodb_client.put_items_batch,
135142
records,
136143
self.table_name,
137144
)
138145

139-
def test_put_items_batch_with_large_items(self):
140-
records = [{"pk": str(i), "sk": i} for i in range(5000)]
141-
self.dynamodb_client.put_items_batch(records, self.table_name)
142-
143-
self.assertEqual(
144-
len(records),
145-
len(scan_table(TableName=self.table_name)),
146+
def test_put_items_batch_duplicate_records_with_dedup_keys(self):
147+
records = [{"pk": str(1), "sk": 1} for i in range(20)]
148+
self.dynamodb_client.put_items_batch(
149+
records, self.table_name, dedup_pkeys=["pk", "sk"]
146150
)
151+
self.assertListEqual(records[:1], scan_table(TableName=self.table_name))

0 commit comments

Comments
 (0)