
Commit 25cf79b

Airflow: Implement suggestions by CodeRabbit, part 1
1 parent 99f5b01 commit 25cf79b

8 files changed: 39 additions & 38 deletions


docs/integrate/airflow/data-retention-hot-cold.md

Lines changed: 6 additions & 5 deletions
@@ -7,7 +7,7 @@ In this fourth article on automating recurrent CrateDB queries with [Apache Airf

A hot/cold storage strategy is often motivated by a tradeoff between performance and cost-effectiveness. In a database such as CrateDB, more recent data tends to have a higher significance for analytical queries. Well-performing disks (hot storage) play a key role on the infrastructure side to support performance requirements but can come at a high cost. As data ages and gets less business-critical for near-real-time analysis, transitioning it to slower/cheaper disks (cold storage) helps to improve the cost-performance ratio.

-In a CrateDB cluster, nodes can have different hardware specifications. Hence, a cluster can consist of a combination of hot and cold storage nodes, each with respective disks. By assigning corresponding attributes to nodes, CrateDB can be made aware of such node types and consider if when allocating partitions.
+In a CrateDB cluster, nodes can have different hardware specifications. Hence, a cluster can consist of a combination of hot and cold storage nodes, each with respective disks. By assigning corresponding attributes to nodes, CrateDB recognizes these node types and considers them when allocating partitions.

## CrateDB setup

@@ -131,11 +131,12 @@ To remember which partitions have already been reallocated, we can make use of t

## Airflow setup

-We assume that a basic Astronomer/Airflow setup is already in place, as described in our {ref}`first post <airflow-export-s3>`. Let’s quickly go through the three steps of the algorithm:
+Assume a basic Astronomer/Airflow setup is in place, as described in the {ref}`first post <airflow-export-s3>`. The algorithm has three steps:

1. `get_policies`: A query on `doc.retention_policies` and `information_schema.table_partitions` identifies partitions affected by a retention policy.
2. `map_policy`: A helper method transforming the output of `get_policies` into a Python `dict` data structure for easier handling.
-4. `reallocate_partitions`: Executes an SQL statement for each mapped policy: `ALTER TABLE <table> PARTITION (<partition key> = <partition value>) SET ("routing.allocation.require.storage" = 'cold');`
+3. `reallocate_partitions`: Executes an SQL statement for each mapped policy: `ALTER TABLE <table> PARTITION (<partition key> = <partition value>) SET ("routing.allocation.require.storage" = 'cold');`
+
The CrateDB cluster will then automatically initiate the relocation of the affected partition to a node that fulfills the requirement (`cratedb03` in our case).

The full implementation is available as [data_retention_reallocate_dag.py](https://github.com/crate/crate-airflow-tutorial/blob/main/dags/data_retention_reallocate_dag.py) on GitHub.
@@ -168,5 +169,5 @@ INSERT INTO doc.retention_policies (table_schema, table_name, partition_column,

## Summary

-Building upon the previously discussed data retention policy implementation, we showed that reallocating partitions integrates seemingly and consists only of a single SQL statement.
-CrateDB’s self-organization capabilities take care of all low-level operations and the actual moving of partitions. Furthermore, we showed that a multi-staged approach to data retention policies can be achieved by first reallocating and eventually deleting partitions permanently.
+Building on the earlier data retention policy implementation, reallocating partitions integrates seamlessly and consists of a single SQL statement.
+CrateDB’s self-organization handles the low-level operations and the actual movement of partitions. A multi-stage policy is straightforward: first reallocate, then delete.
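
For orientation, the following is a minimal, hypothetical sketch of the three-step DAG described above. It is not the code from `data_retention_reallocate_dag.py`: the placeholder policy record, the extra `reallocation_sql` helper task, and the DAG name are all invented for illustration.

```python
import pendulum
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@task
def get_policies():
    # Placeholder only: the tutorial queries doc.retention_policies joined with
    # information_schema.table_partitions here; this record is invented.
    return [("doc", "raw_metrics", "ts_month", 1)]


@task
def map_policy(policy):
    # Step 2: reshape one policy record into a dict for easier handling.
    schema, table, column, value = policy
    return {"schema": schema, "table": table, "column": column, "value": value}


@task
def reallocation_sql(policy):
    # Step 3 (illustrative helper): render the ALTER TABLE statement for one partition.
    return (
        f'ALTER TABLE "{policy["schema"]}"."{policy["table"]}" '
        f'PARTITION ("{policy["column"]}" = {policy["value"]}) '
        'SET ("routing.allocation.require.storage" = \'cold\');'
    )


@dag(start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)
def hot_cold_reallocation_sketch():
    policies = map_policy.expand(policy=get_policies())
    SQLExecuteQueryOperator.partial(
        task_id="reallocate_partitions",
        conn_id="cratedb_connection",
    ).expand(sql=reallocation_sql.expand(policy=policies))


hot_cold_reallocation_sketch()
```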

docs/integrate/airflow/data-retention-policy.md

Lines changed: 3 additions & 3 deletions
@@ -66,7 +66,7 @@ The implementation of the corresponding tasks looks as follows:
```python
@task
def get_policies(ds=None):
-    """Retrieve all partitions effected by a policy"""
+    """Retrieve all partitions affected by a policy"""
    pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
    sql = Path("include/data_retention_retrieve_delete_policies.sql")
    return pg_hook.get_records(
@@ -94,7 +94,7 @@ def map_policy(policy):
    }
```

-In the DAG's main method, we can now make use of Airflows' [dynamic task mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html) which allows executing the same task several times, with different parameters:
+In the DAG's main method, use Airflow’s [dynamic task mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html) to execute the same task several times with different parameters:

```python
SQLExecuteQueryOperator.partial(
@@ -118,7 +118,7 @@ def map_policy(policy):

@task
def get_policies(ds=None):
-    """Retrieve all partitions effected by a policy"""
+    """Retrieve all partitions affected by a policy"""
    pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
    sql = Path("include/data_retention_retrieve_delete_policies.sql")
    return pg_hook.get_records(
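
The hunk above ends in the middle of the `get_records()` call. As a hedged guess at how such a call is typically completed (the SQL file contents and the parameter name are assumptions, not taken from the repository):

```python
from pathlib import Path

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_policies(ds=None):
    """Retrieve all partitions affected by a policy"""
    pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
    sql = Path("include/data_retention_retrieve_delete_policies.sql")
    # Illustrative completion: read the SQL file and bind the logical date.
    return pg_hook.get_records(
        sql=sql.read_text(encoding="utf-8"),
        parameters={"day": ds},
    )
```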

docs/integrate/airflow/export-s3.md

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS "metrics" (
    "name" TEXT,
    "tags" OBJECT(DYNAMIC),
    "fields" OBJECT(DYNAMIC),
-    "month" TIMESTAMP AS date_trunc('quarter', "timestamp")
+    "month" TIMESTAMP AS date_trunc('month', "timestamp")
)
```
In general, to export data to a file one can use the `COPY TO` statement in CrateDB. This command exports the content of a table to one or more JSON files in a given directory. JSON files have unique names and they are formatted to contain one table row per line. The `TO` clause specifies the URI string of the output location. CrateDB supports two URI schemes: `file` and `s3`. We use the `s3` scheme to access the bucket on Amazon S3. Further information on different clauses of the `COPY TO` statement can be found in the official {ref}`CrateDB documentation <crate-reference:sql-copy-to>`.
@@ -34,7 +34,7 @@ To export data from the `metrics` table to S3, we need a statement such as:

## DAG implementation

-In order to build a generic DAG that is not specific to one single table configuration, we first create a file `include/table_exports.py`, containing a list of dictionaries (key/value pairs) for each table to export:
+To keep the DAG generic, create `include/table_exports.py` with one dictionary per table to export:
```python
TABLES = [
    {
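
The hunk stops right after the list is opened. Purely as an illustration of the shape one such entry could take, with invented key names, bucket, and prefix rather than the tutorial's actual configuration:

```python
# Hypothetical sketch of include/table_exports.py; key names are illustrative.
TABLES = [
    {
        "schema": "doc",
        "table": "metrics",
        "s3_bucket": "my-crate-exports",   # assumed bucket name
        "s3_prefix": "metrics/{{ ds }}",   # templated per logical date
    },
]
```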

docs/integrate/airflow/getting-started.md

Lines changed: 3 additions & 4 deletions
@@ -6,10 +6,8 @@ Automate CrateDB queries with Apache Airflow.
:::

## Introduction
-This article is the first in a series of articles demonstrating how to
-effectively use [Apache Airflow](https://airflow.apache.org/) with CrateDB.
-In this part, we first introduce Apache Airflow and why we should use it
-for automating recurring queries in CrateDB.
+
+This first article shows how to use [Apache Airflow] with CrateDB to automate recurring queries.

Then, we cover [Astronomer], the managed Apache Airflow provider, followed
by instructions on how to set up the project with [Astronomer CLI].
@@ -120,5 +118,6 @@ To check all environment variables that are applied, run `env`.
This will output some variables set by Astronomer by default including the variable for the CrateDB connection.


+[Apache Airflow]: https://airflow.apache.org/
[Astronomer]: https://www.astronomer.io/
[Astronomer CLI]: https://docs.astronomer.io/astro/cli/overview

docs/integrate/airflow/import-parquet.md

Lines changed: 9 additions & 7 deletions
@@ -4,7 +4,7 @@
## Introduction
Using Airflow to import the NYC Taxi and Limousine dataset in Parquet format.

-Currently, Parquet imports using COPY FROM are not supported by CrateDB, it only supports CSV and JSON files instead. Because of that, we implemented a different approach from simply changing the previous implementation from CSV to Parquet.
+CrateDB does not support `COPY FROM` for Parquet. It supports CSV and JSON. Therefore, this tutorial uses an alternative approach rather than switching the previous CSV workflow to Parquet.

First and foremost, keep in mind the strategy presented here for importing Parquet files into CrateDB, we have already covered this topic in a previous tutorial using a different approach from the one introduced in this tutorial, so feel free to have a look at the tutorial about {ref}`arrow-import-parquet` and explore with the different possibilities out there.

@@ -80,20 +80,19 @@ Ok! So, once the tools are already set up with the corresponding tables created,
![Airflow DAG workflow|690x76](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/29502f83c13d29d90ab703a399f58c6daeee6fe6.png)

The DAG pictured above represents a routine that will run every month to retrieve the latest released file by NYC TLC based on the execution date of that particular instance. Since it is configured to catch up with previous months when enabled, it will generate one instance for each previous month since January 2009 and each instance will download and process the corresponding month, based on the logical execution date.
-The Airflow DAG used in this tutorial contains 6 tasks which are described below:
-
+The Airflow DAG used in this tutorial contains 7 tasks:
* **format_file_name:** according to the NYC Taxi and Limousine Commission (TLC) [documentation](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page), the files are named after the month they correspond to, for example:
-  ```
-  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet
+  ```text
+  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet
  ```
  The file path above corresponds to the data from March 2022. So, to retrieve a specific file, the task gets the date and formats it to compose the name of the specific file. Important to mention that the data is released with 2 months of delay, so it had to be taken into consideration.
-* **process_parquet:** afterward, the name is used to download the file to local storage and then transform it from the Parquet format into CSV using CLI tools of [Apache Arrow](https://github.com/apache/arrow), as follows:
+* **process_parquet:** afterward, the name is used to download the file to local storage and then transform it from Parquet to CSV using [`parquet-tools`] (Apache Parquet CLI, see [Apache Arrow])
  * `curl -o "<LOCAL-PARQUET-FILE-PATH>" "<REMOTE-PARQUET-FILE>"`
  * `parquet-tools csv <LOCAL-PARQUET-FILE-PATH> > <CSV-FILE-PATH>`
  Both tasks are executed within one Bash Operator.
* **copy_csv_to_s3:** Once the newly transformed file is available, it gets uploaded to an S3 Bucket to then, be used in the {ref}`crate-reference:sql-copy-from` SQL statement.
* **copy_csv_staging:** copy the CSV file stored in S3 to the staging table described previously.
-* **copy_stating_to_trips:** finally, copy the data from the staging table to the trips table, casting the columns that are not in the right type yet.
+* **copy_staging_to_trips:** finally, copy the data from the staging table to the trips table, casting the columns that are not in the right type yet.
* **delete_staging:** after it is all processed, clean up the staging table by deleting all rows, and preparing for the next file.
* **delete_local_parquet_csv:** delete the files (Parquet and CSV) from the storage.

@@ -114,3 +113,6 @@ are other approaches out there, we encourage you to try them out.

If you want to continue to explore how CrateDB can be used with Airflow, you can
check other tutorials related to that topic {ref}`here <airflow-tutorials>`.
+
+
+[Apache Arrow]: https://github.com/apache/arrow
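
As a small illustration of the date arithmetic described for `format_file_name` above (the function body is a guess, not the DAG's actual implementation; only the two-month delay and the URL pattern come from the text):

```python
import pendulum


def format_file_name(logical_date: pendulum.DateTime) -> str:
    # Shift back two months to account for the TLC publishing delay noted above,
    # then format the month into the yellow_tripdata file URL.
    target = logical_date.subtract(months=2)
    return (
        "https://d37ci6vzurychx.cloudfront.net/trip-data/"
        f"yellow_tripdata_{target.format('YYYY-MM')}.parquet"
    )


# A DAG run logically dated May 2022 resolves to the March 2022 file.
print(format_file_name(pendulum.datetime(2022, 5, 1)))
```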

docs/integrate/airflow/import-stock-market-data.md

Lines changed: 8 additions & 8 deletions
@@ -34,7 +34,7 @@ Let’s now create a table to store your financial data. I'm particularly intere
CREATE TABLE sp500 (
    closing_date TIMESTAMP,
    ticker TEXT,
-    adjusted_close double,
+    adjusted_close DOUBLE PRECISION,
    primary key (closing_date, ticker)
);
```
@@ -105,22 +105,24 @@ Create a `.py` file for your DAG in your `astro-project/dags` folder; I will cal

### Import operators and modules

-Let’s start by importing the necessary operator to connect to CrateDB, the `PostgresOperator`, and the decorator to define the DAG and its tasks. You will also import the `datetime`, `pendulum` modules to set up your schedule and the `yfinance`, `pandas`, and `json` modules to download and manipulate the financial data later.
+Import the operator used in this tutorial, `SQLExecuteQueryOperator`,
+and the decorator to define the DAG and its tasks. You will also import
+the `datetime`, `pendulum` modules to set up your schedule and the
+`yfinance`, `pandas`, and `json` modules to download and manipulate the
+financial data later.
```python
import datetime
import math
import json
import logging
import pendulum
-import requests
-from bs4 import BeautifulSoup
import yfinance as yf
import pandas as pd
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.decorators import dag, task
```
Don’t forget to add these modules to the `requirements.txt` file inside your project like so:
-```
+```text
apache-airflow-providers-postgres>=5.3.1
apache-airflow-providers-common-sql>=1.3.1
apache-airflow[pandas]
@@ -133,7 +135,7 @@ The next step is to declare the necessary tasks to download, prepare and insert
#### Download task

Let's first write a function to download data from `yfinance`; I will call it `download_yfinance_data`.
-You can use ds for today’s date or get yesterday’s date with `airflow.macros.ds_add(ds, -1)`. You start by listing tickers from stocks of interest into a `tickers` variable. You then pass this list and the start date as arguments to the `yf.download` function and store the result in a `data `variable. `data` is a pandas data frame with various values for each stock, such as high/low, volume, dividends, and so on. Today, I will focus on the adjusted close value, so I filter data using the `Adj Close` key. Moreover, I return the data as a JSON object (instead of a data frame) because it works better with XCom, which is Airflow's mechanism to talk between tasks. Finally, you set this function as an Airflow task using the `@task` decorator and give it an execution timeout.
+You can use ds for today’s date or get yesterday’s date with `airflow.macros.ds_add(ds, -1)`. You start by listing tickers from stocks of interest into a `tickers` variable. You then pass this list and the start date as arguments to the `yf.download` function and store the result in a `data` variable. `data` is a pandas data frame with various values for each stock, such as high/low, volume, dividends, and so on. Today, I will focus on the adjusted close value, so I filter data using the `Adj Close` key. Moreover, I return the data as a JSON object (instead of a data frame) because it works better with XCom, which is Airflow's mechanism to talk between tasks. Finally, you set this function as an Airflow task using the `@task` decorator and give it an execution timeout.
```python
@task(execution_timeout=datetime.timedelta(minutes=3))
def download_yfinance_data(ds=None):
@@ -232,8 +234,6 @@ import math
import json
import logging
import pendulum
-import requests
-from bs4 import BeautifulSoup
import yfinance as yf
import pandas as pd
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
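
Following the description above, a hedged sketch of what `download_yfinance_data` can look like (the ticker list is illustrative, and newer `yfinance` releases may require `auto_adjust=False` to keep the `Adj Close` column):

```python
import datetime
import json

import yfinance as yf
from airflow.decorators import task


@task(execution_timeout=datetime.timedelta(minutes=3))
def download_yfinance_data(ds=None):
    # Illustrative ticker list; the article builds its own list of S&P 500 symbols.
    tickers = ["AAPL", "MSFT", "AMZN"]
    data = yf.download(tickers, start=ds)["Adj Close"]
    # Return JSON instead of a DataFrame so the result travels well over XCom.
    return json.loads(data.to_json())
```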

docs/integrate/airflow/index.md

Lines changed: 2 additions & 3 deletions
@@ -68,8 +68,7 @@ journey. Spend time where it counts.
:columns: 12
:link: airflow-getting-started
:link-type: ref
-Define an Airflow DAG to download, process, and store stock market data
-into CrateDB.
+Define an Airflow DAG that downloads, processes, and stores stock market data in CrateDB.
:::

:::{grid-item-card} Tutorial: Import Parquet files
@@ -96,7 +95,7 @@ into CrateDB.
:::{grid-item-card} Tutorial: Export to S3
:link: airflow-export-s3
:link-type: ref
-Recurrently export data from CrateDB to S3.
+Export data from CrateDB to S3 on a schedule.
:::

:::{grid-item-card} Tutorial: Implement a data retention policy

docs/integrate/arrow/import-parquet.md

Lines changed: 6 additions & 6 deletions
@@ -9,7 +9,7 @@ Apache Parquet is a free and open-source column-oriented data storage format. It

## Prerequisites

-The libraries needed are **crate**, **sqlalchemy** and **pyarrow**, so you should install them. To do so, you can use the following `pip install` command. To check the latest version supported by CrateDB, have a look at the {ref}`sqlalchemy-cratedb:index`.
+Install the required libraries: **sqlalchemy-cratedb** and **pyarrow**.

```shell
pip install pyarrow sqlalchemy-cratedb
@@ -34,8 +34,7 @@ parquet_path = 'yellow_tripdata_2022-01.parquet'
ny_taxi_parquet = pq.ParquetFile(parquet_path)
```

-Now, make sure to set up the SQLAlchemy engine and session as seen below. If you are not using localhost, remember to replace the URI string with your own.
-
+Set up the SQLAlchemy engine and session:
```python
CRATE_URI = 'crate://localhost:4200'

@@ -47,13 +46,14 @@ session.configure(bind=engine, autoflush=False, expire_on_commit=False)

## Creating the model

-Before processing the newly imported file, the corresponding Model must be created, this is the representation of the final table, if you are using a different dataset, adapt the model to your data. Remember that the attribute name is case sensitive, so in our example **vendorID** will have the same name in CrateDB's table.
+Before processing the newly imported file, the corresponding Model must be created, this is the representation of the final table, if you are using a different dataset, adapt the model to your data. Attribute names are case-sensitive. In this example, **VendorID** in the model maps to "VendorID" in CrateDB.

```python
class TaxiTrip(Base):
    __tablename__='ny_taxi'

-    id = Column(String, primary_key=True, default=uuid4)
+    # Generate a text UUID to match Column(String)
+    id = Column(String, primary_key=True, default=lambda: str(uuid4()))
    VendorID = Column(String)
    tpep_pickup_datetime = Column(DateTime)
    tpep_dropoff_datetime = Column(DateTime)
@@ -80,7 +80,7 @@ Now if we call:
```python
Base.metadata.create_all(engine)
```
-It will create the table in CrateDB **if it does not exist**, if you change the schema after creating the table, it might fail, in this case you will need to rename the table in CrateDB and adapt the model, if the data doesn't matter you can delete the table and re-create it.
+It creates the table in CrateDB if it does not exist. If you later change the schema, `create_all()` may fail. In that case, rename or drop the existing table, adjust the model, and recreate it.

For further details on how to use the SQLAlchemy with CrateDB, you can refer to the {ref}`sqlalchemy-cratedb:index`.
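To round off the model section, a hedged sketch of how rows could then be streamed from the Parquet file into CrateDB, assuming the `session` and `TaxiTrip` objects defined in the snippets above and a model that declares every column present in the file; the batch size and the overall loop are illustrative, not the tutorial's exact code:

```python
import pyarrow.parquet as pq

# Hypothetical continuation: iterate the Parquet file in record batches and
# insert each batch through the SQLAlchemy session from the snippets above.
parquet_path = "yellow_tripdata_2022-01.parquet"
ny_taxi_parquet = pq.ParquetFile(parquet_path)

for batch in ny_taxi_parquet.iter_batches(batch_size=10_000):
    # to_pylist() yields one dict per row, keyed by column name (e.g. "VendorID");
    # this assumes the TaxiTrip model declares every column found in the file.
    rows = batch.to_pylist()
    session.add_all(TaxiTrip(**row) for row in rows)
    session.commit()
```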