Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions .github/workflows/data-transfer-integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
name: Data Transfer Integration Tests

on:
pull_request:

jobs:
postgresql-connector:
runs-on: ubuntu-latest

services:
postgres:
image: postgres:16
env:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: pontoon
ports:
- 5432:5432
options: >-
--health-cmd="pg_isready -U test"
--health-interval=10s
--health-timeout=5s
--health-retries=5

env:
ENV: test
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DATABASE: pontoon
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
ALLOW_ORIGIN: http://localhost:3000
JWT_ALGORITHM: HS256
JWT_SIGNING_KEY: test_key
SKIP_TRANSFERS: true

steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: 3.12

- name: Wait for DB to be ready
run: |
for i in {1..10}; do
pg_isready -h localhost -U test && break
echo "Waiting for postgres..." && sleep 2
done

- name: Run integration tests
working-directory: data-transfer/pontoon
run: |
python -m pip install --upgrade pip
pip install pytest python-dotenv
pip install .

cat <<EOF > .env
POSTGRES_HOST=localhost
POSTGRES_USER=test
POSTGRES_PASSWORD=test
POSTGRES_DATABASE=pontoon
EOF

PGPASSWORD=test psql -h localhost -U test -d pontoon -c "CREATE SCHEMA source;"
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "CREATE SCHEMA target;"
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "CREATE TABLE source.leads_xs (id uuid NOT NULL,created_at timestamp NOT NULL,updated_at timestamp NOT NULL,customer_id text NOT NULL,name text NOT NULL,email text NOT NULL,score integer NOT NULL,notes text);"
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "\COPY source.leads_xs(id,created_at,updated_at,customer_id,name,email,score,notes) FROM 'tests/data/leads_xs_20250701.csv' CSV HEADER;"

pytest -s tests/integration/test_postgres_connectors.py
3 changes: 3 additions & 0 deletions data-transfer/pontoon/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ build-wheel:
test:
pytest -s

test-integration:
pytest -s tests/integration/

install-dev:
pip install -e .

3 changes: 2 additions & 1 deletion data-transfer/pontoon/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ Create schemas named `source` and `target`. Load the `tests/data/leads_xs_*.csv`
Create schemas named `pontoon` and `target`. Load the same test data into `pontoon.leads_xs`

## Running
Install test dependencies: `pytest` and `python-dotenv`.

`make tests` or use `pytest` directly to run specific sets of tests.
`make test`, `make test-integration` or use `pytest` directly to run specific sets of tests, e.g. `pytest -s tests/integration/test_postgres_connectors.py`
4 changes: 4 additions & 0 deletions data-transfer/pontoon/pontoon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from pontoon.cache.memory_cache import MemoryCache
from pontoon.cache.sqlite_cache import SqliteCache
from pontoon.base import Namespace, Stream, Record, Dataset, Cache, Mode, Source, Destination
from pontoon.base import SourceConnectionFailed, SourceStreamDoesNotExist, SourceStreamInvalidSchema
from pontoon.base import DestinationConnectionFailed, DestinationStreamInvalidSchema
from pontoon.base import StreamMissingField


__sources = {}
__destinations = {}
Expand Down
56 changes: 53 additions & 3 deletions data-transfer/pontoon/pontoon/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ def __init__(self, data:List[Any]):
self.data = data



class StreamError(Exception):
""" Base class for all Stream related exceptions """
pass

class StreamMissingField(StreamError):
""" Raised when a stream schema is missing a field """
pass


