
Commit 326e791

Merge pull request #16 from beam-pyio/feature/prepare-release
Preparation for release
2 parents 28669d0 + 215652c

10 files changed (+517 −236 lines)

CHANGELOG.md

Lines changed: 8 additions & 2 deletions
```diff
@@ -2,6 +2,12 @@
 
 <!--next-version-placeholder-->
 
-## v0.1.0 (11/06/2024)
+## v0.1.0 (23/07/2024)
 
-- First release of `firehose_pyio`!
+✨NEW
+
+- Add a composite transform (`WriteToFirehose`) that puts records into a Firehose delivery stream in batch, using the [`put_record_batch`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose/client/put_record_batch.html) method of the boto3 package.
+- Create a dedicated pipeline option (`FirehoseOptions`) that reads AWS related values (e.g. `aws_access_key_id`) from pipeline arguments.
+- Implement metric objects that record the total, succeeded and failed element counts.
+- Add unit and integration testing cases. The [moto](https://github.com/getmoto/moto) and [localstack-utils](https://docs.localstack.cloud/user-guide/tools/testing-utils/) packages are used for unit and integration testing respectively. Also, a custom test client is created for testing retry behavior, which is not supported by the moto package.
+- Integrate with GitHub Actions by adding workflows for testing, documentation and release management.
```
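As a quick illustration of the pipeline-option item above, AWS related values can be supplied as pipeline arguments. The snippet below is a minimal sketch, not taken from this commit; only `aws_access_key_id` is named in the changelog, and the other option names are assumptions for illustration.

```python
# Minimal sketch (not from this commit): supplying AWS related values as
# pipeline arguments so that a dedicated pipeline option can read them.
# Only --aws_access_key_id is named in the changelog; the other option
# names below are assumptions.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(
    [
        "--aws_access_key_id=YOUR_ACCESS_KEY_ID",
        "--aws_secret_access_key=YOUR_SECRET_ACCESS_KEY",  # assumed option name
        "--region_name=us-east-1",  # assumed option name
    ]
)

with beam.Pipeline(options=pipeline_options) as p:
    p | beam.Create(["example"])  # apply WriteToFirehose here as in the README examples
```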

README.md

Lines changed: 56 additions & 2 deletions
```diff
@@ -6,7 +6,7 @@
 ![python](https://img.shields.io/badge/python-3.8%2C%203.9%2C%203.10%2C%203.11%2C%203.12-blue)
 ![os](https://img.shields.io/badge/OS-Ubuntu%2C%20Mac%2C%20Windows-purple)
 
-Apache Beam Python I/O connector for Amazon Firehose
+[Amazon Data Firehose](https://aws.amazon.com/firehose/) is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service and Amazon OpenSearch Serverless. The Apache Beam Python I/O connector for Amazon Data Firehose (`firehose_pyio`) provides a data sink feature that facilitates integration with those services.
 
 ## Installation
 
@@ -16,7 +16,61 @@ $ pip install firehose_pyio
 
 ## Usage
 
-- TODO
+The connector has the main composite transform ([`WriteToFirehose`](https://beam-pyio.github.io/firehose_pyio/autoapi/firehose_pyio/io/index.html#firehose_pyio.io.WriteToFirehose)), and it expects a list or tuple _PCollection_ element. If the element is a tuple, the tuple's first element is taken. If the element is not of the accepted types, you can apply the [`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches/) or [`BatchElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements) transform beforehand. Then, the element is sent into a Firehose delivery stream using the [`put_record_batch`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose/client/put_record_batch.html) method of the boto3 package. Note that the above batch transforms can also be useful to overcome the API limitation listed below.
+
+- Each `PutRecordBatch` request supports up to 500 records. Each record in the request can be as large as 1,000 KB (before base64 encoding), up to a limit of 4 MB for the entire request. These limits cannot be changed.
+
+The transform also has options that control how individual records are converted and how failed records are handled.
+
+- _jsonify_ - A flag that indicates whether to convert a record into JSON. Note that a record should be of _bytes_, _bytearray_ or file-like object, and, if it is not of a supported type (e.g. integer), it can be converted into a JSON string by setting this flag to _True_.
+- _multiline_ - A flag that indicates whether to add a new line character (`\n`) to each record. It is useful for saving records into a _CSV_ or _JSON Lines_ file.
+- _max_trials_ - The maximum number of trials when there are one or more failed records; it defaults to 3. Note that failed records after all trials are returned, which allows users to determine how to handle them subsequently.
+
+### Example
+
+If a _PCollection_ element is a key-value pair (i.e. a keyed stream), it can be batched in groups using the `GroupIntoBatches` transform before it is passed to the main transform.
+
+```python
+import apache_beam as beam
+from apache_beam import GroupIntoBatches
+from firehose_pyio.io import WriteToFirehose
+
+with beam.Pipeline(options=pipeline_options) as p:
+    (
+        p
+        | beam.Create([(1, "one"), (2, "three"), (1, "two"), (2, "four")])
+        | GroupIntoBatches(batch_size=2)
+        | WriteToFirehose(
+            delivery_stream_name=delivery_stream_name,
+            jsonify=True,
+            multiline=True,
+            max_trials=3
+        )
+    )
+```
+
+For a list element (i.e. an unkeyed stream), we can apply the `BatchElements` transform instead.
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.util import BatchElements
+from firehose_pyio.io import WriteToFirehose
+
+with beam.Pipeline(options=pipeline_options) as p:
+    (
+        p
+        | beam.Create(["one", "two", "three", "four"])
+        | BatchElements(min_batch_size=2, max_batch_size=2)
+        | WriteToFirehose(
+            delivery_stream_name=delivery_stream_name,
+            jsonify=True,
+            multiline=True,
+            max_trials=3
+        )
+    )
+```
+
+See [this post](https://beam-pyio.github.io/blog/2024/firehose-pyio-intro/) for more examples.
 
 ## Contributing
 
```
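The _max_trials_ option described in the README changes notes that failed records are returned after all trials. The snippet below is a speculative sketch of how such records might be logged for later handling; it assumes, beyond what the README states, that the failed records are emitted as the transform's output _PCollection_, and that `pipeline_options` and `delivery_stream_name` are defined as in the README examples.

```python
# Speculative sketch (not from this commit): log records that still failed
# after max_trials, assuming WriteToFirehose emits them as its output.
import logging

import apache_beam as beam
from firehose_pyio.io import WriteToFirehose


def log_failed(record):
    # ASSUMPTION: each output element is a record that failed after all trials.
    logging.warning("failed record: %s", record)
    return record


with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | beam.Create([["one", "two"], ["three", "four"]])
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,
            multiline=True,
            max_trials=3,
        )
        | beam.Map(log_failed)
    )
```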

examples/create_resources.py

Lines changed: 85 additions & 0 deletions
New file:

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import boto3
from botocore.exceptions import ClientError

COMMON_NAME = "firehose-pyio-test"


def create_destination_bucket(bucket_name):
    client = boto3.client("s3")
    suffix = client._client_config.region_name
    client.create_bucket(Bucket=f"{bucket_name}-{suffix}")


def create_firehose_iam_role(role_name):
    assume_role_policy_document = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "firehose.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
    )
    client = boto3.client("iam")
    try:
        return client.get_role(RoleName=role_name)
    except ClientError as error:
        if error.response["Error"]["Code"] == "NoSuchEntity":
            resp = client.create_role(
                RoleName=role_name, AssumeRolePolicyDocument=assume_role_policy_document
            )
            client.attach_role_policy(
                RoleName=role_name,
                PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
            )
            return resp


def create_delivery_stream(delivery_stream_name, role_arn, bucket_name):
    client = boto3.client("firehose")
    suffix = client._client_config.region_name
    try:
        client.create_delivery_stream(
            DeliveryStreamName=delivery_stream_name,
            DeliveryStreamType="DirectPut",
            S3DestinationConfiguration={
                "RoleARN": role_arn,
                "BucketARN": f"arn:aws:s3:::{bucket_name}-{suffix}",
                "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 0},
            },
        )
    except ClientError as error:
        if error.response["Error"]["Code"] == "ResourceInUseException":
            pass
        else:
            raise error


if __name__ == "__main__":
    print("create a destination bucket...")
    create_destination_bucket(COMMON_NAME)
    print("create an iam role...")
    iam_resp = create_firehose_iam_role(COMMON_NAME)
    print("create a delivery stream...")
    create_delivery_stream(COMMON_NAME, iam_resp["Role"]["Arn"], COMMON_NAME)
```
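A side note on the region lookup used above: the bucket and delivery stream names are suffixed with the client's region, which the script reads from the private `_client_config` attribute. The snippet below is only an observation, not part of the commit; boto3 also exposes the same value through the public `meta` attribute.

```python
# Not part of this commit: boto3 exposes the configured region publicly via
# client.meta.region_name, equivalent to the private client._client_config.region_name.
import boto3

client = boto3.client("s3")
region = client.meta.region_name  # e.g. "us-east-1", depending on your configuration
```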

examples/pipeline.py

Lines changed: 129 additions & 0 deletions
New file:

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import datetime
import random
import string
import logging
import boto3
import time

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from firehose_pyio.io import WriteToFirehose


def get_all_contents(bucket_name):
    client = boto3.client("s3")
    bucket_objects = client.list_objects_v2(
        Bucket=f"{bucket_name}-{client._client_config.region_name}"
    )
    return bucket_objects.get("Contents") or []


def delete_all_objects(bucket_name):
    client = boto3.client("s3")
    contents = get_all_contents(bucket_name)
    for content in contents:
        client.delete_object(
            Bucket=f"{bucket_name}-{client._client_config.region_name}",
            Key=content["Key"],
        )


def print_bucket_contents(bucket_name):
    client = boto3.client("s3")
    contents = get_all_contents(bucket_name)
    for content in contents:
        resp = client.get_object(
            Bucket=f"{bucket_name}-{client._client_config.region_name}",
            Key=content["Key"],
        )
        print(f"Key - {content['Key']}")
        print(resp["Body"].read().decode())


def create_records(n=100):
    return [
        {
            "id": i,
            "name": "".join(random.choices(string.ascii_letters, k=5)).lower(),
            "created_at": datetime.datetime.now(),
        }
        for i in range(n)
    ]


def convert_ts(record: dict):
    record["created_at"] = record["created_at"].isoformat(timespec="milliseconds")
    return record


def mask_secrets(d: dict):
    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--stream_name",
        default="firehose-pyio-test",
        type=str,
        help="Delivery stream name",
    )
    parser.add_argument(
        "--num_records", default="100", type=int, help="Number of records"
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known_args - {known_args}")
    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "CreateElements" >> beam.Create(create_records(known_args.num_records))
            | "DatetimeToStr" >> beam.Map(convert_ts)
            | "BatchElements" >> BatchElements(min_batch_size=50)
            | "WriteToFirehose"
            >> WriteToFirehose(
                delivery_stream_name=known_args.stream_name,
                jsonify=True,
                multiline=True,
                max_trials=3,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    BUCKET_NAME = "firehose-pyio-test"
    print(">> delete existing objects...")
    delete_all_objects(BUCKET_NAME)
    print(">> start pipeline...")
    run()
    time.sleep(1)
    print(">> print bucket contents...")
    print_bucket_contents(BUCKET_NAME)
```
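For completeness, a minimal sketch of invoking the example pipeline programmatically; the import path and the Direct Runner choice are assumptions, while `--stream_name` and `--num_records` come from the argument parser above. Arguments not recognised by the parser are forwarded to Beam as pipeline options.

```python
# Minimal sketch (not part of the commit): run() parses --stream_name and
# --num_records itself and forwards everything else to PipelineOptions.
from examples.pipeline import run  # assumed import path for the example module

run(
    argv=[
        "--stream_name=firehose-pyio-test",  # consumed by the example's argparse parser
        "--num_records=100",                 # consumed by the example's argparse parser
        "--runner=DirectRunner",             # forwarded to Beam as a pipeline option
    ]
)
```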
