This is part of my technical assessment for the Data Engineer role at Yayasan Peneraju. This project implements an ETL (Extract, Transform, Load) pipeline using Apache Airflow to process daily sales data. The pipeline extracts raw CSV files from an S3 bucket, cleans and aggregates the data, performs quality checks, and loads the transformed data into a PostgreSQL database.
The ETL pipeline automates the following tasks:
- Extract: Downloads the latest daily sales data CSV file from an S3 bucket.
- Transform: Cleans and aggregates the data by product category, ensuring consistency and removing invalid records.
- Quality Check: Validates that the transformed data contains at least one row before loading.
- Load: Stores the transformed data in a PostgreSQL database.
The pipeline is orchestrated using Apache Airflow, which ensures reliable scheduling and task dependencies. It is containerized using Docker for easy deployment and reproducibility.
The project is organized as follows:
```
sales-data-pipeline/
├── dags/
│   └── sales_etl_dag.py              # Airflow DAG definition
├── data/
│   └── sales_data_21_May_2025.csv    # Sample raw data file
├── scripts/
│   ├── extract_sales_data.py         # Script to download data from S3
│   ├── transform_sales_data.py       # Script to clean and aggregate data
│   ├── load_to_postgres.py           # Script to load data into PostgreSQL
│   ├── data_quality_check.py         # Script for data quality checks
│   ├── __init__.py                   # Package initialization
│   └── utils/
│       ├── logger.py                 # Logging utilities
│       └── __init__.py               # Package initialization
├── tests/
│   └── test_transform_sales_data.py  # Unit tests for transformation logic
├── docker-compose.yml                # Docker configuration for Airflow
├── .env                              # Environment variables (e.g., AWS credentials, DB credentials)
├── README.md                         # Documentation
└── requirements.txt                  # Python dependencies
```
### `dags/sales_etl_dag.py`

- Purpose: Defines the workflow of the ETL pipeline.
- Tasks:
  - `extract_task`: Downloads the latest sales data from S3.
  - `transform_task`: Cleans and aggregates the data.
  - `quality_check_task`: Validates the transformed data.
  - `load_task`: Loads the cleaned data into PostgreSQL.
- Schedule: Runs daily at midnight (`@daily`).
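A minimal sketch of how the DAG might be wired together, assuming `PythonOperator` tasks that call the scripts listed above (the callable names are illustrative; the actual definition lives in `dags/sales_etl_dag.py`):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical entry points -- the real function names in scripts/ may differ.
from scripts.extract_sales_data import extract_sales_data
from scripts.transform_sales_data import transform_sales_data
from scripts.data_quality_check import check_row_count
from scripts.load_to_postgres import load_to_postgres

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="sales_etl_pipeline",
    default_args=default_args,
    schedule_interval="@daily",      # runs daily at midnight
    start_date=datetime(2025, 5, 1),
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract_task", python_callable=extract_sales_data)
    transform_task = PythonOperator(task_id="transform_task", python_callable=transform_sales_data)
    quality_check_task = PythonOperator(task_id="quality_check_task", python_callable=check_row_count)
    load_task = PythonOperator(task_id="load_task", python_callable=load_to_postgres)

    # Linear dependency chain: extract -> transform -> quality check -> load
    extract_task >> transform_task >> quality_check_task >> load_task
```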
### `scripts/extract_sales_data.py`

- Purpose: Downloads the latest sales data CSV file from an S3 bucket.
- Features:
  - Uses `boto3` to interact with S3.
  - Logs the download process and handles errors gracefully.
  - Stores the downloaded file locally.
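A hedged sketch of the extraction step (the bucket and prefix names follow the `.env` variables shown under Setup; the "latest file" selection logic is an assumption):

```python
import os

import boto3


def extract_sales_data(local_dir: str = "/tmp") -> str:
    """Download the most recently modified sales CSV from S3 and return its local path."""
    s3 = boto3.client("s3")  # credentials come from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    bucket = os.environ["S3_BUCKET_NAME"]
    prefix = os.environ.get("S3_PREFIX", "sales/")

    # List objects under the prefix and pick the newest CSV.
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    csv_objects = [o for o in response.get("Contents", []) if o["Key"].endswith(".csv")]
    if not csv_objects:
        raise FileNotFoundError(f"No CSV files found under s3://{bucket}/{prefix}")
    latest = max(csv_objects, key=lambda o: o["LastModified"])

    local_path = os.path.join(local_dir, os.path.basename(latest["Key"]))
    s3.download_file(bucket, latest["Key"], local_path)
    return local_path
```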
### `scripts/transform_sales_data.py`

- Purpose: Cleans and aggregates the raw sales data.
- Steps:
  - Assigns column names to the raw data.
  - Standardizes product categories (e.g., "Vintage Cars", "Trains").
  - Converts sales values to numeric format and removes invalid entries.
  - Removes outliers in the sales data.
  - Aggregates total sales by product category.
- Output: A cleaned CSV file stored in `/tmp`.
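A simplified sketch of that logic using pandas, assuming an IQR-based outlier rule (the actual cleaning rules in `scripts/transform_sales_data.py` may differ):

```python
import pandas as pd


def transform_sales_data(raw_path: str, output_path: str = "/tmp/sales_by_category.csv") -> pd.DataFrame:
    # The real script assigns explicit column names; here we assume the header row is present.
    df = pd.read_csv(raw_path)

    # Standardize product categories, e.g. "vintage cars" -> "Vintage Cars".
    df["product_line"] = df["product_line"].str.strip().str.title()

    # Convert sales to numeric and drop rows that cannot be parsed.
    df["sales"] = pd.to_numeric(df["sales"], errors="coerce")
    df = df.dropna(subset=["sales", "product_line"])

    # Remove outliers with a simple IQR rule (assumed -- the real rule may differ).
    q1, q3 = df["sales"].quantile([0.25, 0.75])
    iqr = q3 - q1
    df = df[(df["sales"] >= q1 - 1.5 * iqr) & (df["sales"] <= q3 + 1.5 * iqr)]

    # Aggregate total sales by category.
    result = (
        df.groupby("product_line", as_index=False)["sales"]
        .sum()
        .rename(columns={"product_line": "category", "sales": "total_sales"})
    )
    result.to_csv(output_path, index=False)
    return result
```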
### `scripts/load_to_postgres.py`

- Purpose: Loads the transformed data into a PostgreSQL database.
- Features:
  - Connects to PostgreSQL using environment variables.
  - Creates the `sales_by_category` table if it doesn't exist.
  - Uses batch inserts for efficient loading.
  - Logs the loading process and handles errors.
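A minimal sketch of the load step using `psycopg2.extras.execute_batch` (connection parameters follow the `.env` variables listed under Setup; logging and retries are omitted for brevity):

```python
import csv
import os

import psycopg2
from psycopg2.extras import execute_batch


def load_to_postgres(csv_path: str = "/tmp/sales_by_category.csv") -> None:
    conn = psycopg2.connect(
        host=os.environ["PG_HOST"],
        dbname=os.environ["PG_DATABASE"],
        user=os.environ["PG_USER"],
        password=os.environ["PG_PASSWORD"],
        port=os.environ.get("PG_PORT", "5432"),
    )
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS sales_by_category (
                    id SERIAL PRIMARY KEY,
                    category VARCHAR(255) NOT NULL,
                    total_sales NUMERIC NOT NULL
                );
                """
            )
            with open(csv_path, newline="") as f:
                rows = [(r["category"], r["total_sales"]) for r in csv.DictReader(f)]
            # Batch insert for efficiency instead of one round trip per row.
            execute_batch(
                cur,
                "INSERT INTO sales_by_category (category, total_sales) VALUES (%s, %s)",
                rows,
            )
    finally:
        conn.close()
```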
### `scripts/data_quality_check.py`

- Purpose: Ensures the transformed data is valid.
- Check: Verifies that the transformed data contains at least one row.
- Action: Raises an error if the row count is zero.
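A sketch of what that check might look like (the file path is an assumption carried over from the transform step):

```python
import pandas as pd


def check_row_count(csv_path: str = "/tmp/sales_by_category.csv") -> int:
    """Fail the task if the transformed file contains no data rows."""
    row_count = len(pd.read_csv(csv_path))
    if row_count == 0:
        raise ValueError(f"Data quality check failed: {csv_path} contains zero rows")
    return row_count
```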
### `scripts/utils/logger.py`

- Purpose: Provides consistent logging across all scripts.
- Features:
  - Logs to both the console and timestamped log files in the `logs/` directory.
  - Includes detailed error handling and debugging information.
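A minimal sketch of such a logger factory built on Python's `logging` module (the real helper in `scripts/utils/logger.py` may expose a different interface):

```python
import logging
import os
from datetime import datetime


def get_logger(name: str, log_dir: str = "logs") -> logging.Logger:
    """Return a logger that writes to the console and a timestamped file."""
    os.makedirs(log_dir, exist_ok=True)
    logger = logging.getLogger(name)
    if logger.handlers:          # avoid attaching duplicate handlers on re-import
        return logger
    logger.setLevel(logging.INFO)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    file_handler = logging.FileHandler(os.path.join(log_dir, f"{name}_{timestamp}.log"))
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger
```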
### `tests/test_transform_sales_data.py`

- Purpose: Ensures the transformation logic works as expected.
- Tests:
  - Validates data cleaning and aggregation.
  - Checks for proper handling of missing or invalid values.
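An illustrative test case in pytest style, assuming the `transform_sales_data(raw_path, output_path)` signature sketched above (the real tests may be structured differently):

```python
from scripts.transform_sales_data import transform_sales_data


def test_aggregates_and_drops_invalid_rows(tmp_path):
    raw = tmp_path / "raw.csv"
    raw.write_text(
        "product_line,sales\n"
        "Vintage Cars,1000\n"
        "Vintage Cars,not-a-number\n"   # invalid sales value should be dropped
        "Trains,200\n"
    )

    result = transform_sales_data(str(raw), output_path=str(tmp_path / "out.csv"))

    totals = dict(zip(result["category"], result["total_sales"]))
    assert totals == {"Vintage Cars": 1000, "Trains": 200}
```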
## Input Data

I found this sample dataset on Kaggle and opted to use it for this assessment. The raw sales data is exported daily from an ERP system and stored in an S3 bucket. The data is in CSV format and includes the following columns:
| Column Name | Description |
|---|---|
| `order_id` | Unique identifier for each order. |
| `quantity_ordered` | Quantity of items ordered. |
| `price_each` | Price per item. |
| `order_line_number` | Line number within the order. |
| `sales` | Total sales amount for the order line. |
| `order_date` | Date of the order. |
| `status` | Status of the order (e.g., completed, pending). |
| `customer_id` | Identifier for the customer. |
| `month_id` | Month ID for the order. |
| `year_id` | Year ID for the order. |
| `product_line` | Category of the product (e.g., Vintage Cars). |
| `msrp` | Manufacturer's suggested retail price. |
| `product_code` | Code for the product. |
| `customer_name` | Name of the customer. |
| `phone` | Customer's phone number. |
| `address_line1` | First line of the customer's address. |
| `address_line2` | Second line of the customer's address. |
| `city` | City of the customer's address. |
| `state` | State of the customer's address. |
| `postal_code` | Postal code of the customer's address. |
| `country` | Country of the customer's address. |
| `territory` | Sales territory. |
| `contact_last_name` | Last name of the contact person. |
| `contact_first_name` | First name of the contact person. |
| `deal_size` | Size of the deal (e.g., Small, Medium, Large). |
Sample rows:

```csv
order_id,quantity_ordered,price_each,order_line_number,sales,order_date,status,customer_id,month_id,year_id,product_line,msrp,product_code,customer_name,phone,address_line1,address_line2,city,state,postal_code,country,territory,contact_last_name,contact_first_name,deal_size
10001,2,500,1,1000,2025-05-21,Completed,101,5,2025,Vintage Cars,500,P1001,John Doe,+1234567890,123 Main St,,Anytown,CA,90210,USA,North America,Doe,John,Large
10002,1,200,1,200,2025-05-21,Completed,102,5,2025,Trains,200,P1002,Jane Smith,+1987654321,456 Elm St,,Othertown,CA,90211,USA,North America,Smith,Jane,Small
```
## Output Data

The transformed data is aggregated by `product_line` and includes the following columns:

| Column Name | Description |
|---|---|
| `category` | Product category (e.g., Vintage Cars, Trains). |
| `total_sales` | Total sales amount for the category. |
For the sample rows shown above, the transformed output would be:

```csv
category,total_sales
Vintage Cars,1000
Trains,200
```
The transformed data is loaded into the `sales_by_category` table in PostgreSQL:
```sql
CREATE TABLE IF NOT EXISTS sales_by_category (
    id SERIAL PRIMARY KEY,
    category VARCHAR(255) NOT NULL,
    total_sales NUMERIC NOT NULL
);
```

Example rows:

| id | category | total_sales |
|---|---|---|
| 1 | Classic Cars | 120000 |
| 2 | Trains | 80000 |
| 3 | Vintage Cars | 95000 |
| 4 | Motorcycles | 70000 |
✅ This table gives a clear, clean summary of daily sales by product line, suitable for reporting and analytics.
## Setup

Prerequisites:

- Docker: Ensure Docker is installed on your machine.
- AWS Credentials: Configure AWS access keys in the `.env` file or as environment variables.
- PostgreSQL: Set up a PostgreSQL instance locally or remotely, and update the database connection details in the `.env` file.
1. Clone the Repository
   ```bash
   git clone https://github.com/your-username/sales-data-pipeline.git
   cd sales-data-pipeline
   ```
2. Install Dependencies
   ```bash
   pip install -r requirements.txt
   ```
3. Configure Environment Variables
   Create a `.env` file with the following variables:
   ```env
   AIRFLOW_UID=50000
   S3_BUCKET_NAME=your-s3-bucket-name
   S3_PREFIX=sales/
   AWS_ACCESS_KEY_ID=your-access-key-id
   AWS_SECRET_ACCESS_KEY=your-secret-access-key
   PG_HOST=localhost
   PG_DATABASE=airflow
   PG_USER=airflow
   PG_PASSWORD=airflow
   PG_PORT=5432
   ```
4. Start Airflow Using Docker
   ```bash
   docker-compose up -d
   ```
5. Access Airflow UI
   Open http://localhost:8080 in your browser and log in with the default credentials:
   - Username: `airflow`
   - Password: `airflow`
6. Trigger the DAG
   - Navigate to the "DAGs" tab in the Airflow UI.
   - Locate the `sales_etl_pipeline` DAG.
   - Trigger the DAG manually or wait for the scheduled run (`@daily`).
## Design Decisions

- Airflow for Orchestration:
  - Airflow is used to define the DAG and manage task dependencies, retries, and scheduling.
- Docker for Containerization:
  - The pipeline is containerized using Docker for ease of deployment and reproducibility.
- Logging:
  - A centralized logging mechanism is implemented using Python's `logging` module, with logs stored in timestamped files under the `logs/` directory.
- Error Handling:
  - Each script includes robust error handling with detailed logging and exception propagation.
- Batch Inserts:
  - PostgreSQL inserts use batch processing (`execute_batch`) for improved performance.
## Assumptions

- S3 Bucket Access:
  - The S3 bucket is accessible with the provided AWS credentials.
- PostgreSQL Connection:
  - The PostgreSQL database is reachable with the specified connection details.
- File Format:
  - The input CSV file has a consistent structure and column names.
## Running the Pipeline

1. Start Airflow:
   ```bash
   docker-compose up -d
   ```
2. Access the Airflow UI:
   - Open http://localhost:8080.
   - Trigger the `sales_etl_pipeline` DAG manually or wait for the scheduled run.
3. Monitor Logs:
   - Logs are stored in the `logs/` directory.
   - Real-time logs can also be viewed in the Airflow UI.
4. Verify Results:
   - Check the `sales_by_category` table in PostgreSQL to ensure the data is loaded correctly.
   - Review the DAG status in the Airflow UI to confirm successful execution.
## Testing

Run the unit tests for the transformation logic:

```bash
python -m pytest tests/test_transform_sales_data.py
```