
Commit 961bff7

Airflow: Implement suggestions by CodeRabbit, part 4
1 parent eee3766 commit 961bff7

5 files changed: +76 −51 lines changed


docs/integrate/airflow/data-retention-policy.md

Lines changed: 5 additions & 3 deletions
@@ -100,11 +100,13 @@ In the DAG’s main method, use Airflow’s [dynamic task mapping](https://airfl
SQLExecuteQueryOperator.partial(
task_id="delete_partition",
conn_id="cratedb_connection",
- sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
+ sql="DELETE FROM {{ params.table_fqn }} WHERE {{ params.column }} = {{ params.value }};",
).expand(params=get_policies().map(map_policy))
```

- `get_policies` returns a set of policies. On each policy, the `map_policy` is applied. The return value of `map_policy` is finally passed as `params` to the `SQLExecuteQueryOperator`.
+ `get_policies` returns a set of policies. On each policy, the `map_policy` is
+ applied. The return value of `map_policy` is finally passed as `params` to the
+ `SQLExecuteQueryOperator`.

This leads us already to the final version of the DAG:
```python
@@ -136,7 +138,7 @@ def data_retention_delete():
SQLExecuteQueryOperator.partial(
task_id="delete_partition",
conn_id="cratedb_connection",
- sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
+ sql="DELETE FROM {{ params.table_fqn }} WHERE {{ params.column }} = {{ params.value }};",
).expand(params=get_policies().map(map_policy))
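For context on the hunk above: the templated SQL receives its `params` from two helpers that the tutorial defines elsewhere in this file. A minimal sketch of plausible shapes for those helpers, assuming each policy row arrives as a `[schema, table, column, value]` list; the row shape, table names, and values here are illustrative and not taken from the commit:

```python
from airflow.decorators import task


@task
def get_policies():
    # Hypothetical placeholder rows; in the tutorial these come from a
    # retention-policy query against CrateDB.
    return [
        ["doc", "raw_metrics", "ts_day", 1672531200000],
    ]


def map_policy(policy):
    # Convert one policy row into the dict consumed by the templated SQL:
    # {{ params.table_fqn }}, {{ params.column }}, and {{ params.value }}.
    schema, table, column, value = policy
    return {
        "table_fqn": f'"{schema}"."{table}"',
        "column": column,
        "value": value,
    }
```

With helpers of this shape, `get_policies().map(map_policy)` yields one `params` dict per policy, and `.expand()` creates one `delete_partition` task instance for each.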

docs/integrate/airflow/getting-started.md

Lines changed: 19 additions & 12 deletions
@@ -59,15 +59,15 @@ Example output:
For other operating systems, follow the [official documentation](https://www.astronomer.io/docs/astro/cli/install-cli).
After installing the Astronomer CLI, initialize a new project:

- * Create a project directory:
+ - Create a project directory:
```bash
mkdir astro-project && cd astro-project
```
- * Initialize the project with the following command:
+ - Initialize the project with the following command:
```bash
astro dev init
```
- * This will create a skeleton project directory as follows:
+ - This will create a skeleton project directory as follows:
```text
├── Dockerfile
├── README.md
@@ -81,14 +81,16 @@ After installing the Astronomer CLI, initialize a new project:
```

The astronomer project consists of four Docker containers:
- * PostgreSQL server (for configuration/runtime data)
- * Airflow scheduler
- * Web server for rendering Airflow UI
- * Triggerer (running an event loop for deferrable tasks)
+ - PostgreSQL server (for configuration/runtime data)
+ - Airflow scheduler
+ - Web server for rendering Airflow UI
+ - Triggerer (running an event loop for deferrable tasks)

- The PostgreSQL server is configured to listen on port 5432. The web server is listening on port 8080 and can be accessed via http://localhost:8080/ with `admin` for both username and password.
+ The PostgreSQL server listens on port 5432. The web server listens on port 8080
+ and is available at <http://localhost:8080/> with `admin`/`admin`.

- If these ports are already in use, change them in `.astro/config.yaml`. For example, set the webserver to 8081 and PostgreSQL to 5435:
+ If these ports are already in use, change them in `.astro/config.yaml`. For
+ example, set the webserver to 8081 and PostgreSQL to 5435:
```yaml
project:
  name: astro-project
@@ -98,7 +100,8 @@ postgres:
  port: 5435
```

- Start the project with `astro dev start`. After the containers start, access the Airflow UI at `http://localhost:8081`:
+ Start the project with `astro dev start`. After the containers start, access
+ the Airflow UI at <http://localhost:8081>:

![Airflow UI landing page](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/f298a4c609312133e388555a9eba51733bfd5645.png)

@@ -127,13 +130,17 @@ The initialized `astro-project` now has a home on GitHub.

## Add database credentials

- To configure the connection to CrateDB we need to set up a corresponding environment variable. On Astronomer the environment variable can be set up via the Astronomer UI, via `Dockerfile`, or via a `.env` file which is automatically generated during project initialization.
+ To configure the CrateDB connection, set an environment variable. On
+ Astronomer, set it via the UI, `Dockerfile`, or the `.env` file
+ (generated during initialization).

In this tutorial, we will set up the necessary environment variables via a `.env` file. To learn about alternative ways, please check the [Astronomer documentation](https://docs.astronomer.io/astro/environment-variables). The first variable we set is one for the CrateDB connection, as follows:

`AIRFLOW_CONN_CRATEDB_CONNECTION=postgresql://<user>:<password>@<host>/doc?sslmode=disable`

- In case a TLS connection is required, change `sslmode=require`. To confirm that a new variable is applied, first, start the Airflow project and then create a bash session in the scheduler container by running `docker exec -it <scheduler_container_name> /bin/bash`.
+ For TLS, set `sslmode=require`. To confirm that the variable is applied, start
+ the project and open a bash session in the scheduler container:
+ `docker exec -it <scheduler_container_name> /bin/bash`.

Run `env` to list the applied environment variables.
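As a quick cross-check of the connection setup covered in the last hunk, a minimal smoke-test DAG, assuming the `AIRFLOW_CONN_CRATEDB_CONNECTION` variable from the docs is in place; the DAG id and schedule are illustrative only and this sketch is not part of the commit:

```python
import pendulum

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="cratedb_connection_smoke_test",  # illustrative name only
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    # The conn_id is derived from the AIRFLOW_CONN_CRATEDB_CONNECTION
    # environment variable (the suffix after AIRFLOW_CONN_, lower-cased).
    SQLExecuteQueryOperator(
        task_id="select_one",
        conn_id="cratedb_connection",
        sql="SELECT 1;",
    )
```

Trigger it once from the UI; a green `select_one` task confirms that Airflow can reach CrateDB through the configured connection.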

docs/integrate/airflow/import-parquet.md

Lines changed: 26 additions & 13 deletions
@@ -2,19 +2,26 @@
# Automating the import of Parquet files with Apache Airflow

## Introduction
- Using Airflow to import the NYC Taxi and Limousine dataset in Parquet format.

- CrateDB does not support `COPY FROM` for Parquet. It supports CSV and JSON. Therefore, this tutorial uses an alternative approach rather than switching the previous CSV workflow to Parquet.
+ Use Airflow to import the NYC Taxi and Limousine dataset provided in Parquet format.

- First and foremost, keep in mind the strategy presented here for importing Parquet files into CrateDB, we have already covered this topic in a previous tutorial using a different approach from the one introduced in this tutorial, so feel free to have a look at the tutorial about {ref}`arrow-import-parquet` and explore with the different possibilities out there.
+ CrateDB supports `COPY FROM` for CSV and JSON, not Parquet. This tutorial converts
+ Parquet to CSV before loading.
+
+ For an alternative Parquet ingestion approach, see {ref}`arrow-import-parquet`.

## Prerequisites

- Before getting started, you need to have some knowledge of Airflow and an instance of Airflow already running. Besides that, a CrateDB instance should already be set up before moving on with this tutorial. This SQL is also available in the setup folder in our [GitHub repository](https://github.com/crate/crate-airflow-tutorial).
+ Before you start, have Airflow and CrateDB running. The SQL shown below also
+ resides in the setup folder of the
+ [GitHub repository](https://github.com/crate/crate-airflow-tutorial).

- We start by creating the two tables in CrateDB: A temporary staging table (`nyc_taxi.load_trips_staging`) and the final destination table (`nyc_taxi.trips`).
+ Create two tables in CrateDB: a temporary staging table
+ (`nyc_taxi.load_trips_staging`) and the final table (`nyc_taxi.trips`).

- In this case, the staging table is a primary insertion point, which was later used to cast data to their final types. For example, the `passenger_count` column is defined as `REAL` in the staging table, while it is defined as `INTEGER` in the `nyc_taxi.trips` table.
+ Insert into the staging table first, then cast values into their final
+ types when inserting into `nyc_taxi.trips`. For example, `passenger_count`
+ is `REAL` in staging and `INTEGER` in `nyc_taxi.trips`.

```sql
CREATE TABLE IF NOT EXISTS "nyc_taxi"."load_trips_staging" (
@@ -74,7 +81,7 @@ PARTITIONED BY ("pickup_year");
To better understand how Airflow works and its applications, you can check other
tutorials related to that topic {ref}`here <airflow-tutorials>`.

- Ok! So, once the tools are already set up with the corresponding tables created, we should be good to go.
+ With the tools set up and tables created, proceed to the DAG.

## The Airflow DAG
![Airflow DAG workflow|690x76](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/29502f83c13d29d90ab703a399f58c6daeee6fe6.png)
@@ -86,10 +93,12 @@ The Airflow DAG used in this tutorial contains 7 tasks:
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet
```
The file path above corresponds to the data from March 2022. So, to retrieve a specific file, the task gets the date and formats it to compose the name of the specific file. Important to mention that the data is released with 2 months of delay, so it had to be taken into consideration.
- * **process_parquet:** afterward, the name is used to download the file to local storage and then transform it from Parquet to CSV using [`parquet-tools`] (Apache Parquet CLI, see [Apache Arrow])
- * `curl -o "<LOCAL-PARQUET-FILE-PATH>" "<REMOTE-PARQUET-FILE>"`
- * `parquet-tools csv <LOCAL-PARQUET-FILE-PATH> > <CSV-FILE-PATH>`
- Both tasks are executed within one Bash Operator.
+ * **process_parquet:** afterward, use the name to download the file to local storage and convert it from Parquet to CSV using `parquet-tools` (Apache Parquet CLI; see [Apache Arrow]).
+
+ * `curl -o "<LOCAL-PARQUET-FILE-PATH>" "<REMOTE-PARQUET-FILE>"`
+ * `parquet-tools csv <LOCAL-PARQUET-FILE-PATH> > <CSV-FILE-PATH>`
+
+ Both commands run within one `BashOperator`.
* **copy_csv_to_s3:** Once the newly transformed file is available, it gets uploaded to an S3 Bucket to then, be used in the {ref}`crate-reference:sql-copy-from` SQL statement.
* **copy_csv_staging:** copy the CSV file stored in S3 to the staging table described previously.
* **copy_staging_to_trips:** finally, copy the data from the staging table to the trips table, casting the columns that are not in the right type yet.
@@ -101,9 +110,13 @@ The DAG was configured based on the characteristics of the data in use. In this
* How often does the data get updated
* When was the first file made available

- In this case, according to the NYC TLC website “Trip data is published monthly (with two months delay)”. So, the DAG is set up to run monthly, and given the first file was made available in January 2009, the start date was set to March 2009. But why March and not January? As previously mentioned, the files are made available with 2 months of delay, so the first DAG instance, which has a logical execution date equal to "March 2009" will retrieve March as the current month minus 2, corresponding to January 2009, the very first file ever published.
+ The NYC TLC publishes trip data monthly with a two-month delay. Set the DAG to
+ run monthly with a start date of March 2009. The first run (logical date March
+ 2009) downloads the file for January 2009 (logical date minus two months),
+ which is the first available dataset.

- You may find the full code for the DAG described above available in our [GitHub repository](https://github.com/crate/crate-airflow-tutorial/blob/main/dags/nyc_taxi_dag.py).
+ You may find the full code for the DAG described above available in our
+ [GitHub repository](https://github.com/crate/crate-airflow-tutorial/blob/main/dags/nyc_taxi_dag.py).

## Wrap up
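To make the **process_parquet** step described above concrete, here is a minimal sketch of the download-and-convert task as a single `BashOperator`, assuming hard-coded paths and the March 2022 file; the real DAG (`dags/nyc_taxi_dag.py` in the linked repository) derives the file name from the logical date minus two months:

```python
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="nyc_taxi_parquet_sketch",  # illustrative name only
    start_date=pendulum.datetime(2022, 3, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    # Download the Parquet file, then convert it to CSV, in one Bash task.
    # Assumes curl and parquet-tools are available in the worker environment.
    BashOperator(
        task_id="process_parquet",
        bash_command=(
            "curl -o '{{ params.local_parquet }}' '{{ params.remote_parquet }}' && "
            "parquet-tools csv '{{ params.local_parquet }}' > '{{ params.csv_path }}'"
        ),
        params={
            "remote_parquet": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet",
            "local_parquet": "/tmp/yellow_tripdata_2022-03.parquet",
            "csv_path": "/tmp/yellow_tripdata_2022-03.csv",
        },
    )
```

The CSV produced here is what the subsequent S3 upload and `COPY FROM` steps operate on.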

docs/integrate/airflow/import-stock-market-data.md

Lines changed: 25 additions & 22 deletions
@@ -1,35 +1,36 @@
(airflow-import-stock-market-data)=
# Updating stock market data automatically with CrateDB and Apache Airflow

- Watch this tutorial on Youtube: https://www.youtube.com/watch?v=YTTUzeaYUgQ&t=685s
+ Watch this tutorial on YouTube: [Automating stock data with Airflow and CrateDB](https://www.youtube.com/watch?v=YTTUzeaYUgQ&t=685s).

If you are struggling with keeping your stock market data up to date, this tutorial walks you through exactly what you need to do so you can automate data collection and storage from SP500 companies.
- ![Picture by StockSnap on Pixabay](upload://tXDu25ajd6zX201Ju43lENW1uQ1.jpeg)

+ ## Quick overview

- ## Quick Overview
Let's have a quick overview of what you'll do:

- You have a goal: regularly update stock market data.
- To achieve your goal, you can divide it into tasks: download, prepare, and store data. You want to turn these tasks into a workflow, run it and observe the results; in other words, you want to orchestrate your workflow, and Airflow is the tool for that.
-
- So the first thing to do is to start CrateDB and set up a table to store your data. Then, to orchestrate the process of regular data updates, you will create an Airflow project and establish the connection to CrateDB. Once you set up your Airflow project, you will write your tasks in Python as an Airflow DAG workflow (more details later). Finally, you will set a schedule for your workflow, and it's done!
+ :Goal: Update stock market data regularly.
+ :Approach: Define tasks to download, prepare, and store data; orchestrate them with Airflow.
+ :Steps: Start CrateDB and create a table; create an Airflow project and set the CrateDB connection; implement the DAG in Python; schedule it.

## Setup
+
Let's get right to the setup on a Mac machine.
You want to make sure you have Homebrew installed and Docker Desktop running.

### Run CrateDB and create a table to store data

- The first to do is to run CrateDB with Docker. It's easy: once you have Docker Desktop running, copy the Docker command from the CrateDB installation page and run it in your terminal.
-
+ First, run CrateDB with Docker. With Docker Desktop running, copy the command from the CrateDB installation page and run it:
```bash
docker run --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g crate:latest
```

- With CrateDB running, you can now access the CrateDB Admin UI by going to your browser and typing *localhost:4200*.
+ With CrateDB running, you can now access the CrateDB Admin UI by going to
+ your browser and typing *localhost:4200*.

- Let’s now create a table to store your financial data. I'm particularly interested in the "adjusted-close" value for the stocks, so I will create a table that stores the date, the stock ticker, and the adjusted-close value. I will set the `closing_date` and `ticker` as primary keys. The final statement looks like this:
+ Create a table to store financial data. Focus on the adjusted close value
+ (“adjusted_close”) per ticker per day. Use a composite primary key on
+ (`closing_date`, `ticker`):
```sql
CREATE TABLE sp500 (
closing_date TIMESTAMP,
@@ -75,20 +76,22 @@ Some information about the default settings: the PostgreSQL server is set up to
There are now three things you have to adjust before running Airflow:

* Add your CrateDB credentials to the `.env` file. Open the file in a text editor, and add the following line, which takes the default credentials for CrateDB, with user = crate, and password = null. (note: my internal port for running CrateDB in Docker is 5433, which I use here. If using the standard Docker command with 5432, here it should also be 5432).
- ```bash
- AIRFLOW_CONN_CRATEDB_CONNECTION=postgresql://crate:[email protected]:5433/doc?sslmode=disable
- ```
+   ```bash
+   # For local development only; do not commit real credentials
+   AIRFLOW_CONN_CRATEDB_CONNECTION=postgresql://crate:[email protected]:5433/doc?sslmode=disable
+   ```
* If the default ports are unavailable, you can change them to free ports. Just open the `.astro/config.yaml` file in a text editor and update the web server port to 8081 (instead of default 8080) and Postgres port to 5435 (instead of the default 5432), like so:
- ```yaml
- project:
-   name: astro-project
- webserver:
-   port: 8081
- postgres:
-   port: 5435
- ```
+   ```yaml
+   project:
+     name: astro-project
+   webserver:
+     port: 8081
+   postgres:
+     port: 5435
+   ```

### Start Airflow
+
Now you are done with the last adjustments, head back to your terminal and run this command to start Airflow: `astro dev start`
You can now access Airflow in your browser at `http://localhost:8081`.
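Once the connection above is in place, the DAG's storage step boils down to writing one row per ticker per closing date. A hedged sketch of such an upsert through the `cratedb_connection` connection id; the key columns follow the `CREATE TABLE` hunk above, while the `adjusted_close` column name is assumed from the prose and the DAG id, schedule, and sample values are purely illustrative:

```python
import pendulum

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="sp500_upsert_sketch",  # illustrative name only
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    # Upsert keyed on the composite primary key (closing_date, ticker).
    SQLExecuteQueryOperator(
        task_id="upsert_adjusted_close",
        conn_id="cratedb_connection",
        sql="""
            INSERT INTO sp500 (closing_date, ticker, adjusted_close)
            VALUES (%(closing_date)s, %(ticker)s, %(adjusted_close)s)
            ON CONFLICT (closing_date, ticker)
            DO UPDATE SET adjusted_close = excluded.adjusted_close;
        """,
        parameters={
            # Sample values for illustration; the tutorial's DAG fetches
            # real quotes and passes them in dynamically.
            "closing_date": "2024-01-02",
            "ticker": "AAPL",
            "adjusted_close": 185.64,
        },
    )
```

Re-running the task for the same date and ticker then updates the stored value instead of failing on the primary key.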

docs/integrate/airflow/index.md

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ journey. Spend time where it counts.
:columns: 12
:link: airflow-getting-started
:link-type: ref
- Define an Airflow DAG that downloads, processes, and stores stock market data in CrateDB.
+ Define an Airflow DAG that downloads, processes, and stores data in CrateDB.
:::

:::{grid-item-card} Tutorial: Import Parquet files