class Stream:
"""
A class to represent a typed stream of records
Expand Down Expand Up @@ -108,7 +118,7 @@ def _compute_checksum(self, row:List[Any]) -> str:


def _missing_field(self, field_name:str):
raise Exception(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
raise StreamMissingField(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")


def with_field(self, field_name:str, field_type:Any, value:Any) -> 'Stream':
Expand All @@ -135,7 +145,7 @@ def with_version(self, version:str, field_name='pontoon__version') -> 'Stream':

def drop_field(self, field_name:str) -> 'Stream':
if field_name not in self.schema.names:
raise Exception(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
raise StreamMissingField(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
field_idx = self.schema.get_field_index(field_name)
self.schema = self.schema.remove(field_idx)
self._drop_fields.append(field_idx)
Expand Down Expand Up @@ -418,6 +428,7 @@ def check_batch_volume(self, ds:Dataset):
pass



class Source(ABC):
""" Abstract base class to represent a data source connector """

Expand All @@ -441,6 +452,28 @@ def inspect_streams(self):
def close(self):
pass


class SourceError(Exception):
"""Base exception for all Source-related errors."""
pass


class SourceConnectionFailed(SourceError):
"""Raised when the connection to the source fails."""
pass


class SourceStreamDoesNotExist(SourceError):
"""Raised when a requested stream does not exist in the source."""
pass


class SourceStreamInvalidSchema(SourceError):
"""Raised when the source stream has an invalid or unexpected schema."""
pass




class Destination(ABC):
""" Abstract base class to represent a data destination connector """
Expand All @@ -459,4 +492,21 @@ def integrity(self) -> Integrity:

@abstractmethod
def close(self):
pass
pass


class DestinationError(Exception):
"""Base exception for all Destination-related errors."""
pass


class DestinationConnectionFailed(DestinationError):
"""Raised when the connection to the destination fails."""
pass


class DestinationStreamInvalidSchema(DestinationError):
"""Raised when the destination stream has an invalid or unexpected schema."""
pass


38 changes: 32 additions & 6 deletions data-transfer/pontoon/pontoon/destination/bigquery_destination.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import json
from typing import List, Dict, Tuple, Generator, Any
from sqlalchemy import create_engine, text
from sqlalchemy import create_engine, inspect, MetaData, Table, text

from pontoon.base import Destination, Dataset, Stream, Record, Progress, Mode
from pontoon.base import DestinationConnectionFailed, DestinationStreamInvalidSchema

from pontoon.source.sql_source import SQLUtil
from pontoon.destination.sql_destination import SQLDestination
from pontoon.destination.gcs_destination import GCSDestination, GCSConfig
Expand Down Expand Up @@ -72,6 +74,9 @@ def write(self, ds:Dataset, progress_callback = None):

with self._connect() as conn:

insp = inspect(conn)
metadata_obj = MetaData()

for stream in ds.streams:

# configure progress tracking
Expand All @@ -83,6 +88,11 @@ def write(self, ds:Dataset, progress_callback = None):
if callable(progress_callback):
progress.subscribe(progress_callback)

# Check if there are any records to process
stream_size = ds.size(stream)
if stream_size == 0:
progress.message("No records to process for this stream")
continue
Comment on lines +92 to +95

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have this at a higher level instead of having it in each destination connector?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd definitely be possible in the orchestration layer (somewhere around here) to remove zero-length streams from the Dataset before passing it to destination.write(), but I kind of think the connectors should be able to gracefully handle an empty stream since nothing in the interface contract prevents passing one in. Another thing we could do (but might actually be less clear) would be making the Dataset.streams iterator skip empty streams, but that limits a connector's ability to do something (bookkeeping / logging?) with empty streams.


# staging and target table names
target_table_name = f"{stream.schema_name}.{stream.name}"
Expand All @@ -100,6 +110,15 @@ def write(self, ds:Dataset, progress_callback = None):
)
)

# delete records depending on sync mode
if self._mode.type == Mode.FULL_REFRESH:
progress.message("Dropping target table")
SQLDestination.drop_table(conn, target_table_name)

# Drop staging table if it happens to exist from previous failed load
# BQ does not support TEMP tables, so we use a real table - can't assume it will be cleaned up
SQLDestination.drop_table(conn, stage_table_name)

progress.message("Running LOAD from GCS")
with conn.begin():
conn.execute(text(load_sql))
Expand All @@ -109,12 +128,19 @@ def write(self, ds:Dataset, progress_callback = None):
with conn.begin():
conn.execute(text(create_target_sql))

# delete records depending on sync mode
if self._mode.type == Mode.FULL_REFRESH:
progress.message("Truncating target table")
with conn.begin():
conn.execute(text(f"DELETE FROM {target_table_name} WHERE 1=1"))

# Check that target and stage schemas are compatible
target_table = Table(stream.name, metadata_obj, schema=stream.schema_name, autoload_with=insp)
target_table_schema = SQLDestination.table_ddl_to_schema(target_table.columns)

stage_table = Table(f"__temp_{stream.name}", metadata_obj, schema=stream.schema_name, autoload_with=insp)
stage_table_schema = SQLDestination.table_ddl_to_schema(stage_table.columns)

# Use flexible schema comparison that ignores column order
if not SQLDestination.schemas_compatible(target_table_schema, stage_table_schema):
raise DestinationStreamInvalidSchema(f"Existing schema for stream {stream.name} does not match.")


# sql to MERGE the staging table into the target table
merge_sql = BigQuerySQLUtil.merge(
target_table_name,
Expand Down
45 changes: 26 additions & 19 deletions data-transfer/pontoon/pontoon/destination/postgres_destination.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
from typing import List, Dict, Tuple, Generator, Any

from sqlalchemy.exc import SQLAlchemyError, OperationalError, NoSuchTableError
from pontoon.base import Destination, Dataset, Stream, Record, Progress, Mode
from pontoon.base import DestinationConnectionFailed, \
DestinationStreamInvalidSchema


from pontoon.destination.sql_destination import SQLDestination


Expand Down Expand Up @@ -44,9 +50,9 @@ def create_temp_table(table_name:str, like_table_name:str) -> str:
def drop_table(table_name:str):
schema, table = PostgresSQLUtil.parse_table(table_name)
if schema:
return sql.SQL("DROP TABLE {}.{}").format(schema, table)
return sql.SQL("DROP TABLE IF EXISTS {}.{}").format(schema, table)
else:
return sql.SQL("DROP TABLE {}").format(table)
return sql.SQL("DROP TABLE IF EXISTS {}").format(table)


@staticmethod
Expand Down Expand Up @@ -147,17 +153,27 @@ def write(self, ds:Dataset, progress_callback = None):
progress.message("No records to process for this stream")
continue

with self._connect() as conn:
# create target table for the stream if it doesn't exist
table = SQLDestination.create_table_if_not_exists(conn, stream)


# using the raw psycopg2 connection for efficiency
conn = self._engine.raw_connection()

target_table_name = f"{stream.schema_name}.{stream.name}"
stage_table_name = f"temp_{stream.schema_name}_{stream.name}"

# using the raw psycopg2 connection for efficiency
try:
conn = self._engine.raw_connection()
except OperationalError as e:
raise DestinationConnectionFailed("Could not connect to destination databse") from e

# Drop existing table if needed
if self._mode.type == Mode.FULL_REFRESH:
with conn.cursor() as cur:
cur.execute(PostgresSQLUtil.drop_table(target_table_name))
conn.commit()


with self._connect() as base_conn:
# create target table for the stream if it doesn't exist
table = SQLDestination.create_table_if_not_exists(base_conn, stream)


# temporary staging table
create_stage_sql = PostgresSQLUtil.create_temp_table(stage_table_name, target_table_name)

Expand Down Expand Up @@ -193,15 +209,6 @@ def write(self, ds:Dataset, progress_callback = None):
conn.commit()

with conn.cursor() as cur:
# delete all records from the table
if self._mode.type == Mode.FULL_REFRESH:
progress.message("Truncating target table")
cur.execute(
sql.SQL("DELETE FROM {}.{}").format(
sql.Identifier(stream.schema_name),
sql.Identifier(stream.name)
)
)

# upsert staging into target table
progress.message("Upserting records into target table")
Expand Down
17 changes: 11 additions & 6 deletions data-transfer/pontoon/pontoon/destination/redshift_destination.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import List, Dict, Tuple, Generator, Any
from sqlalchemy import text
from sqlalchemy import text, MetaData, Table

from pontoon.base import Destination, Dataset, Stream, Record, Progress, Mode
from pontoon.source.sql_source import SQLUtil
Expand Down Expand Up @@ -76,16 +76,21 @@ def write(self, ds:Dataset, progress_callback = None):
if callable(progress_callback):
progress.subscribe(progress_callback)

# create a table for the stream if it doesn't exist
table = SQLDestination.create_table_if_not_exists(conn, stream)

# Check if there are any records to process
stream_size = ds.size(stream)
if stream_size == 0:
progress.message("No records to process for this stream")
continue

target_table_name = f"{stream.schema_name}.{stream.name}"
stage_table_name = f"temp_{stream.schema_name}_{stream.name}"

if self._mode.type == Mode.FULL_REFRESH:
with conn.begin():
# delete all records from the table
conn.execute(table.delete())
SQLDestination.drop_table(conn, target_table_name)

# create a table for the stream if it doesn't exist
table = SQLDestination.create_table_if_not_exists(conn, stream)

# temporary staging table
create_stage_sql = RedshiftSQLUtil.create_temp_table(stage_table_name, target_table_name)
Expand Down
Loading