Skip to content

Commit f652eb5

Browse files
authored
More thorough integration tests for pg, redshift, snowflake, bq (#27)
* more thorough integration tests for pg, redshift, snowflake, bq * tidy and comments * add gh workflow to run pg integration tests on PRs * fix csv name in workflow * remove psutil dependency * add dotenv dependency * update README and include path in comments
1 parent 55e19f5 commit f652eb5

19 files changed

Lines changed: 1505 additions & 407 deletions
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
name: Data Transfer Integration Tests
2+
3+
on:
4+
pull_request:
5+
6+
jobs:
7+
postgresql-connector:
8+
runs-on: ubuntu-latest
9+
10+
services:
11+
postgres:
12+
image: postgres:16
13+
env:
14+
POSTGRES_USER: test
15+
POSTGRES_PASSWORD: test
16+
POSTGRES_DB: pontoon
17+
ports:
18+
- 5432:5432
19+
options: >-
20+
--health-cmd="pg_isready -U test"
21+
--health-interval=10s
22+
--health-timeout=5s
23+
--health-retries=5
24+
25+
env:
26+
ENV: test
27+
POSTGRES_USER: test
28+
POSTGRES_PASSWORD: test
29+
POSTGRES_DATABASE: pontoon
30+
POSTGRES_HOST: localhost
31+
POSTGRES_PORT: 5432
32+
ALLOW_ORIGIN: http://localhost:3000
33+
JWT_ALGORITHM: HS256
34+
JWT_SIGNING_KEY: test_key
35+
SKIP_TRANSFERS: true
36+
37+
steps:
38+
- name: Checkout repo
39+
uses: actions/checkout@v4
40+
41+
- name: Set up Python
42+
uses: actions/setup-python@v5
43+
with:
44+
python-version: 3.12
45+
46+
- name: Wait for DB to be ready
47+
run: |
48+
for i in {1..10}; do
49+
pg_isready -h localhost -U test && break
50+
echo "Waiting for postgres..." && sleep 2
51+
done
52+
53+
- name: Run integration tests
54+
working-directory: data-transfer/pontoon
55+
run: |
56+
python -m pip install --upgrade pip
57+
pip install pytest python-dotenv
58+
pip install .
59+
60+
cat <<EOF > .env
61+
POSTGRES_HOST=localhost
62+
POSTGRES_USER=test
63+
POSTGRES_PASSWORD=test
64+
POSTGRES_DATABASE=pontoon
65+
EOF
66+
67+
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "CREATE SCHEMA source;"
68+
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "CREATE SCHEMA target;"
69+
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "CREATE TABLE source.leads_xs (id uuid NOT NULL,created_at timestamp NOT NULL,updated_at timestamp NOT NULL,customer_id text NOT NULL,name text NOT NULL,email text NOT NULL,score integer NOT NULL,notes text);"
70+
PGPASSWORD=test psql -h localhost -U test -d pontoon -c "\COPY source.leads_xs(id,created_at,updated_at,customer_id,name,email,score,notes) FROM 'tests/data/leads_xs_20250701.csv' CSV HEADER;"
71+
72+
pytest -s tests/integration/test_postgres_connectors.py

data-transfer/pontoon/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ build-wheel:
77
test:
88
pytest -s
99

10+
test-integration:
11+
pytest -s tests/integration/
12+
1013
install-dev:
1114
pip install -e .
1215

data-transfer/pontoon/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,6 @@ Create schemas named `source` and `target`. Load the `tests/data/leads_xs_*.csv`
5050
Create schemas named `pontoon` and `target`. Load the same test data into `pontoon.leads_xs`
5151

5252
## Running
53+
Install test dependencies: `pytest` and `python-dotenv`.
5354

54-
`make tests` or use `pytest` directly to run specific sets of tests.
55+
`make test`, `make test-integration` or use `pytest` directly to run specific sets of tests, e.g. `pytest -s tests/integration/test_postgres_connectors.py`

data-transfer/pontoon/pontoon/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from pontoon.cache.memory_cache import MemoryCache
2121
from pontoon.cache.sqlite_cache import SqliteCache
2222
from pontoon.base import Namespace, Stream, Record, Dataset, Cache, Mode, Source, Destination
23+
from pontoon.base import SourceConnectionFailed, SourceStreamDoesNotExist, SourceStreamInvalidSchema
24+
from pontoon.base import DestinationConnectionFailed, DestinationStreamInvalidSchema
25+
from pontoon.base import StreamMissingField
26+
2327

2428
__sources = {}
2529
__destinations = {}

data-transfer/pontoon/pontoon/base.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ def __init__(self, data:List[Any]):
3232
self.data = data
3333

3434

35+
36+
class StreamError(Exception):
37+
""" Base class for all Stream related exceptions """
38+
pass
39+
40+
class StreamMissingField(StreamError):
41+
""" Raised when a stream schema is missing a field """
42+
pass
43+
44+
3545
class Stream:
3646
"""
3747
A class to represent a typed stream of records
@@ -108,7 +118,7 @@ def _compute_checksum(self, row:List[Any]) -> str:
108118

109119

110120
def _missing_field(self, field_name:str):
111-
raise Exception(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
121+
raise StreamMissingField(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
112122

113123

114124
def with_field(self, field_name:str, field_type:Any, value:Any) -> 'Stream':
@@ -135,7 +145,7 @@ def with_version(self, version:str, field_name='pontoon__version') -> 'Stream':
135145

136146
def drop_field(self, field_name:str) -> 'Stream':
137147
if field_name not in self.schema.names:
138-
raise Exception(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
148+
raise StreamMissingField(f"Stream {self.schema_name}.{self.name} does not have field: {field_name}")
139149
field_idx = self.schema.get_field_index(field_name)
140150
self.schema = self.schema.remove(field_idx)
141151
self._drop_fields.append(field_idx)
@@ -418,6 +428,7 @@ def check_batch_volume(self, ds:Dataset):
418428
pass
419429

420430

431+
421432
class Source(ABC):
422433
""" Abstract base class to represent a data source connector """
423434

@@ -441,6 +452,28 @@ def inspect_streams(self):
441452
def close(self):
442453
pass
443454

455+
456+
class SourceError(Exception):
457+
"""Base exception for all Source-related errors."""
458+
pass
459+
460+
461+
class SourceConnectionFailed(SourceError):
462+
"""Raised when the connection to the source fails."""
463+
pass
464+
465+
466+
class SourceStreamDoesNotExist(SourceError):
467+
"""Raised when a requested stream does not exist in the source."""
468+
pass
469+
470+
471+
class SourceStreamInvalidSchema(SourceError):
472+
"""Raised when the source stream has an invalid or unexpected schema."""
473+
pass
474+
475+
476+
444477

445478
class Destination(ABC):
446479
""" Abstract base class to represent a data destination connector """
@@ -459,4 +492,21 @@ def integrity(self) -> Integrity:
459492

460493
@abstractmethod
461494
def close(self):
462-
pass
495+
pass
496+
497+
498+
class DestinationError(Exception):
499+
"""Base exception for all Destination-related errors."""
500+
pass
501+
502+
503+
class DestinationConnectionFailed(DestinationError):
504+
"""Raised when the connection to the destination fails."""
505+
pass
506+
507+
508+
class DestinationStreamInvalidSchema(DestinationError):
509+
"""Raised when the destination stream has an invalid or unexpected schema."""
510+
pass
511+
512+

data-transfer/pontoon/pontoon/destination/bigquery_destination.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import json
22
from typing import List, Dict, Tuple, Generator, Any
3-
from sqlalchemy import create_engine, text
3+
from sqlalchemy import create_engine, inspect, MetaData, Table, text
44

55
from pontoon.base import Destination, Dataset, Stream, Record, Progress, Mode
6+
from pontoon.base import DestinationConnectionFailed, DestinationStreamInvalidSchema
7+
68
from pontoon.source.sql_source import SQLUtil
79
from pontoon.destination.sql_destination import SQLDestination
810
from pontoon.destination.gcs_destination import GCSDestination, GCSConfig
@@ -72,6 +74,9 @@ def write(self, ds:Dataset, progress_callback = None):
7274

7375
with self._connect() as conn:
7476

77+
insp = inspect(conn)
78+
metadata_obj = MetaData()
79+
7580
for stream in ds.streams:
7681

7782
# configure progress tracking
@@ -83,6 +88,11 @@ def write(self, ds:Dataset, progress_callback = None):
8388
if callable(progress_callback):
8489
progress.subscribe(progress_callback)
8590

91+
# Check if there are any records to process
92+
stream_size = ds.size(stream)
93+
if stream_size == 0:
94+
progress.message("No records to process for this stream")
95+
continue
8696

8797
# staging and target table names
8898
target_table_name = f"{stream.schema_name}.{stream.name}"
@@ -100,6 +110,15 @@ def write(self, ds:Dataset, progress_callback = None):
100110
)
101111
)
102112

113+
# delete records depending on sync mode
114+
if self._mode.type == Mode.FULL_REFRESH:
115+
progress.message("Dropping target table")
116+
SQLDestination.drop_table(conn, target_table_name)
117+
118+
# Drop staging table if it happens to exist from previous failed load
119+
# BQ does not support TEMP tables, so we use a real table - can't assume it will be cleaned up
120+
SQLDestination.drop_table(conn, stage_table_name)
121+
103122
progress.message("Running LOAD from GCS")
104123
with conn.begin():
105124
conn.execute(text(load_sql))
@@ -109,12 +128,19 @@ def write(self, ds:Dataset, progress_callback = None):
109128
with conn.begin():
110129
conn.execute(text(create_target_sql))
111130

112-
# delete records depending on sync mode
113-
if self._mode.type == Mode.FULL_REFRESH:
114-
progress.message("Truncating target table")
115-
with conn.begin():
116-
conn.execute(text(f"DELETE FROM {target_table_name} WHERE 1=1"))
117131

132+
# Check that target and stage schemas are compatible
133+
target_table = Table(stream.name, metadata_obj, schema=stream.schema_name, autoload_with=insp)
134+
target_table_schema = SQLDestination.table_ddl_to_schema(target_table.columns)
135+
136+
stage_table = Table(f"__temp_{stream.name}", metadata_obj, schema=stream.schema_name, autoload_with=insp)
137+
stage_table_schema = SQLDestination.table_ddl_to_schema(stage_table.columns)
138+
139+
# Use flexible schema comparison that ignores column order
140+
if not SQLDestination.schemas_compatible(target_table_schema, stage_table_schema):
141+
raise DestinationStreamInvalidSchema(f"Existing schema for stream {stream.name} does not match.")
142+
143+
118144
# sql to MERGE the staging table into the target table
119145
merge_sql = BigQuerySQLUtil.merge(
120146
target_table_name,

data-transfer/pontoon/pontoon/destination/postgres_destination.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
import psycopg2
12
from psycopg2 import sql
23
from psycopg2.extras import execute_values
34
from typing import List, Dict, Tuple, Generator, Any
45

6+
from sqlalchemy.exc import SQLAlchemyError, OperationalError, NoSuchTableError
57
from pontoon.base import Destination, Dataset, Stream, Record, Progress, Mode
8+
from pontoon.base import DestinationConnectionFailed, \
9+
DestinationStreamInvalidSchema
10+
11+
612
from pontoon.destination.sql_destination import SQLDestination
713

814

@@ -44,9 +50,9 @@ def create_temp_table(table_name:str, like_table_name:str) -> str:
4450
def drop_table(table_name:str):
4551
schema, table = PostgresSQLUtil.parse_table(table_name)
4652
if schema:
47-
return sql.SQL("DROP TABLE {}.{}").format(schema, table)
53+
return sql.SQL("DROP TABLE IF EXISTS {}.{}").format(schema, table)
4854
else:
49-
return sql.SQL("DROP TABLE {}").format(table)
55+
return sql.SQL("DROP TABLE IF EXISTS {}").format(table)
5056

5157

5258
@staticmethod
@@ -147,17 +153,27 @@ def write(self, ds:Dataset, progress_callback = None):
147153
progress.message("No records to process for this stream")
148154
continue
149155

150-
with self._connect() as conn:
151-
# create target table for the stream if it doesn't exist
152-
table = SQLDestination.create_table_if_not_exists(conn, stream)
153-
154-
155-
# using the raw psycopg2 connection for efficiency
156-
conn = self._engine.raw_connection()
157-
158156
target_table_name = f"{stream.schema_name}.{stream.name}"
159157
stage_table_name = f"temp_{stream.schema_name}_{stream.name}"
160158

159+
# using the raw psycopg2 connection for efficiency
160+
try:
161+
conn = self._engine.raw_connection()
162+
except OperationalError as e:
163+
raise DestinationConnectionFailed("Could not connect to destination databse") from e
164+
165+
# Drop existing table if needed
166+
if self._mode.type == Mode.FULL_REFRESH:
167+
with conn.cursor() as cur:
168+
cur.execute(PostgresSQLUtil.drop_table(target_table_name))
169+
conn.commit()
170+
171+
172+
with self._connect() as base_conn:
173+
# create target table for the stream if it doesn't exist
174+
table = SQLDestination.create_table_if_not_exists(base_conn, stream)
175+
176+
161177
# temporary staging table
162178
create_stage_sql = PostgresSQLUtil.create_temp_table(stage_table_name, target_table_name)
163179

@@ -193,15 +209,6 @@ def write(self, ds:Dataset, progress_callback = None):
193209
conn.commit()
194210

195211
with conn.cursor() as cur:
196-
# delete all records from the table
197-
if self._mode.type == Mode.FULL_REFRESH:
198-
progress.message("Truncating target table")
199-
cur.execute(
200-
sql.SQL("DELETE FROM {}.{}").format(
201-
sql.Identifier(stream.schema_name),
202-
sql.Identifier(stream.name)
203-
)
204-
)
205212

206213
# upsert staging into target table
207214
progress.message("Upserting records into target table")

data-transfer/pontoon/pontoon/destination/redshift_destination.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import List, Dict, Tuple, Generator, Any
2-
from sqlalchemy import text
2+
from sqlalchemy import text, MetaData, Table
33

44
from pontoon.base import Destination, Dataset, Stream, Record, Progress, Mode
55
from pontoon.source.sql_source import SQLUtil
@@ -76,16 +76,21 @@ def write(self, ds:Dataset, progress_callback = None):
7676
if callable(progress_callback):
7777
progress.subscribe(progress_callback)
7878

79-
# create a table for the stream if it doesn't exist
80-
table = SQLDestination.create_table_if_not_exists(conn, stream)
81-
79+
# Check if there are any records to process
80+
stream_size = ds.size(stream)
81+
if stream_size == 0:
82+
progress.message("No records to process for this stream")
83+
continue
84+
8285
target_table_name = f"{stream.schema_name}.{stream.name}"
8386
stage_table_name = f"temp_{stream.schema_name}_{stream.name}"
8487

8588
if self._mode.type == Mode.FULL_REFRESH:
8689
with conn.begin():
87-
# delete all records from the table
88-
conn.execute(table.delete())
90+
SQLDestination.drop_table(conn, target_table_name)
91+
92+
# create a table for the stream if it doesn't exist
93+
table = SQLDestination.create_table_if_not_exists(conn, stream)
8994

9095
# temporary staging table
9196
create_stage_sql = RedshiftSQLUtil.create_temp_table(stage_table_name, target_table_name)

0 commit comments

Comments
 (0)