diff --git a/CHANGELOG.md b/CHANGELOG.md index 7712662..56c1203 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ -## v0.1.0 (05/09/2024) +## v0.1.0 (24/09/2024) -- First release of `dynamodb_pyio`! \ No newline at end of file +✨NEW + +- Add a composite transform (`WriteToDynamoDB`) that writes records to a DynamoDB table with help of the [`batch_writer`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html) of the boto3 package. + - The batch writer will automatically handle buffering and sending items in batches. In addition, it will also automatically handle any unprocessed items and resend them as needed. +- Provide an option that handles duplicate records + - _dedup_pkeys_ - List of keys to be used for deduplicating items in buffer. +- Create a dedicated pipeline option (`DynamoDBOptions`) that reads AWS related values (e.g. `aws_access_key_id`) from pipeline arguments. +- Implement a metric object that records the total counts. +- Add unit and integration testing cases. The [moto](https://github.com/getmoto/moto) and [localstack-utils](https://docs.localstack.cloud/user-guide/tools/testing-utils/) are used for unit and integration testing respectively. +- Integrate with GitHub Actions by adding workflows for testing, documentation and release management. diff --git a/README.md b/README.md index 9588c7e..87dc9ab 100644 --- a/README.md +++ b/README.md @@ -6,17 +6,79 @@ ![pypi](https://img.shields.io/pypi/v/dynamodb_pyio) ![python](https://img.shields.io/pypi/pyversions/dynamodb_pyio) -Apache Beam Python I/O connector for Amazon DynamoDB +[Amazon DynamoDB](https://aws.amazon.com/dynamodb/) is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (`dynamodb_pyio`) aims to integrate with the database service by supporting source and sink connectors. Currently, the sink connector is available. ## Installation +The connector can be installed from PyPI. + ```bash $ pip install dynamodb_pyio ``` ## Usage -- TODO +### Sink Connector + +It has the main composite transform ([`WriteToDynamoDB`](https://beam-pyio.github.io/dynamodb_pyio/autoapi/dynamodb_pyio/io/index.html#dynamodb_pyio.io.WriteToDynamoDB)), and it expects a list or tuple _PCollection_ element. If the element is a tuple, the tuple's first element is taken. If the element is not of the accepted types, you can apply the [`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches/) or [`BatchElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements) transform beforehand. Then, the records of the element are written to a DynamoDB table with help of the [`batch_writer`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html) of the boto3 package. Note that the batch writer will automatically handle buffering and sending items in batches. In addition, it will also automatically handle any unprocessed items and resend them as needed. + +The transform also has an option that handles duplicate records. + +- _dedup_pkeys_ - List of keys to be used for deduplicating items in buffer. + +#### Sink Connector Example + +The transform can process many records, thanks to the _batch writer_. + +```python +import apache_beam as beam +from dynamodb_pyio.io import WriteToDynamoDB + +records = [{"pk": str(i), "sk": i} for i in range(500)] + +with beam.Pipeline() as p: + ( + p + | beam.Create([records]) + | WriteToDynamoDB(table_name=self.table_name) + ) +``` + +Duplicate records can be handled using the _dedup_pkeys_ option. + +```python +import apache_beam as beam +from dynamodb_pyio.io import WriteToDynamoDB + +records = [{"pk": str(1), "sk": 1} for _ in range(20)] + +with beam.Pipeline() as p: + ( + p + | beam.Create([records]) + | WriteToDynamoDB(table_name=self.table_name, dedup_pkeys=["pk", "sk"]) + ) +``` + +Batches of elements can be controlled further with the `BatchElements` or `GroupIntoBatches` transform + +```python +import apache_beam as beam +from apache_beam.transforms.util import BatchElements +from dynamodb_pyio.io import WriteToDynamoDB + +records = [{"pk": str(i), "sk": i} for i in range(100)] + +with beam.Pipeline() as p: + ( + p + | beam.Create(records) + | BatchElements(min_batch_size=50, max_batch_size=50) + | WriteToDynamoDB(table_name=self.table_name) + ) +``` + +See [Introduction to DynamoDB PyIO Sink Connector](/blog/2024/dynamodb-pyio-intro/) for more examples. ## Contributing diff --git a/examples/pipeline.py b/examples/pipeline.py new file mode 100644 index 0000000..e238ec2 --- /dev/null +++ b/examples/pipeline.py @@ -0,0 +1,149 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import argparse +import decimal +import logging + +import boto3 +from boto3.dynamodb.types import TypeDeserializer + +import apache_beam as beam +from apache_beam.transforms.util import BatchElements +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions + +from dynamodb_pyio.io import WriteToDynamoDB + +TABLE_NAME = "dynamodb-pyio-test" + + +def get_table(table_name): + resource = boto3.resource("dynamodb") + return resource.Table(table_name) + + +def create_table(table_name): + client = boto3.client("dynamodb") + try: + client.describe_table(TableName=table_name) + table_exists = True + except Exception: + table_exists = False + if not table_exists: + print(">> create table...") + params = { + "TableName": table_name, + "KeySchema": [ + {"AttributeName": "pk", "KeyType": "HASH"}, + {"AttributeName": "sk", "KeyType": "RANGE"}, + ], + "AttributeDefinitions": [ + {"AttributeName": "pk", "AttributeType": "S"}, + {"AttributeName": "sk", "AttributeType": "N"}, + ], + "BillingMode": "PAY_PER_REQUEST", + } + client.create_table(**params) + get_table(table_name).wait_until_exists() + + +def to_int_if_decimal(v): + try: + if isinstance(v, decimal.Decimal): + return int(v) + else: + return v + except Exception: + return v + + +def scan_table(**kwargs): + client = boto3.client("dynamodb") + paginator = client.get_paginator("scan") + page_iterator = paginator.paginate(**kwargs) + items = [] + for page in page_iterator: + for document in page["Items"]: + items.append( + { + k: to_int_if_decimal(TypeDeserializer().deserialize(v)) + for k, v in document.items() + } + ) + return sorted(items, key=lambda d: d["sk"]) + + +def truncate_table(table_name): + records = scan_table(TableName=TABLE_NAME) + table = get_table(table_name) + with table.batch_writer() as batch: + for record in records: + batch.delete_item(Key=record) + + +def mask_secrets(d: dict): + return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()} + + +def run(argv=None, save_main_session=True): + parser = argparse.ArgumentParser(description="Beam pipeline arguments") + parser.add_argument( + "--table_name", default=TABLE_NAME, type=str, help="DynamoDB table name" + ) + parser.add_argument( + "--num_records", default="500", type=int, help="Number of records" + ) + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + print(f"known_args - {known_args}") + print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}") + + with beam.Pipeline(options=pipeline_options) as p: + ( + p + | "CreateElements" + >> beam.Create( + [ + { + "pk": str(int(1 if i >= known_args.num_records / 2 else i)), + "sk": int(1 if i >= known_args.num_records / 2 else i), + } + for i in range(known_args.num_records) + ] + ) + | "BatchElements" >> BatchElements(min_batch_size=100, max_batch_size=200) + | "WriteToDynamoDB" + >> WriteToDynamoDB( + table_name=known_args.table_name, dedup_pkeys=["pk", "sk"] + ) + ) + + logging.getLogger().setLevel(logging.INFO) + logging.info("Building pipeline ...") + + +if __name__ == "__main__": + create_table(TABLE_NAME) + print(">> start pipeline...") + run() + print(">> check number of records...") + print(len(scan_table(TableName=TABLE_NAME))) + print(">> truncate table...") + truncate_table(TABLE_NAME) diff --git a/src/dynamodb_pyio/boto3_client.py b/src/dynamodb_pyio/boto3_client.py index cd258bd..03e8d5f 100644 --- a/src/dynamodb_pyio/boto3_client.py +++ b/src/dynamodb_pyio/boto3_client.py @@ -87,7 +87,7 @@ def put_items_batch(self, records: list, table_name: str, dedup_pkeys: list = No Args: records (list): Records to send into an Amazon SQS queue. table_name (str): Amazon DynamoDB table name. - dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer. + dedup_pkeys (list, Optional): List of keys to be used for deduplicating items in buffer. Raises: DynamoDBClientError: DynamoDB client error. diff --git a/src/dynamodb_pyio/io.py b/src/dynamodb_pyio/io.py index 851b56d..a8f28a0 100644 --- a/src/dynamodb_pyio/io.py +++ b/src/dynamodb_pyio/io.py @@ -33,7 +33,7 @@ class _DynamoDBWriteFn(beam.DoFn): Args: table_name (str): Amazon DynamoDB table name. - dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer. + dedup_pkeys (list): List of keys to be used for deduplicating records in buffer. options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client. """ @@ -51,7 +51,7 @@ def __init__( Args: table_name (str): Amazon DynamoDB table name. - dedup_pkeys (list): List of keys to be used for de-duplicating records in buffer. + dedup_pkeys (list): List of keys to be used for deduplicating records in buffer. options (Union[DynamoDBOptions, dict]): Options to create a boto3 dynamodb client. """ super().__init__() @@ -80,7 +80,7 @@ class WriteToDynamoDB(beam.PTransform): Args: table_name (str): Amazon DynamoDB table name. - dedup_pkeys (list, Optional): List of keys to be used for de-duplicating records in buffer. + dedup_pkeys (list, Optional): List of keys to be used for deduplicating records in buffer. """ def __init__(self, table_name: str, dedup_pkeys: list = None): @@ -88,7 +88,7 @@ def __init__(self, table_name: str, dedup_pkeys: list = None): Args: table_name (str): Amazon DynamoDB table name. - dedup_pkeys (list, Optional): List of keys to be used for de-duplicating items in buffer. + dedup_pkeys (list, Optional): List of keys to be used for deduplicating items in buffer. """ super().__init__() self.table_name = table_name