
Commit d3bc93d

Merge pull request #55 from datacoves/mp_airflow_api_s3
Add s3 lambda docs
2 parents 2d99f9b + 47dd586 commit d3bc93d

File tree

4 files changed: +224 -4 lines changed

docs/_sidebar.md (+1)

@@ -17,6 +17,7 @@
 - [Airflow - Initial setup](/how-tos/airflow/initial-setup.md)
 - [Airflow - Accessing the Airflow API](/how-tos/airflow/use-airflow-api.md)
 - [Airflow - Sync Internal Airflow database](/how-tos/airflow/sync-database.md)
+- [Airflow - Trigger a DAG using Datasets](how-tos/airflow/api-triggered-dag.md)
 - [DAGs - Add Dag Documentation](/how-tos/airflow/create-dag-level-docs.md)
 - [DAGs - Calling External Python Scripts](/how-tos/airflow/external-python-dag.md)
 - [DAGs - Dynamically Set Schedule](/how-tos/airflow/dynamically-set-schedule.md)
docs/how-tos/airflow/api-triggered-dag.md (+207, new file)
# How to Trigger a DAG using Datasets

## Overview

This guide explains how to trigger Airflow DAGs with Datasets. DAGs can be triggered by another DAG using datasets, or by an external process that sends a dataset event using the Airflow API.

## Producer DAG

Airflow enables DAGs to be triggered dynamically based on dataset updates. A producer DAG updates a dataset, automatically triggering any consumer DAGs subscribed to it.

To implement this, start by creating a DAG and defining the dataset it will update.
```python
# data_aware_producer_dag.py
import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset


# A dataset can be anything; it is stored as a pointer in the Airflow db.
# If you need to reference a URI like s3://my_bucket/my_file.txt, then you can
# set it with the proper path for reuse.
DAG_UPDATED_DATASET = Dataset("upstream_data")

@dag(
    default_args={
        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
        "owner": "Noel Gomez",
        "email": "[email protected]",
        "retries": 3
    },
    description="Sample Producer DAG",
    schedule="0 0 1 */12 *",
    tags=["extract_and_load"],
    catchup=False,
)
def data_aware_producer_dag():
    @task(outlets=[DAG_UPDATED_DATASET])
    def extract_and_load_dlt():
        print("I'm the producer")

    extract_and_load_dlt()


dag = data_aware_producer_dag()
```

That's it! Now you are ready to create your [Consumer DAG](#setting-up-the-airflow-dag).

## Lambda Function

Alternatively, you can trigger a DAG externally using the [Airflow API](how-tos/airflow/use-airflow-api.md). In this example we will use an AWS Lambda function to trigger your DAG once data lands in an S3 bucket.

### Creating your zip files

To run your Python script in a Lambda function, you need to upload the `requests` library along with your `lambda_function.py` file.

- Create a Python file locally and write out your function. Below is an example function.

**Example Lambda function:**
```python
import requests
import os
import json

# In Lambda, environment variables are set in the Lambda configuration
# rather than using dotenv
API_URL = os.environ.get("AIRFLOW_API_URL")
API_KEY = os.environ.get("DATACOVES_API_KEY")

def update_dataset(dataset_name):
    url = f"{API_URL}/datasets/events"

    response = requests.post(
        url=url,
        headers={
            "Authorization": f"Token {API_KEY}",
        },
        json={"dataset_uri": dataset_name},
    )

    try:
        return response.json()
    except ValueError:
        return response.text

def print_response(response):
    if response:
        msg = json.dumps(response, indent=2)
        print(f"Event posted successfully:\n{'='*30}\n\n {msg}")

def lambda_handler(event, context):
    print("Lambda execution started")

    try:
        print(f"Environment variables: API_URL={API_URL is not None}, API_KEY={API_KEY is not None}")

        # Extract S3 information
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = event['Records'][0]['s3']['object']['key']
        print(f"S3 event details: bucket={bucket}, key={key}")

        print(f"File uploaded: {bucket}/{key}")

        # The Airflow Dataset name must be static, so if the filename changes,
        # that would have to be addressed above
        dataset_name = f"s3://{bucket}/{key}"

        response = update_dataset(dataset_name)
        print_response(response)

        return {
            'statusCode': 200,
            'body': 'Successfully processed S3 event'
        }
    except Exception as e:
        print(f"ERROR: {str(e)}")
        import traceback
        print(traceback.format_exc())
        return {
            'statusCode': 500,
            'body': f'Error: {str(e)}'
        }
```

- Run the following commands locally to prepare a zip file with everything you need.

```bash
pip install --target ./package requests
cd package
zip -r ../deployment-package.zip .
cd ..
zip -g deployment-package.zip lambda_function.py
```

Optionally, you can smoke-test the handler locally before deploying, as shown in the sketch below.

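A minimal local test, assuming `lambda_function.py` is in the current directory and `AIRFLOW_API_URL` and `DATACOVES_API_KEY` are exported in your shell. The event below is a hypothetical, trimmed stand-in containing only the fields the handler reads; real S3 notifications include more metadata.

```python
# local_smoke_test.py (hypothetical helper, not part of the deployment package)
from lambda_function import lambda_handler

# Minimal stand-in for an S3 "object created" notification:
# only Records[0].s3.bucket.name and Records[0].s3.object.key are used by the handler.
fake_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my_bucket"},
                "object": {"key": "my_folder/my_file.csv"},
            }
        }
    ]
}

# The context argument is unused by the handler, so None is fine for a local run.
print(lambda_handler(fake_event, None))
```

Keep in mind that a successful local run posts a real dataset event to Airflow, so point the credentials at a non-production environment.
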
### Create a Lambda Function

- Create a new AWS Lambda function.
- Set the runtime to Python 3.10.
- Create an IAM role for the function and attach the following policy:
  - `AmazonS3ReadOnlyAccess` (read access to the target bucket)
- Upload the `deployment-package.zip` from the earlier step into the Lambda function.

If you prefer to script this step rather than use the AWS console, a boto3 sketch is shown below.

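A minimal sketch using boto3, assuming the IAM role already exists and using placeholder values for the function name, account ID, and role ARN:

```python
# create_lambda.py: hypothetical helper; function name, account ID, and role ARN are placeholders
import boto3

lambda_client = boto3.client("lambda")

# Read the zip produced in the previous step.
with open("deployment-package.zip", "rb") as f:
    zip_bytes = f.read()

lambda_client.create_function(
    FunctionName="trigger-airflow-dag",                              # placeholder name
    Runtime="python3.10",
    Role="arn:aws:iam::123456789012:role/trigger-airflow-dag-role",  # placeholder role ARN
    Handler="lambda_function.lambda_handler",                        # module.function inside the zip
    Code={"ZipFile": zip_bytes},
    Timeout=30,                                                      # seconds; adjust for slow API responses
)
```
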
### Set Environment Variables

- Gather your [API credentials](how-tos/airflow/use-airflow-api.md#step-1-navigate-to-your-target-environment) and configure the following environment variables in the Lambda function's Configuration (these are the names read by the example function above):
  - `AIRFLOW_API_URL` (the API URL for Airflow)
  - `DATACOVES_API_KEY` (the API key for authentication)

If you are scripting the deployment, the same variables can be set with boto3; see the sketch below.

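A minimal sketch, reusing the placeholder function name from the previous step. The actual URL and key come from the API credentials gathered above.

```python
import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_function_configuration(
    FunctionName="trigger-airflow-dag",  # placeholder; must match the function created above
    Environment={
        "Variables": {
            # Placeholder values; see the Airflow API how-to linked above for the real URL and key.
            "AIRFLOW_API_URL": "https://your-airflow-host/api/v1",
            "DATACOVES_API_KEY": "your-api-key",
        }
    },
)
```
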
## Configuring the S3 Event Notification

1. **Go to S3 and Open the Target Bucket**
2. **Create a New Event Notification under the bucket's properties**
   - **Event Name:** `TriggerAirflowDAG`
   - **Prefix (Optional):** Specify a subfolder if needed.
   - **Suffix (Optional):** Specify a file extension if you only want specific files to trigger the notification, e.g. `.csv`.
   - **Event Type:** Select `All object create events`
   - **Destination:** Select **AWS Lambda** and choose the function created earlier.

If you prefer to configure the notification programmatically, see the sketch after this list.

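A sketch of the equivalent configuration with boto3, using placeholder bucket, region, account, and function values. Note that S3 can only invoke the function if the Lambda resource policy allows it; depending on how the trigger was created this permission may not exist yet, so the sketch adds it explicitly.

```python
import boto3

BUCKET = "my_bucket"  # placeholder
FUNCTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:trigger-airflow-dag"  # placeholder

# Allow S3 to invoke the Lambda function for events from this bucket.
boto3.client("lambda").add_permission(
    FunctionName="trigger-airflow-dag",  # placeholder; must match the function created earlier
    StatementId="AllowS3Invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{BUCKET}",
)

# Equivalent of the console settings above: all object-create events, optional suffix filter.
boto3.client("s3").put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "Id": "TriggerAirflowDAG",
                "LambdaFunctionArn": FUNCTION_ARN,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}  # optional
                },
            }
        ]
    },
)
```
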
Now you are ready to set up your Consumer DAG.

## Setting Up the Airflow DAG

Whether you decide to use a producer DAG or the Airflow API, the last step is to create an Airflow DAG that is triggered by a dataset event rather than a schedule. This particular example can be triggered with either `LAMBDA_UPDATED_DATASET` or `DAG_UPDATED_DATASET`.

![datasets graph](assets/datasets_graph.png)

### Example DAG
```python
import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset

LAMBDA_UPDATED_DATASET = Dataset("s3://my_bucket/my_folder/my_file.csv")
DAG_UPDATED_DATASET = Dataset("upstream_data")

@dag(
    default_args={
        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
        "owner": "Noel Gomez",
        "email": "[email protected]",
        "retries": 1
    },
    description="Sample Consumer DAG",
    schedule=(LAMBDA_UPDATED_DATASET | DAG_UPDATED_DATASET),
    tags=["transform"],
    catchup=False,
)
def data_aware_consumer_dag():
    @task
    def run_consumer():
        print("I'm the consumer")

    run_consumer()


dag = data_aware_consumer_dag()
```

>[!NOTE] Ensure the dataset you are sending an event to exists in Airflow. A dataset is created automatically when a DAG that references it is added to Airflow. If the dataset does not exist when the API event is sent, the API call will fail.

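To confirm that the dataset is registered before relying on the Lambda trigger, you can list datasets through the same Airflow API used earlier. This is a sketch, assuming your Airflow version exposes the datasets list endpoint and reusing the credentials from the Lambda configuration:

```python
import os
import requests

API_URL = os.environ.get("AIRFLOW_API_URL")
API_KEY = os.environ.get("DATACOVES_API_KEY")

# List the datasets currently known to Airflow.
response = requests.get(
    url=f"{API_URL}/datasets",
    headers={"Authorization": f"Token {API_KEY}"},
)
response.raise_for_status()

known_uris = [d["uri"] for d in response.json().get("datasets", [])]
print("Registered datasets:", known_uris)

# The URI posted by the Lambda function must appear here before the event will succeed.
print("s3://my_bucket/my_folder/my_file.csv" in known_uris)
```
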
docs/how-tos/airflow/assets/datasets_graph.png (new binary file, 105 KB)

docs/how-tos/datahub/how_to_datahub_cli.md (+16 -4)

@@ -16,13 +16,25 @@ datahub init
 ```
 ![DataHub init](assets/datahub-init.png)
 
-2. When prompted, enter the DataHub host URL using the following pattern:
+2. When prompted, enter the DataHub host URL. This will differ depending on which environment your DataHub instance is in.
+
+#### Development Environment
+
+If your DataHub instance is within the Development environment, use:
+
+```bash
+http://{environment-slug}-datahub-datahub-gms:8080
+```
+
+#### Cross Environment
+
+You can access DataHub in Prod or QA from the Dev environment. This is considered cross-environment access and requires the full URL shown below. The slug is the one for the environment where DataHub is hosted (QA or Prod).
 
 ```bash
-http://{environment-slug}-datahub-datahub-gms:8080
-```
+http://<slug>-datahub-datahub-gms.dcw-<slug>:8080
+```
 
->[!TIP] The environment slug can be found next to your environment name on the top left corner of your Datacoves workspace. For example, the environment slug below is `DEV123`, so the URL would be: `http://dev123-datahub-datahub-gms:8080`. Cross environment access is available. ie) You can access Datahub in in Prod from the Dev environment. Just be sure to use the environment slug where Datahub is hosted.
+>[!NOTE] The environment slug can be found next to your environment name on the top left corner of your Datacoves workspace. For example, the environment slug below is `DEV123`, so the URL would be: `http://dev123-datahub-datahub-gms:8080`.
 
 ![Environment slug](assets/datahub-env-slug.png)
