Add s3 lambda docs #55

Merged
merged 4 commits into from Mar 17, 2025

Changes from all commits
1 change: 1 addition & 0 deletions docs/_sidebar.md
@@ -17,6 +17,7 @@
- [Airflow - Initial setup](/how-tos/airflow/initial-setup.md)
- [Airflow - Accessing the Airflow API](/how-tos/airflow/use-airflow-api.md)
- [Airflow - Sync Internal Airflow database](/how-tos/airflow/sync-database.md)
- [Airflow - Trigger a DAG using Datasets](/how-tos/airflow/api-triggered-dag.md)
- [DAGs - Add Dag Documentation](/how-tos/airflow/create-dag-level-docs.md)
- [DAGs - Calling External Python Scripts](/how-tos/airflow/external-python-dag.md)
- [DAGs - Dynamically Set Schedule](/how-tos/airflow/dynamically-set-schedule.md)
207 changes: 207 additions & 0 deletions docs/how-tos/airflow/api-triggered-dag.md
@@ -0,0 +1,207 @@
# How to Trigger a DAG using Datasets

## Overview
This guide explains how to trigger Airflow DAGs with Datasets. DAGs can be triggered by another DAG using datasets or by an external process that sends a dataset event using the Airflow API.

## Producer DAG

Airflow enables DAGs to be triggered dynamically based on dataset updates. A producer DAG updates a dataset, automatically triggering any consumer DAGs subscribed to it.

To implement this, start by creating a DAG and defining the dataset it will update.

```python
# data_aware_producer_dag.py
import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset


# A dataset can be anything; it is simply a pointer stored in the Airflow db.
# If you need to reference a URL like s3://my_bucket/my_file.txt, set it
# with the proper path so it can be reused.
DAG_UPDATED_DATASET = Dataset("upstream_data")

@dag(
    default_args={
        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
        "owner": "Noel Gomez",
        "email": "[email protected]",
        "retries": 3
    },
    description="Sample Producer DAG",
    schedule="0 0 1 */12 *",
    tags=["extract_and_load"],
    catchup=False,
)
def data_aware_producer_dag():
    @task(outlets=[DAG_UPDATED_DATASET])
    def extract_and_load_dlt():
        print("I'm the producer")

    extract_and_load_dlt()


dag = data_aware_producer_dag()
```

That's it! Now you are ready to create your [Consumer DAG](#setting-up-the-airflow-dag).

## Lambda Function

Alternatively, you can trigger a DAG externally using the [Airflow API](how-tos/airflow/use-airflow-api.md). In this example, we will use an AWS Lambda function to trigger your DAG once data lands in an S3 bucket. You can test the underlying API call locally, as sketched below, before packaging it for Lambda.
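
The following is a minimal sketch of that API call, assuming you have exported `AIRFLOW_API_URL` and `DATACOVES_API_KEY` in your local shell and that the `upstream_data` dataset from the producer example is already registered in Airflow.

```python
import os
import requests

# Assumed to be exported locally for testing purposes
API_URL = os.environ["AIRFLOW_API_URL"]
API_KEY = os.environ["DATACOVES_API_KEY"]

# Post a dataset event; any consumer DAG scheduled on this dataset will be triggered
response = requests.post(
    url=f"{API_URL}/datasets/events",
    headers={"Authorization": f"Token {API_KEY}"},
    json={"dataset_uri": "upstream_data"},
)
print(response.status_code, response.text)
```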

### Creating your zip files

To run your Python script in a Lambda function, you need to upload the `requests` library
along with your `lambda_function.py` file.

- Create a Python file locally named `lambda_function.py` and write out your function. Below is an example.

**Example Lambda function:**

```python
import requests
import os
import json

# In Lambda, environment variables are set in the Lambda configuration
# rather than using dotenv
API_URL = os.environ.get("AIRFLOW_API_URL")
API_KEY = os.environ.get("DATACOVES_API_KEY")

def update_dataset(dataset_name):
    url = f"{API_URL}/datasets/events"

    response = requests.post(
        url=url,
        headers={
            "Authorization": f"Token {API_KEY}",
        },
        json={"dataset_uri": dataset_name},
    )

    try:
        return response.json()
    except ValueError:
        return response.text

def print_response(response):
    if response:
        msg = json.dumps(response, indent=2)
        print(f"Event posted successfully:\n{'='*30}\n\n {msg}")

def lambda_handler(event, context):
    print("Lambda execution started")

    try:
        print(f"Environment variables: API_URL={API_URL is not None}, API_KEY={API_KEY is not None}")

        # Extract S3 information
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = event['Records'][0]['s3']['object']['key']
        print(f"S3 event details: bucket={bucket}, key={key}")

        print(f"File uploaded: {bucket}/{key}")

        # The Airflow Dataset name must be static, so if the filename changes
        # that would have to be handled here
        dataset_name = f"s3://{bucket}/{key}"

        response = update_dataset(dataset_name)
        print_response(response)

        return {
            'statusCode': 200,
            'body': 'Successfully processed S3 event'
        }
    except Exception as e:
        print(f"ERROR: {str(e)}")
        import traceback
        print(traceback.format_exc())
        return {
            'statusCode': 500,
            'body': f'Error: {str(e)}'
        }
```

- Run the following commands locally to prepare a zip file with everything you need.

```bash
pip install --target ./package requests
cd package
zip -r ../deployment-package.zip .
cd ..
zip -g deployment-package.zip lambda_function.py
```

### Create a Lambda Function

- Create a new AWS Lambda function. (These console steps can also be scripted; see the sketch after this list.)
- Set the runtime to Python 3.10.
- Create an IAM execution role for the function and attach the following policy:
  - `AmazonS3ReadOnlyAccess`, scoped to the target bucket
- Upload `deployment-package.zip` from the earlier step into the Lambda function.
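
If you prefer to script this step instead of using the console, here is a minimal sketch using `boto3`. The function name, role ARN, and region are placeholders you would replace with your own values.

```python
import boto3

lambda_client = boto3.client("lambda", region_name="us-east-1")  # assumed region

with open("deployment-package.zip", "rb") as f:
    zip_bytes = f.read()

# Create the function with the Python 3.10 runtime and the handler defined
# in lambda_function.py (lambda_handler)
lambda_client.create_function(
    FunctionName="trigger-airflow-dag",                      # hypothetical name
    Runtime="python3.10",
    Role="arn:aws:iam::123456789012:role/my-lambda-role",    # placeholder role ARN
    Handler="lambda_function.lambda_handler",
    Code={"ZipFile": zip_bytes},
    Timeout=30,
)
```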

### Set Environment Variables

- Gather your [API credentials](how-tos/airflow/use-airflow-api.md#step-1-navigate-to-your-target-environment), then configure the following environment variables in the Lambda function's Configuration tab (a scripted alternative is sketched below):
  - `AIRFLOW_API_URL` (the API URL for Airflow)
  - `DATACOVES_API_KEY` (the API key for authentication)
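
If you are scripting the setup, this is a sketch of how the same variables could be set with `boto3`; the function name and values are placeholders.

```python
import boto3

lambda_client = boto3.client("lambda", region_name="us-east-1")  # assumed region

# Set the environment variables read by lambda_function.py
lambda_client.update_function_configuration(
    FunctionName="trigger-airflow-dag",  # hypothetical name from the previous step
    Environment={
        "Variables": {
            "AIRFLOW_API_URL": "https://<your-airflow-api-url>",  # placeholder
            "DATACOVES_API_KEY": "<your-api-key>",                # placeholder
        }
    },
)
```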

## Configuring the S3 Event Notification

1. **Go to S3 and Open the Target Bucket**
2. **Create a New Event Notification under the bucket's properties**
- **Event Name:** `TriggerAirflowDAG`
- **Prefix (Optional):** Specify a subfolder if needed.
- **Suffix (Optional):** Restrict the trigger to specific file types, e.g. `.csv`
- **Event Type:** Select `All object create events`
- **Destination:** Select **AWS Lambda** and choose the function created earlier. The same notification can also be configured programmatically, as sketched below.
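
This is a minimal sketch of the equivalent setup with `boto3`, assuming a hypothetical bucket name and the Lambda function from the earlier steps. Note that S3 must also be granted permission to invoke the function.

```python
import boto3

s3 = boto3.client("s3")
lambda_client = boto3.client("lambda")

BUCKET = "my-landing-bucket"  # placeholder bucket name
FUNCTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:trigger-airflow-dag"  # placeholder

# Allow S3 to invoke the Lambda function for events from this bucket
lambda_client.add_permission(
    FunctionName="trigger-airflow-dag",  # hypothetical name
    StatementId="AllowS3Invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{BUCKET}",
)

# Send all object-create events (optionally filtered by suffix) to the function
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "Id": "TriggerAirflowDAG",
                "LambdaFunctionArn": FUNCTION_ARN,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}},
            }
        ]
    },
)
```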

Now you are ready to set up your Consumer DAG.

## Setting Up the Airflow DAG

Whether you decide to use a producer DAG or the Airflow API, the last step is to create an Airflow DAG that is triggered by a dataset event rather than a schedule. This particular example can be triggered by an update to either `LAMBDA_UPDATED_DATASET` or `DAG_UPDATED_DATASET`.

![datasets graph](assets/datasets_graph.png)

### Example DAG

```python
import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset

LAMBDA_UPDATED_DATASET = Dataset("s3://my_bucket/my_folder/my_file.csv")
DAG_UPDATED_DATASET = Dataset("upstream_data")

@dag(
    default_args={
        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
        "owner": "Noel Gomez",
        "email": "[email protected]",
        "retries": 1
    },
    description="Sample Consumer DAG",
    schedule=(LAMBDA_UPDATED_DATASET | DAG_UPDATED_DATASET),
    tags=["transform"],
    catchup=False,
)
def data_aware_consumer_dag():
    @task
    def run_consumer():
        print("I'm the consumer")

    run_consumer()


dag = data_aware_consumer_dag()
```


>[!NOTE] Ensure the Dataset you are sending an event to exists in Airflow. A dataset is created automatically when a DAG that references it is parsed. If the dataset does not exist when the API event is sent, the API call will fail. You can verify that the dataset is registered with the API, as sketched below.
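
As a quick sanity check, the following sketch lists the datasets Airflow currently knows about and looks for the URI used in this guide. It assumes the same `AIRFLOW_API_URL` and `DATACOVES_API_KEY` environment variables as the Lambda example, with the API URL pointing at Airflow's stable REST API.

```python
import os
import requests

API_URL = os.environ["AIRFLOW_API_URL"]
API_KEY = os.environ["DATACOVES_API_KEY"]

# List the datasets registered in Airflow and check for the one we post events to
response = requests.get(
    url=f"{API_URL}/datasets",
    headers={"Authorization": f"Token {API_KEY}"},
)
uris = [d["uri"] for d in response.json().get("datasets", [])]
print("s3://my_bucket/my_folder/my_file.csv" in uris)
```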


Binary file added docs/how-tos/airflow/assets/datasets_graph.png
20 changes: 16 additions & 4 deletions docs/how-tos/datahub/how_to_datahub_cli.md
@@ -16,13 +16,25 @@ datahub init
```
![DataHub init](assets/datahub-init.png)

2. When prompted, enter the DataHub host URL. This will differ depending on which environment your DataHub instance is in.

#### Development Environment

If your DataHub instance is within the Development environment, use:

```bash
http://{environment-slug}-datahub-datahub-gms:8080
```

#### Cross Environment

You can access DataHub in Prod or QA from the Dev environment. This is considered cross-environment access and requires the use of the full URL, as seen below. The slug will be that of the environment where DataHub is hosted (QA or Prod).

```bash
http://<slug>-datahub-datahub-gms.dcw-<slug>:8080
```

>[!NOTE] The environment slug can be found next to your environment name on the top left corner of your Datacoves workspace. For example, the environment slug below is `DEV123`, so the URL would be: `http://dev123-datahub-datahub-gms:8080`.

![Environment slug](assets/datahub-env-slug.png)
