3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
node_modules/
build_MV2/
build_MV3/
source_code.zip
source_code.zip
.DS_Store
82 changes: 82 additions & 0 deletions process_s3_objects/README.md
@@ -0,0 +1,82 @@
# S3 to PostgreSQL data ingestion

This project contains a Python script and associated files to process JSON objects from an AWS S3 bucket, ingest the data into a PostgreSQL database, and manage the processed objects within S3.

## Overview

The core of this project is the `process_s3_objects.py` script. This script connects to a specified S3 bucket, iterates through each object (JSON file), extracts the relevant data, and inserts it into a PostgreSQL database. It also moves each processed file into a separate `processed/` folder in the S3 bucket and adds timestamped tags for tracking.

The project uses the following key components:

- `process_s3_objects.py:` The main script that orchestrates the data ingestion workflow. It connects to S3, retrieves objects, calls functions to handle data extraction and database insertion, and manages file movement within the S3 bucket.


- `database_ingestion.py:` This script contains the functions responsible for connecting to the PostgreSQL database and performing the data extraction and insertion operations. It uses the AWS Secrets Manager service to retrieve database credentials.


- `secret_manager.py:` A utility for securely retrieving secrets (like database credentials) from AWS Secrets Manager; a minimal sketch of this helper is shown after this list.


- `Dockerfile:` Defines the environment for the application, including installing necessary dependencies like boto3 and the PostgreSQL client.


- `requirements.txt:` Lists the Python libraries required for the project, such as boto3, awscli, botocore, and psycopg2-binary.


- `Makefile:` Provides convenient commands for building the Docker image and running the script.


- `table_creation.sql:` Contains the SQL commands for creating the necessary tables in the PostgreSQL database.
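
`secret_manager.py` itself is not included in this change, so the snippet below is only a rough sketch of what `get_secret_by_name` might look like, assuming it wraps boto3's `get_secret_value` call and returns the secret string parsed as JSON; the signature and region default are inferred from how `database_ingestion.py` calls it, not confirmed by the source.

``` python
# Hypothetical sketch of secret_manager.get_secret_by_name -- the real module is
# not part of this diff, so the signature and defaults here are assumptions.
import json

import boto3


def get_secret_by_name(secret_name, region_name="ap-southeast-2"):
    """Fetch a secret from AWS Secrets Manager and return it as a dict."""
    client = boto3.client("secretsmanager", region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)
    # Key/value secrets come back as a JSON string in SecretString.
    return json.loads(response["SecretString"])
```

`database_ingestion.py` then reads keys such as `s3_posgres_username` and `s3_postgres_password` from the returned dictionary.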

## Dependencies:

The project relies on the Python packages listed in `requirements.txt`.


## Setup and Usage:

1. ### Database setup
Before running the script, you must set up the PostgreSQL database and create the required tables.

> [!IMPORTANT]
> Create a PostgreSQL database on AWS RDS: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.PostgreSQL.html

Use the SQL commands in `table_creation.sql` to create the `object_dump` and `object_dump_time_stamp` tables. The `uuid-ossp` extension is used to generate UUIDs for the primary keys.
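
Since `table_creation.sql` is not part of this diff, the sketch below only illustrates the kind of schema it might define, written as a small psycopg2 helper so it can be applied through the SSH tunnel described in the next step; the column types, the `created_at` column, and the `uuid_generate_v4()` defaults are assumptions inferred from `database_ingestion.py` and this README, not the actual file contents.

``` python
# Hypothetical schema bootstrap: the column names come from database_ingestion.py,
# but the types, the created_at column, and the uuid_generate_v4() defaults are
# assumptions -- the real table_creation.sql may differ.
import psycopg2

DDL = """
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE IF NOT EXISTS object_dump (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    observer_uuid TEXT,
    plugin_software_version TEXT,
    browser_type TEXT,
    browser_localisation TEXT,
    browser_user_agent_type TEXT,
    browser_user_agent_raw TEXT,
    observation_query TEXT,
    observation_time_of_retrieval TEXT,
    observation_platform TEXT,
    observation_raw_html TEXT
);

CREATE TABLE IF NOT EXISTS object_dump_time_stamp (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    object_dump_uuid UUID REFERENCES object_dump (id),
    created_at TIMESTAMPTZ DEFAULT now()
);
"""

if __name__ == "__main__":
    # Assumes the SSH tunnel from `make ssh-db` is open, so localhost:5432
    # forwards to the RDS instance.
    conn = psycopg2.connect(host="localhost", port="5432",
                            user="postgres", password="<your-password>")
    with conn, conn.cursor() as cur:
        cur.execute(DDL)
    conn.close()
```

Equivalently, the real `table_creation.sql` can be applied with `psql` once the tunnel from `make ssh-db` is open.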

2. ### AWS configuration (for local testing)
To run the application locally in Docker, it needs access to AWS services, specifically S3 and Secrets Manager. Configure your AWS credentials and region, and make them accessible to the Docker container; this can be done by mounting your local `.aws` directory.
The `secret_manager.py` script is configured to retrieve secrets from AWS Secrets Manager using the `ap-southeast-2` region and a test secret named `dev-cilogon`.

Since the database is running on AWS RDS in a private subnet, the Docker container cannot connect to it directly. The project uses an SSH tunnel to securely forward traffic from your local machine to the remote database.

Before running the ingestion process, you must open the SSH tunnel by running the following command from the `Makefile` in a separate terminal:

``` bash
make ssh-db
```

This command connects to the jump box (`3.26.23.185`) and forwards a local port (`5432`) on your host machine to the remote AWS RDS instance (`testbrowserext.cjpd8tzzheam.ap-southeast-2.rds.amazonaws.com`).
The `database_ingestion.py` script is already configured to connect to the database via this tunnel. It uses `host="host.docker.internal"` which directs the connection to the local forwarded port, which in turn routes the request to the remote database.

> [!NOTE]
> If you are setting up a new environment from scratch, you will need to change the hardcoded values for the secret name, jump box IP, RDS endpoint, and SSH key path in the `Makefile` and `database_ingestion.py` files to match your new AWS environment.

3. ### Running with Docker:

The project includes a `Makefile` to simplify building and running the application within a Docker container.
- #### Build the Docker image:
This command builds a Docker image named `process_s3_objects` using the `Dockerfile`.

``` bash
make build
```

- #### Run the data ingestion script:

This command runs the `process_s3_objects.py` script inside the Docker container. It mounts your local AWS credentials and sets the `AWS_DEFAULT_PROFILE` environment variable. The script will connect to the S3 bucket specified by the `BUCKET` variable in `process_s3_objects.py` and begin processing files. Processed files will be moved to a `processed/` folder within the same bucket.
``` bash
make run
```

4. ### Database connection
The `database_ingestion.py` script connects to the PostgreSQL database using the host `host.docker.internal`. This is a Docker-specific hostname that allows the container to connect to the host machine's network, which is useful for development purposes. For deployment in other environments like ECS, the host would need to be changed.
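
One way to make that host configurable, rather than edited per environment, is to read it from environment variables with the current value as the default; this is only a sketch, and the `DB_HOST`/`DB_PORT` variable names below are illustrative, not part of this PR.

``` python
# Hypothetical variant of connect_db() from database_ingestion.py; the DB_HOST and
# DB_PORT environment variables are illustrative names, not part of this PR.
import os

import psycopg2
import secret_manager

secrets = secret_manager.get_secret_by_name("dev-cilogon")


def connect_db():
    # Local Docker keeps the current default; ECS (or any other environment)
    # can override the host and port via environment variables.
    return psycopg2.connect(
        host=os.environ.get("DB_HOST", "host.docker.internal"),
        port=os.environ.get("DB_PORT", "5432"),
        user=secrets["s3_posgres_username"],
        password=secrets["s3_postgres_password"],
    )
```

In ECS, the task definition could then supply the RDS endpoint directly, while local Docker runs keep falling back to `host.docker.internal`.
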
17 changes: 17 additions & 0 deletions process_s3_objects/src/docker/Dockerfile
@@ -0,0 +1,17 @@
FROM python:3.9
LABEL MAINTAINER="AIO"


# Install PostgreSQL client tools
RUN apt-get update \
&& apt-get install -y --no-install-recommends postgresql-client \
&& rm -rf /var/lib/apt/lists/*

# install dependencies
COPY requirements.txt /tmp
RUN python3 -m pip install --upgrade pip setuptools \
&& python3 -m pip install -r /tmp/requirements.txt

WORKDIR /app

COPY /lib /app
25 changes: 25 additions & 0 deletions process_s3_objects/src/docker/Makefile
@@ -0,0 +1,25 @@
NAME=process_s3_objects

GIT_HASH=$(shell git rev-parse HEAD)


all: build run

build:
	docker build --pull -t ${NAME}:${GIT_HASH} .

run:
	docker run -it --rm \
		-v ${HOME}/.aws:/root/.aws \
		-e AWS_DEFAULT_PROFILE=qut \
		${NAME}:${GIT_HASH} python process_s3_objects.py

# ensure that you set up the .pem key path and the hostname
# ssh -i <path to .pem> -L 5432:<your database hostname>:5432 ec2-user@<jumpbox-ip>

ssh-db:
	ssh -i /Users/E129044/Downloads/psqljumpbox.pem -L 5432:testbrowserext.cjpd8tzzheam.ap-southeast-2.rds.amazonaws.com:5432 [email protected]

# run psql from the CLI for debugging and testing (requires the SSH tunnel from `make ssh-db`)
run-db:
	psql -h localhost -p 5432 -U postgres
89 changes: 89 additions & 0 deletions process_s3_objects/src/docker/lib/database_ingestion.py
@@ -0,0 +1,89 @@
import psycopg2
import secret_manager

secret_name = "dev-cilogon"
secrets = secret_manager.get_secret_by_name(secret_name)

def connect_db():
    # connect to the existing database
    # TODO: Change the host for ECS
    conn = psycopg2.connect(host="host.docker.internal", user=secrets['s3_posgres_username'], password=secrets['s3_postgres_password'], port="5432")
    return conn

def extract_object_json_data(data):
    print("\nExtracting data from object")
    observer_uuid = data["observer_uuid"]
    plugin_software_version = data["plugin"]["software_version"]
    browser_type = data["browser"]["type"]
    browser_localisation = data["browser"]["localisation"]
    browser_user_agent_type = data["browser"]["user_agent_type"]
    browser_user_agent_raw = data["browser"]["user_agent_raw"]
    observation_query = data["observation"]["query"]
    observation_time_of_retrieval = data["observation"]["time_of_retrieval"]
    observation_platform = data["observation"]["platform"]
    observation_raw_html = data["observation"]["raw_html"]

    return (
        observer_uuid,
        plugin_software_version,
        browser_type,
        browser_localisation,
        browser_user_agent_type,
        browser_user_agent_raw,
        observation_query,
        observation_time_of_retrieval,
        observation_platform,
        observation_raw_html
    )

def copy_data_to_db(data):
    json_values = extract_object_json_data(data)
    print("\nCopying extracted objects to database")
    conn = connect_db()
    cur = conn.cursor()

    query = """
        INSERT INTO object_dump (
            observer_uuid,
            plugin_software_version,
            browser_type,
            browser_localisation,
            browser_user_agent_type,
            browser_user_agent_raw,
            observation_query,
            observation_time_of_retrieval,
            observation_platform,
            observation_raw_html
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """

    cur.execute(query, json_values)

    conn.commit()
    generated_id = cur.fetchone()[0]
    print("Inserted into database with id:", generated_id)

    cur.close()
    conn.close()
    return generated_id

def populate_audit_log(id):
    print("\nPopulating timestamp")
    conn = connect_db()
    cur = conn.cursor()

    generated_id = id
    cur.execute(
        """
        INSERT INTO object_dump_time_stamp (object_dump_uuid) VALUES (%s)
        RETURNING id
        """,
        (generated_id,))
    conn.commit()
    id = cur.fetchone()[0]
    print("Inserted into database with id:", id)
    cur.close()
    conn.close()
88 changes: 88 additions & 0 deletions process_s3_objects/src/docker/lib/process_s3_objects.py
@@ -0,0 +1,88 @@
import boto3
import json
from datetime import datetime
import database_ingestion as db

BUCKET = "browser-extension-payload-test"
FOLDER = "processed"
TIMESTAMP = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

TAGS = [
    {'Key': 'Project', 'Value': 'AIO'},
    {'Key': 'Environment', 'Value': 'Dev'},
    {'Key': 'ProcessedTime', 'Value': TIMESTAMP}
]

s3 = boto3.client("s3")

# add timestamped tags to a processed object
def add_processed_tags(bucket, key, tags):
    s3.put_object_tagging(
        Bucket=bucket,
        Key=key,
        Tagging={
            'TagSet': tags
        }
    )
    print(f"Tags successfully added to bucket '{bucket}'.")

# Move the object to the processed folder and add tags
def move_to_processed_folder(bucket, key):
    old_object_name = key.split('/')[-1]
    new_key = f"{FOLDER}/{old_object_name}_processed"
    s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": key}, Key=new_key)
    s3.delete_object(Bucket=bucket, Key=key)
    print(f"Moved {key} to {new_key}")
    add_processed_tags(bucket, new_key, TAGS)

# process a single object: load the JSON, insert into the DB, write the audit log, then move the file
def process_object(bucket, key):
    try:
        # get the content of the file
        obj = s3.get_object(Bucket=bucket, Key=key)
        obj_data = json.loads(obj['Body'].read())
        print("Loaded Object data\n", obj_data)
    except Exception as e:
        print(f"Failed to load object data for {key}: {e}")
        return

    try:
        generated_id = db.copy_data_to_db(obj_data)
    except Exception as e:
        print(f"Failed at DB insert for {key}: {e}")
        return

    try:
        db.populate_audit_log(generated_id)
    except Exception as e:
        print(f"Failed at audit log step for {key}: {e}")
        return

    try:
        move_to_processed_folder(bucket, key)
    except Exception as e:
        print(f"Failed at moving file to processed folder for {key}: {e}")
        return

    print(f"\nSuccessfully processed {key}")

# list objects using a paginator
def process_all_objects(bucket):
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            print(f"Retrieving S3 object with key: {key}")

            # skip all objects already in the processed folder
            if key.startswith(f"{FOLDER}/"):
                print(f"Skipping already processed object: {key}")
                continue

            print("\nProcessing objects..\n")
            process_object(bucket, key)

if __name__ == "__main__":
    process_all_objects(BUCKET)