diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index acc9af19b..1d88599a5 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -14,6 +14,7 @@ concurrency:
 jobs:
   tests:
+    name: "Regular, Python ${{ matrix.python-version }}"
     strategy:
       matrix:
         # I tried running stuff on macOS but it was too slow and unreliable.
@@ -64,3 +65,22 @@ jobs:
         run: make test-ci
       - name: check the formatting
         run: make lint-ci
+
+  tests-cratedb:
+    name: "CrateDB"
+    runs-on: ubuntu-latest
+    steps:
+      - name: Acquire sources
+        uses: actions/checkout@v4
+      - name: Install Microsoft ODBC
+        run: sudo ACCEPT_EULA=Y apt-get install msodbcsql18 -y
+      - name: Install Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: 3.12
+      - name: Install uv
+        uses: astral-sh/setup-uv@v6
+      - name: Install project
+        run: make deps-ci
+      - name: Run tests
+        run: make test-ci-cratedb
diff --git a/Makefile b/Makefile
index 186509fe3..7e5bebdc6 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,10 @@ deps-ci:
 	uv pip install --system -r requirements-dev.txt
 
 test-ci:
-	set -a; source test.env; set +a; TESTCONTAINERS_RYUK_DISABLED=true pytest -n auto -x -rP -vv --tb=short --durations=10 --cov=ingestr --no-cov-on-fail
+	set -a; source test.env; set +a; TESTCONTAINERS_RYUK_DISABLED=true pytest -n auto -x -rP -vv --tb=short --durations=10 --cov=ingestr --no-cov-on-fail -k "not cratedb"
+
+test-ci-cratedb:
+	set -a; source test.env; set +a; TESTCONTAINERS_RYUK_DISABLED=true pytest -rP -vv --tb=short --durations=10 --cov=ingestr --no-cov-on-fail -k "cratedb"
 
 test : venv lock-deps
 	. venv/bin/activate; $(MAKE) test-ci
diff --git a/ingestr/main_test.py b/ingestr/main_test.py
index 738d46dd3..051969a81 100644
--- a/ingestr/main_test.py
+++ b/ingestr/main_test.py
@@ -30,6 +30,9 @@ import requests
 import sqlalchemy
 from confluent_kafka import Producer  # type: ignore
+from cratedb_toolkit.testing.testcontainers.cratedb import (  # type: ignore
+    CrateDBContainer,
+)
 from dlt.sources.filesystem import glob_files
 from fsspec.implementations.memory import MemoryFileSystem  # type: ignore
 from sqlalchemy.pool import NullPool
@@ -42,6 +45,7 @@ from testcontainers.mysql import MySqlContainer  # type: ignore
 from testcontainers.postgres import PostgresContainer  # type: ignore
 from typer.testing import CliRunner
+from yarl import URL
 
 from ingestr.main import app
 from ingestr.src.appstore.errors import (
@@ -525,10 +529,44 @@ def start_fully(self) -> str:
         return conn_url
 
 
+class CrateDbDockerImage(DockerImage):
+    """
+    The CrateDB destination uses the PostgreSQL protocol (default port 5432).
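+
+    Illustrative example (the host port is hypothetical): with the container's
+    port 5432 mapped to host port 55432, `start_fully` derives a destination
+    URL along the lines of `cratedb://crate@localhost:55432/`.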
+ """ + + def start_fully(self) -> str: + self.container = self.container_creator() + if self.container is None: + raise ValueError("Container is not initialized.") + + port5432 = int(self.container.get_exposed_port(5432)) + url = ( + URL(self.container.get_connection_url()) + .with_scheme("cratedb") + .with_port(port5432) + ) + + conn_url = str(url) + with open(f"{self.container_lock_dir}/{self.id}", "w") as f: + f.write(conn_url) + + return conn_url + + class EphemeralDuckDb: + def __init__(self) -> None: + self.abs_path = get_abs_path(f"./testdata/duckdb_{get_random_string(5)}.db") + self.connection_url = f"duckdb:///{self.abs_path}" + this = self + + class ContainerSurrogate: + def get_connection_url(self) -> str: + return this.connection_url + + self.container = ContainerSurrogate() + def start(self) -> str: - abs_path = get_abs_path(f"./testdata/duckdb_{get_random_string(5)}.db") - return f"duckdb:///{abs_path}" + return self.connection_url def start_fully(self) -> str: # type: ignore pass @@ -551,6 +589,7 @@ def stop_fully(self): MYSQL8_IMAGE = "mysql:8.4.1" MSSQL22_IMAGE = "mcr.microsoft.com/mssql/server:2022-CU13-ubuntu-22.04" CLICKHOUSE_IMAGE = "clickhouse/clickhouse-server:24.12" +CRATEDB_IMAGE = "crate:5.10" pgDocker = DockerImage( "postgres", lambda: PostgresContainer(POSTGRES_IMAGE, driver=None).start() @@ -558,6 +597,10 @@ def stop_fully(self): clickHouseDocker = ClickhouseDockerImage( "clickhouse", lambda: ClickHouseContainer(CLICKHOUSE_IMAGE).start() ) +crateDbDocker = CrateDbDockerImage( + "cratedb", + lambda: CrateDBContainer(CRATEDB_IMAGE, ports={4200: None, 5432: None}).start(), +) mysqlDocker = DockerImage( "mysql", lambda: MySqlContainer(MYSQL8_IMAGE, username="root").start() ) @@ -577,6 +620,7 @@ def stop_fully(self): "postgres": pgDocker, "duckdb": EphemeralDuckDb(), "clickhouse+native": clickHouseDocker, + "cratedb": crateDbDocker, } @@ -607,17 +651,49 @@ def patched_dlt_dest(uri, **kwargs): patcher.stop() +def get_uri_read(url: str, image: DockerImage) -> str: + """ + Some databases need different URLs (destination vs. reading back). + + CrateDB uses `cratedb://` for destination addressing, + but `crate://` for reading back, as the latter is the + designated scheme of the SQLAlchemy dialect. + + In abundance to that, `cratedb://` uses the PostgreSQL protocol + (default port 5432), while `crate://` uses the HTTP protocol + (default port 4200). + + C'est la vie. 
+    ¯\\_(ツ)_/¯
+    """
+    uri = URL(url)
+    if uri.scheme == "cratedb":
+        if image.container is None:
+            raise RuntimeError("Needs a container to determine exposed port")
+        port4200 = int(image.container.get_exposed_port(4200))
+        uri = uri.with_scheme("crate").with_port(port4200)
+        return str(uri)
+    return url
+
+
 @pytest.mark.parametrize(
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 @pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
 def test_create_replace(source, dest):
+    if isinstance(source.container, SqlServerContainer) and isinstance(
+        dest, CrateDbDockerImage
+    ):
+        pytest.skip(
+            "CrateDB type mapping does not support `DATE` yet, "
+            "see https://github.com/crate-workbench/ingestr/issues/4"
+        )
     with ThreadPoolExecutor() as executor:
         source_future = executor.submit(source.start)
         dest_future = executor.submit(dest.start)
         source_uri = source_future.result()
         dest_uri = dest_future.result()
-        db_to_db_create_replace(source_uri, dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        db_to_db_create_replace(source_uri, dest_uri, dest_uri_read)
     source.stop()
     dest.stop()
 
 
@@ -627,12 +703,18 @@
 )
 @pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
 def test_append(source, dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB support for 'append' strategy pending, "
+            "see https://github.com/crate-workbench/ingestr/issues/6"
+        )
     with ThreadPoolExecutor() as executor:
         source_future = executor.submit(source.start)
         dest_future = executor.submit(dest.start)
         source_uri = source_future.result()
         dest_uri = dest_future.result()
-        db_to_db_append(source_uri, dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        db_to_db_append(source_uri, dest_uri, dest_uri_read)
     source.stop()
     dest.stop()
 
 
@@ -642,12 +724,18 @@
 )
 @pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
 def test_merge_with_primary_key(source, dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB support for 'merge' strategy pending, "
+            "see https://github.com/crate/dlt-cratedb/issues/14"
+        )
     with ThreadPoolExecutor() as executor:
         source_future = executor.submit(source.start)
         dest_future = executor.submit(dest.start)
         source_uri = source_future.result()
         dest_uri = dest_future.result()
-        db_to_db_merge_with_primary_key(source_uri, dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        db_to_db_merge_with_primary_key(source_uri, dest_uri, dest_uri_read)
     source.stop()
     dest.stop()
 
 
@@ -657,12 +745,18 @@
 )
 @pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
 def test_delete_insert_without_primary_key(source, dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB support for 'delete+insert' strategy pending, "
+            "see https://github.com/crate/dlt-cratedb/issues/14"
+        )
     with ThreadPoolExecutor() as executor:
         source_future = executor.submit(source.start)
         dest_future = executor.submit(dest.start)
         source_uri = source_future.result()
         dest_uri = dest_future.result()
-        db_to_db_delete_insert_without_primary_key(source_uri, dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        db_to_db_delete_insert_without_primary_key(source_uri, dest_uri, dest_uri_read)
     source.stop()
     dest.stop()
 
 
@@ -672,29 +766,44 @@
 )
 @pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
 def test_delete_insert_with_time_range(source, dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB support for 'delete+insert' strategy pending, "
+            "see https://github.com/crate/dlt-cratedb/issues/14"
+        )
     with ThreadPoolExecutor() as executor:
         source_future = executor.submit(source.start)
         dest_future = executor.submit(dest.start)
         source_uri = source_future.result()
         dest_uri = dest_future.result()
-        db_to_db_delete_insert_with_timerange(source_uri, dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        db_to_db_delete_insert_with_timerange(source_uri, dest_uri, dest_uri_read)
     source.stop()
     dest.stop()
 
 
-def db_to_db_create_replace(source_connection_url: str, dest_connection_url: str):
+def db_to_db_create_replace(
+    source_connection_url: str,
+    dest_connection_url: str,
+    dest_connection_url_read: str,
+):
     schema_rand_prefix = f"testschema_create_replace_{get_random_string(5)}"
     try:
         shutil.rmtree(get_abs_path("../pipeline_data"))
     except Exception:
         pass
 
+    # CrateDB: Compensate for "Type `date` does not support storage".
+    updated_at_type = "DATE"
+    if dest_connection_url.startswith("cratedb://"):
+        updated_at_type = "TIMESTAMP"
+
     source_engine = sqlalchemy.create_engine(source_connection_url)
     with source_engine.begin() as conn:
         conn.execute(f"DROP SCHEMA IF EXISTS {schema_rand_prefix}")
         conn.execute(f"CREATE SCHEMA {schema_rand_prefix}")
         conn.execute(
-            f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at DATE)"
+            f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at {updated_at_type})"
         )
         conn.execute(
             f"INSERT INTO {schema_rand_prefix}.input VALUES (1, 'val1', '2022-01-01')"
@@ -717,30 +826,48 @@ def db_to_db_create_replace(source_connection_url: str, dest_connection_url: str
 
     assert result.exit_code == 0
 
-    dest_engine = sqlalchemy.create_engine(dest_connection_url)
+    dest_engine = sqlalchemy.create_engine(dest_connection_url_read)
+    # CrateDB needs an explicit flush to make data available for reads immediately.
+    if dest_engine.dialect.name == "crate":
+        dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
     res = dest_engine.execute(
         f"select id, val, updated_at from {schema_rand_prefix}.output"
    ).fetchall()
     dest_engine.dispose()
 
     assert len(res) == 2
-    assert res[0] == (1, "val1", as_datetime("2022-01-01"))
-    assert res[1] == (2, "val2", as_datetime("2022-02-01"))
+    # Compensate for CrateDB types and insert order.
+    if dest_connection_url.startswith("cratedb://"):
+        assert (1, "val1", 1640995200000) in res
+        assert (2, "val2", 1643673600000) in res
+    else:
+        assert res[0] == (1, "val1", as_datetime("2022-01-01"))
+        assert res[1] == (2, "val2", as_datetime("2022-02-01"))
 
 
-def db_to_db_append(source_connection_url: str, dest_connection_url: str):
+
+def db_to_db_append(
+    source_connection_url: str,
+    dest_connection_url: str,
+    dest_connection_url_read: str,
+):
     schema_rand_prefix = f"testschema_append_{get_random_string(5)}"
     try:
         shutil.rmtree(get_abs_path("../pipeline_data"))
     except Exception:
         pass
 
+    # CrateDB: Compensate for "Type `date` does not support storage".
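+    # Note: CrateDB returns TIMESTAMP columns as milliseconds since the Unix
+    # epoch, which is what the cratedb:// branches of the assertions below
+    # expect, e.g. '2022-01-01' == 1640995200 s == 1640995200000 ms.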
+ updated_at_type = "DATE" + if dest_connection_url.startswith("cratedb://"): + updated_at_type = "TIMESTAMP" + source_engine = sqlalchemy.create_engine(source_connection_url) with source_engine.begin() as conn: conn.execute(f"DROP SCHEMA IF EXISTS {schema_rand_prefix}") conn.execute(f"CREATE SCHEMA {schema_rand_prefix}") conn.execute( - f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at DATE)" + f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at {updated_at_type})" ) conn.execute( f"INSERT INTO {schema_rand_prefix}.input VALUES (1, 'val1', '2022-01-01'), (2, 'val2', '2022-01-02')" @@ -764,7 +891,10 @@ def run(): assert res.exit_code == 0 def get_output_table(): - dest_engine = sqlalchemy.create_engine(dest_connection_url) + dest_engine = sqlalchemy.create_engine(dest_connection_url_read) + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_engine.dialect.name == "crate": + dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output") results = dest_engine.execute( f"select id, val, updated_at from {schema_rand_prefix}.output order by id asc" ).fetchall() @@ -775,20 +905,34 @@ def get_output_table(): res = get_output_table() assert len(res) == 2 - assert res[0] == (1, "val1", as_datetime("2022-01-01")) - assert res[1] == (2, "val2", as_datetime("2022-01-02")) + + # Compensate for CrateDB types and insert order. + if dest_connection_url.startswith("cratedb://"): + assert (1, "val1", 1640995200000) in res + assert (2, "val2", 1641081600000) in res + else: + assert res[0] == (1, "val1", as_datetime("2022-01-01")) + assert res[1] == (2, "val2", as_datetime("2022-01-02")) # # run again, nothing should be inserted into the output table run() res = get_output_table() assert len(res) == 2 - assert res[0] == (1, "val1", as_datetime("2022-01-01")) - assert res[1] == (2, "val2", as_datetime("2022-01-02")) + + # Compensate for CrateDB types and insert order. + if dest_connection_url.startswith("cratedb://"): + assert (1, "val1", 1640995200000) in res + assert (2, "val2", 1641081600000) in res + else: + assert res[0] == (1, "val1", as_datetime("2022-01-01")) + assert res[1] == (2, "val2", as_datetime("2022-01-02")) def db_to_db_merge_with_primary_key( - source_connection_url: str, dest_connection_url: str + source_connection_url: str, + dest_connection_url: str, + dest_connection_url_read: str, ): schema_rand_prefix = f"testschema_merge_{get_random_string(5)}" try: @@ -796,12 +940,17 @@ def db_to_db_merge_with_primary_key( except Exception: pass + # CrateDB: Compensate for "Type `date` does not support storage". 
+ updated_at_type = "DATE" + if dest_connection_url.startswith("cratedb://"): + updated_at_type = "TIMESTAMP" + source_engine = sqlalchemy.create_engine(source_connection_url) with source_engine.begin() as conn: conn.execute(f"DROP SCHEMA IF EXISTS {schema_rand_prefix}") conn.execute(f"CREATE SCHEMA {schema_rand_prefix}") conn.execute( - f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER NOT NULL, val VARCHAR(20), updated_at DATE NOT NULL)" + f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER NOT NULL, val VARCHAR(20), updated_at {updated_at_type} NOT NULL)" ) conn.execute( f"INSERT INTO {schema_rand_prefix}.input VALUES (1, 'val1', '2022-01-01')" @@ -831,9 +980,12 @@ def run(): assert res.exit_code == 0 return res - dest_engine = sqlalchemy.create_engine(dest_connection_url) + dest_engine = sqlalchemy.create_engine(dest_connection_url_read) def get_output_rows(): + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_engine.dialect.name == "crate": + dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output") return dest_engine.execute( f"select id, val, updated_at from {schema_rand_prefix}.output order by id asc" ).fetchall() @@ -846,9 +998,17 @@ def assert_output_equals(expected): dest_engine.dispose() res = run() - assert_output_equals( - [(1, "val1", as_datetime("2022-01-01")), (2, "val2", as_datetime("2022-02-01"))] - ) + + # Compensate for CrateDB types and insert order. + if dest_connection_url.startswith("cratedb://"): + assert_output_equals([(1, "val1", 1640995200000), (2, "val2", 1643673600000)]) + else: + assert_output_equals( + [ + (1, "val1", as_datetime("2022-01-01")), + (2, "val2", as_datetime("2022-02-01")), + ] + ) first_run_id = dest_engine.execute( f"select _dlt_load_id from {schema_rand_prefix}.output limit 1" @@ -859,9 +1019,15 @@ def assert_output_equals(expected): ############################## # we'll run again, we don't expect any changes since the data hasn't changed res = run() - assert_output_equals( - [(1, "val1", as_datetime("2022-01-01")), (2, "val2", as_datetime("2022-02-01"))] - ) + if dest_connection_url.startswith("cratedb://"): + assert_output_equals([(1, "val1", 1640995200000), (2, "val2", 1643673600000)]) + else: + assert_output_equals( + [ + (1, "val1", as_datetime("2022-01-01")), + (2, "val2", as_datetime("2022-02-01")), + ] + ) # we also ensure that the other rows were not touched count_by_run_id = dest_engine.execute( @@ -881,9 +1047,15 @@ def assert_output_equals(expected): source_engine.dispose() run() - assert_output_equals( - [(1, "val1", as_datetime("2022-01-01")), (2, "val2", as_datetime("2022-02-01"))] - ) + if dest_connection_url.startswith("cratedb://"): + assert_output_equals([(1, "val1", 1640995200000), (2, "val2", 1643673600000)]) + else: + assert_output_equals( + [ + (1, "val1", as_datetime("2022-01-01")), + (2, "val2", as_datetime("2022-02-01")), + ] + ) # we also ensure that the other rows were not touched count_by_run_id = dest_engine.execute( @@ -903,9 +1075,15 @@ def assert_output_equals(expected): source_engine.dispose() run() - assert_output_equals( - [(1, "val1", as_datetime("2022-01-01")), (2, "val2", as_datetime("2022-02-01"))] - ) + if dest_connection_url.startswith("cratedb://"): + assert_output_equals([(1, "val1", 1640995200000), (2, "val2", 1643673600000)]) + else: + assert_output_equals( + [ + (1, "val1", as_datetime("2022-01-01")), + (2, "val2", as_datetime("2022-02-01")), + ] + ) # we also ensure that the other rows were not touched count_by_run_id = 
@@ -925,13 +1103,22 @@ def assert_output_equals(expected):
     source_engine.dispose()
 
     run()
-    assert_output_equals(
-        [
-            (1, "val1", as_datetime("2022-01-01")),
-            (2, "val2", as_datetime("2022-02-01")),
-            (3, "val3", as_datetime("2022-02-02")),
-        ]
-    )
+    if dest_connection_url.startswith("cratedb://"):
+        assert_output_equals(
+            [
+                (1, "val1", 1640995200000),
+                (2, "val2", 1643673600000),
+                (3, "val3", 1643673600000),
+            ]
+        )
+    else:
+        assert_output_equals(
+            [
+                (1, "val1", as_datetime("2022-01-01")),
+                (2, "val2", as_datetime("2022-02-01")),
+                (3, "val3", as_datetime("2022-02-02")),
+            ]
+        )
 
     # we have a new run that inserted rows to this table, so the run count should be 2
     count_by_run_id = dest_engine.execute(
@@ -953,13 +1140,22 @@ def assert_output_equals(expected):
     source_engine.dispose()
 
     run()
-    assert_output_equals(
-        [
-            (1, "val1", as_datetime("2022-01-01")),
-            (2, "val2_modified", as_datetime("2022-02-03")),
-            (3, "val3", as_datetime("2022-02-02")),
-        ]
-    )
+    if dest_connection_url.startswith("cratedb://"):
+        assert_output_equals(
+            [
+                (1, "val1", 1640995200000),
+                (2, "val2_modified", 1643673600000),
+                (3, "val3", 1643673600000),
+            ]
+        )
+    else:
+        assert_output_equals(
+            [
+                (1, "val1", as_datetime("2022-01-01")),
+                (2, "val2_modified", as_datetime("2022-02-03")),
+                (3, "val3", as_datetime("2022-02-02")),
+            ]
+        )
 
     # we have a new run that inserted rows to this table, so the run count should be 2
     count_by_run_id = dest_engine.execute(
@@ -976,7 +1172,9 @@ def assert_output_equals(expected):
 
 
 def db_to_db_delete_insert_without_primary_key(
-    source_connection_url: str, dest_connection_url: str
+    source_connection_url: str,
+    dest_connection_url: str,
+    dest_connection_url_read: str,
 ):
     schema_rand_prefix = f"testschema_delete_insert_{get_random_string(5)}"
     try:
@@ -984,12 +1182,17 @@ def db_to_db_delete_insert_without_primary_key(
     except Exception:
         pass
 
+    # CrateDB: Compensate for "Type `date` does not support storage".
+    updated_at_type = "DATE"
+    if dest_connection_url.startswith("cratedb://"):
+        updated_at_type = "TIMESTAMP"
+
     source_engine = sqlalchemy.create_engine(source_connection_url)
     with source_engine.begin() as conn:
         conn.execute(f"DROP SCHEMA IF EXISTS {schema_rand_prefix}")
         conn.execute(f"CREATE SCHEMA {schema_rand_prefix}")
         conn.execute(
-            f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at DATE)"
+            f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at {updated_at_type})"
         )
         conn.execute(
             f"INSERT INTO {schema_rand_prefix}.input VALUES (1, 'val1', '2022-01-01')"
@@ -1019,9 +1222,12 @@ def run():
         assert res.exit_code == 0
         return res
 
-    dest_engine = sqlalchemy.create_engine(dest_connection_url)
+    dest_engine = sqlalchemy.create_engine(dest_connection_url_read)
 
     def get_output_rows():
+        # CrateDB needs an explicit flush to make data available for reads immediately.
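+        # (CrateDB is eventually consistent: new writes become visible to
+        # queries only after a table refresh, which `REFRESH TABLE` forces.)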
+        if dest_engine.dialect.name == "crate":
+            dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
         results = dest_engine.execute(
             f"select id, val, updated_at from {schema_rand_prefix}.output order by id asc"
         ).fetchall()
@@ -1035,9 +1241,15 @@ def assert_output_equals(expected):
             assert res[i] == row
 
     run()
-    assert_output_equals(
-        [(1, "val1", as_datetime("2022-01-01")), (2, "val2", as_datetime("2022-02-01"))]
-    )
+    if dest_connection_url.startswith("cratedb://"):
+        assert_output_equals([(1, "val1", 1640995200000), (2, "val2", 1643673600000)])
+    else:
+        assert_output_equals(
+            [
+                (1, "val1", as_datetime("2022-01-01")),
+                (2, "val2", as_datetime("2022-02-01")),
+            ]
+        )
 
     first_run_id = dest_engine.execute(
         f"select _dlt_load_id from {schema_rand_prefix}.output limit 1"
@@ -1047,9 +1259,15 @@ def assert_output_equals(expected):
     ##############################
     # we'll run again, since this is a delete+insert, we expect the run ID to change for the last one
     res = run()
-    assert_output_equals(
-        [(1, "val1", as_datetime("2022-01-01")), (2, "val2", as_datetime("2022-02-01"))]
-    )
+    if dest_connection_url.startswith("cratedb://"):
+        assert_output_equals([(1, "val1", 1640995200000), (2, "val2", 1643673600000)])
+    else:
+        assert_output_equals(
+            [
+                (1, "val1", as_datetime("2022-01-01")),
+                (2, "val2", as_datetime("2022-02-01")),
+            ]
+        )
 
     # we ensure that one of the rows is updated with a new run
     count_by_run_id = dest_engine.execute(
@@ -1071,14 +1289,24 @@ def assert_output_equals(expected):
     source_engine.dispose()
 
     run()
-    assert_output_equals(
-        [
-            (1, "val1", as_datetime("2022-01-01")),
-            (2, "val2", as_datetime("2022-02-01")),
-            (3, "val3", as_datetime("2022-02-01")),
-            (4, "val4", as_datetime("2022-02-01")),
-        ]
-    )
+    if dest_connection_url.startswith("cratedb://"):
+        assert_output_equals(
+            [
+                (1, "val1", 1640995200000),
+                (2, "val2", 1643673600000),
+                (3, "val3", 1643673600000),
+                (4, "val4", 1643673600000),
+            ]
+        )
+    else:
+        assert_output_equals(
+            [
+                (1, "val1", as_datetime("2022-01-01")),
+                (2, "val2", as_datetime("2022-02-01")),
+                (3, "val3", as_datetime("2022-02-01")),
+                (4, "val4", as_datetime("2022-02-01")),
+            ]
+        )
 
     # the new rows should have a new run ID, there should be 2 distinct runs now
     count_by_run_id = dest_engine.execute(
@@ -1094,7 +1322,9 @@ def assert_output_equals(expected):
 
 
 def db_to_db_delete_insert_with_timerange(
-    source_connection_url: str, dest_connection_url: str
+    source_connection_url: str,
+    dest_connection_url: str,
+    dest_connection_url_read: str,
 ):
     schema_rand_prefix = f"testschema_delete_insert_timerange_{get_random_string(5)}"
     source_engine = sqlalchemy.create_engine(source_connection_url)
@@ -1143,9 +1373,12 @@ def run(start_date: str, end_date: str):
         assert res.exit_code == 0
         return res
 
-    dest_engine = sqlalchemy.create_engine(dest_connection_url, poolclass=NullPool)
+    dest_engine = sqlalchemy.create_engine(dest_connection_url_read, poolclass=NullPool)
 
     def get_output_rows():
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if dest_engine.dialect.name == "crate":
+            dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
         if "clickhouse" not in dest_connection_url:
             dest_engine.execute("CHECKPOINT")
         rows = dest_engine.execute(
@@ -1160,14 +1393,24 @@ def assert_output_equals(expected):
             assert res[i] == row
 
     run("2022-01-01", "2022-01-02")  # dlt runs them with the end date exclusive
-    assert_output_equals(
-        [
-            (1, "val1", as_datetime("2022-01-01")),
-            (2, "val2", as_datetime("2022-01-01")),
-            (3, "val3", as_datetime("2022-01-02")),
-            (4, "val4", as_datetime("2022-01-02")),
-        ]
-    )
+    if dest_connection_url.startswith("cratedb://"):
+        assert_output_equals(
+            [
+                (1, "val1", 1640995200000),
+                (2, "val2", 1640995200000),
+                (3, "val3", 1643673600000),
+                (4, "val4", 1643673600000),
+            ]
+        )
+    else:
+        assert_output_equals(
+            [
+                (1, "val1", as_datetime("2022-01-01")),
+                (2, "val2", as_datetime("2022-01-01")),
+                (3, "val3", as_datetime("2022-01-02")),
+                (4, "val4", as_datetime("2022-01-02")),
+            ]
+        )
 
     first_run_id = dest_engine.execute(
         f"select _dlt_load_id from {schema_rand_prefix}.output limit 1"
@@ -1319,10 +1562,14 @@ def run():
         assert res.exit_code == 0
 
     def get_output_table():
-        dest_engine = sqlalchemy.create_engine(dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        dest_engine = sqlalchemy.create_engine(dest_uri_read)
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if dest_engine.dialect.name == "crate":
+            dest_engine.execute("REFRESH TABLE testschema.output")
         with dest_engine.connect() as conn:
             res = conn.execute(
-                "select _kafka__data from testschema.output order by _kafka_msg_id asc"
+                "select _kafka__data from testschema.output order by _kafka__msg_id asc"
            ).fetchall()
         dest_engine.dispose()
         return res
@@ -1331,18 +1578,30 @@ def get_output_table():
     res = get_output_table()
     assert len(res) == 3
-    assert res[0] == ("message1",)
-    assert res[1] == ("message2",)
-    assert res[2] == ("message3",)
+    if dest_uri.startswith("cratedb://"):
+        messages_db = [res[0][0], res[1][0], res[2][0]]
+        assert "message1" in messages_db
+        assert "message2" in messages_db
+        assert "message3" in messages_db
+    else:
+        assert res[0] == ("message1",)
+        assert res[1] == ("message2",)
+        assert res[2] == ("message3",)
 
     # run again, nothing should be inserted into the output table
     run()
     res = get_output_table()
     assert len(res) == 3
-    assert res[0] == ("message1",)
-    assert res[1] == ("message2",)
-    assert res[2] == ("message3",)
+    if dest_uri.startswith("cratedb://"):
+        messages_db = [res[0][0], res[1][0], res[2][0]]
+        assert "message1" in messages_db
+        assert "message2" in messages_db
+        assert "message3" in messages_db
+    else:
+        assert res[0] == ("message1",)
+        assert res[1] == ("message2",)
+        assert res[2] == ("message3",)
 
     # add a new message
     producer.produce(topic, "message4".encode("utf-8"))
@@ -1352,10 +1611,17 @@ def get_output_table():
     run()
     res = get_output_table()
     assert len(res) == 4
-    assert res[0] == ("message1",)
-    assert res[1] == ("message2",)
-    assert res[2] == ("message3",)
-    assert res[3] == ("message4",)
+    if dest_uri.startswith("cratedb://"):
+        messages_db = [res[0][0], res[1][0], res[2][0], res[3][0]]
+        assert "message1" in messages_db
+        assert "message2" in messages_db
+        assert "message3" in messages_db
+        assert "message4" in messages_db
+    else:
+        assert res[0] == ("message1",)
+        assert res[1] == ("message2",)
+        assert res[2] == ("message3",)
+        assert res[3] == ("message4",)
 
     kafka.stop()
 
 
@@ -1364,6 +1630,12 @@ def get_output_table():
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_arrow_mmap_to_db_create_replace(dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB type mapping does not support `DATE` yet, "
+            "see https://github.com/crate-workbench/ingestr/issues/4"
+        )
+
     schema = f"testschema_arrow_mmap_create_replace_{get_random_string(5)}"
 
     def run_command(
@@ -1408,7 +1680,8 @@ def run_command(
     table = pa.Table.from_pandas(df)
     run_command(table)
 
-    dest_engine = sqlalchemy.create_engine(dest_uri)
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    dest_engine = sqlalchemy.create_engine(dest_uri_read)
     with dest_engine.begin() as conn:
         res = conn.execute(f"select count(*) from {schema}.output").fetchall()
         assert res[0][0] == row_count
@@ -1473,8 +1746,14 @@ def run_command(df: pd.DataFrame, incremental_key: Optional[str] = None):
     dest_uri = dest.start()
     if "clickhouse" in dest_uri:
         pytest.skip("clickhouse is not supported for this test")
+    if dest_uri.startswith("cratedb://"):
+        pytest.skip(
+            "CrateDB type mapping does not support `DATE` yet, "
+            "see https://github.com/crate-workbench/ingestr/issues/4"
+        )
 
-    dest_engine = sqlalchemy.create_engine(dest_uri)
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    dest_engine = sqlalchemy.create_engine(dest_uri_read)
 
     # let's start with a basic dataframe
     row_count = 1000
@@ -1577,6 +1856,11 @@ def build_datetime(ds: str):
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_arrow_mmap_to_db_merge_without_incremental(dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB type mapping does not support `DATE` yet, "
+            "see https://github.com/crate-workbench/ingestr/issues/4"
+        )
     schema = f"testschema_arrow_mmap_{get_random_string(5)}"
 
     def run_command(df: pd.DataFrame):
@@ -1599,7 +1883,9 @@ def run_command(df: pd.DataFrame):
         return res
 
     dest_uri = dest.start()
-    dest_engine = sqlalchemy.create_engine(dest_uri)
+
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    dest_engine = sqlalchemy.create_engine(dest_uri_read)
 
     # let's start with a basic dataframe
     row_count = 1000
@@ -1682,6 +1968,13 @@ def run_command(df: pd.DataFrame):
 )
 @pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
 def test_db_to_db_exclude_columns(source, dest):
+    if isinstance(source.container, SqlServerContainer) and isinstance(
+        dest, CrateDbDockerImage
+    ):
+        pytest.skip(
+            "CrateDB type mapping does not support `DATE` yet, "
+            "see https://github.com/crate-workbench/ingestr/issues/4"
+        )
     with ThreadPoolExecutor() as executor:
         source_future = executor.submit(source.start)
         dest_future = executor.submit(dest.start)
@@ -1690,12 +1983,17 @@ def test_db_to_db_exclude_columns(source, dest):
 
     schema_rand_prefix = f"testschema_db_to_db_exclude_columns_{get_random_string(5)}"
 
+    # CrateDB: Compensate for "Type `date` does not support storage".
+ updated_at_type = "DATE" + if dest_uri.startswith("cratedb://"): + updated_at_type = "TIMESTAMP" + source_engine = sqlalchemy.create_engine(source_uri) with source_engine.begin() as conn: conn.execute(f"DROP SCHEMA IF EXISTS {schema_rand_prefix}") conn.execute(f"CREATE SCHEMA {schema_rand_prefix}") conn.execute( - f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at DATE, col_to_exclude1 VARCHAR(20), col_to_exclude2 VARCHAR(20))" + f"CREATE TABLE {schema_rand_prefix}.input (id INTEGER, val VARCHAR(20), updated_at {updated_at_type}, col_to_exclude1 VARCHAR(20), col_to_exclude2 VARCHAR(20))" ) conn.execute( f"INSERT INTO {schema_rand_prefix}.input VALUES (1, 'val1', '2022-01-01', 'col1', 'col2')" @@ -1718,14 +2016,22 @@ def test_db_to_db_exclude_columns(source, dest): assert result.exit_code == 0 - dest_engine = sqlalchemy.create_engine(dest_uri) + dest_uri_read = get_uri_read(dest_uri, dest) + dest_engine = sqlalchemy.create_engine(dest_uri_read) + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_engine.dialect.name == "crate": + dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output") res = dest_engine.execute( f"select id, val, updated_at from {schema_rand_prefix}.output" ).fetchall() assert len(res) == 2 - assert res[0] == (1, "val1", as_datetime("2022-01-01")) - assert res[1] == (2, "val2", as_datetime("2022-02-01")) + if dest_uri.startswith("cratedb://"): + assert (1, "val1", 1640995200000) in res + assert (2, "val2", 1643673600000) in res + else: + assert res[0] == (1, "val1", as_datetime("2022-01-01")) + assert res[1] == (2, "val2", as_datetime("2022-02-01")) # Verify excluded columns don't exist in destination schema columns = dest_engine.execute( @@ -1982,7 +2288,7 @@ def assert_success(result): traceback.print_exception(*result.exc_info) raise AssertionError(result.exception) - def smoke_test(dest_uri, dynamodb): + def smoke_test(dest_uri, dest_uri_read, dynamodb): dest_table = f"public.dynamodb_{get_random_string(5)}" result = invoke_ingest_command( @@ -1990,15 +2296,29 @@ def smoke_test(dest_uri, dynamodb): ) assert_success(result) + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_uri.startswith("cratedb://"): + get_query_result(dest_uri_read, f"REFRESH TABLE {dest_table}") + result = get_query_result( - dest_uri, f"select id, updated_at from {dest_table} ORDER BY id" + dest_uri_read, f"select id, updated_at from {dest_table} ORDER BY id" ) assert len(result) == 3 for i in range(len(result)): assert result[i][0] == dynamodb.data[i]["id"] - assert result[i][1] == pendulum.parse(dynamodb.data[i]["updated_at"]) + refval = pendulum.parse(dynamodb.data[i]["updated_at"]) + if dest_uri.startswith("cratedb://"): + assert result[i][1] == refval.int_timestamp * 1000 + else: + assert result[i][1] == refval + + def append_test(dest_uri, dest_uri_read, dynamodb): + if dest_uri.startswith("cratedb://"): + pytest.skip( + "CrateDB support for 'append' strategy pending, " + "see https://github.com/crate-workbench/ingestr/issues/6" + ) - def append_test(dest_uri, dynamodb): dest_table = f"public.dynamodb_{get_random_string(5)}" # we run it twice to assert that the data in destination doesn't change @@ -2013,16 +2333,30 @@ def append_test(dest_uri, dynamodb): ) assert_success(result) + + # CrateDB needs an explicit flush to make data available for reads immediately. 
+ if dest_uri.startswith("cratedb://"): + get_query_result(dest_uri_read, f"REFRESH TABLE {dest_table}") + result = get_query_result( - dest_uri, f"select id, updated_at from {dest_table} ORDER BY id" + dest_uri_read, f"select id, updated_at from {dest_table} ORDER BY id" ) assert len(result) == 3 for i in range(len(result)): assert result[i][0] == dynamodb.data[i]["id"] - assert result[i][1] == pendulum.parse(dynamodb.data[i]["updated_at"]) + refval = pendulum.parse(dynamodb.data[i]["updated_at"]) + if dest_uri.startswith("cratedb://"): + assert result[i][1] == refval.int_timestamp * 1000 + else: + assert result[i][1] == refval def incremental_test_factory(strategy): - def incremental_test(dest_uri, dynamodb): + def incremental_test(dest_uri, dest_uri_read, dynamodb): + if dest_uri.startswith("cratedb://") and strategy != "replace": + pytest.skip( + "CrateDB support for 'merge' strategy pending, " + "see https://github.com/crate/dlt-cratedb/issues/14" + ) dest_table = f"public.dynamodb_{get_random_string(5)}" result = invoke_ingest_command( @@ -2036,13 +2370,22 @@ def incremental_test(dest_uri, dynamodb): interval_end="2024-02-01T00:01:00", # upto the second entry ) assert_success(result) + + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_uri.startswith("cratedb://"): + get_query_result(dest_uri_read, f"REFRESH TABLE {dest_table}") + rows = get_query_result( - dest_uri, f"select id, updated_at from {dest_table} ORDER BY id" + dest_uri_read, f"select id, updated_at from {dest_table} ORDER BY id" ) assert len(rows) == 2 for i in range(len(rows)): assert rows[i][0] == dynamodb.data[i]["id"] - assert rows[i][1] == pendulum.parse(dynamodb.data[i]["updated_at"]) + refval = pendulum.parse(dynamodb.data[i]["updated_at"]) + if dest_uri.startswith("cratedb://"): + assert rows[i][1] == refval.int_timestamp * 1000 + else: + assert rows[i][1] == refval # ingest the rest # run it twice to test idempotency @@ -2058,8 +2401,16 @@ def incremental_test(dest_uri, dynamodb): ) assert_success(result) + # CrateDB needs an explicit flush to make data available for reads immediately. 
+            dest_engine = sqlalchemy.create_engine(
+                dest_uri_read, poolclass=NullPool
+            )
+            if dest_engine.dialect.name == "crate":
+                dest_engine.execute(f"REFRESH TABLE {dest_table}")
+
             rows = get_query_result(
-                dest_uri, f"select id, updated_at from {dest_table} ORDER BY id"
+                dest_uri_read,
+                f"select id, updated_at from {dest_table} ORDER BY id",
             )
             rows_expected = 3
             if strategy == "replace":
@@ -2070,7 +2421,12 @@
             for row in rows:
                 id = int(row[0]) - 1
                 assert row[0] == dynamodb.data[id]["id"]
-                assert row[1] == pendulum.parse(dynamodb.data[id]["updated_at"])
+
+                refval = pendulum.parse(dynamodb.data[id]["updated_at"])
+                if dest_uri.startswith("cratedb://"):
+                    assert row[1] == refval.int_timestamp * 1000
+                else:
+                    assert row[1] == refval
 
         # for easier debugging
         incremental_test.__name__ += f"_{strategy}"
@@ -2095,7 +2451,9 @@ def incremental_test(dest_uri, dynamodb):
 )
 @pytest.mark.parametrize("testcase", dynamodb_tests())
 def test_dynamodb(dest, dynamodb, testcase):
-    testcase(dest.start(), dynamodb)
+    dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    testcase(dest_uri, dest_uri_read, dynamodb)
     dest.stop()
 
 
@@ -2108,7 +2466,24 @@ def get_query_result(uri: str, query: str):
 
 
 def custom_query_tests():
-    def replace(source_connection_url, dest_connection_url):
+    def replace(
+        source_connection_url,
+        dest_connection_url,
+        dest_connection_url_read: str,
+    ):
+        if source_connection_url.startswith(
+            "mssql://"
+        ) and dest_connection_url.startswith("cratedb://"):
+            pytest.skip(
+                "CrateDB type mapping does not support `DATE` yet, "
+                "see https://github.com/crate-workbench/ingestr/issues/4"
+            )
+
+        # CrateDB: Compensate for "Type `date` does not support storage".
+        updated_at_type = "DATE"
+        if dest_connection_url.startswith("cratedb://"):
+            updated_at_type = "TIMESTAMP"
+
         schema = f"testschema_cr_cust_{get_random_string(5)}"
         with sqlalchemy.create_engine(
             source_connection_url, poolclass=NullPool
@@ -2116,7 +2491,7 @@ def replace(source_connection_url, dest_connection_url):
             conn.execute(f"DROP SCHEMA IF EXISTS {schema}")
             conn.execute(f"CREATE SCHEMA {schema}")
             conn.execute(
-                f"CREATE TABLE {schema}.orders (id INTEGER, name VARCHAR(255) NOT NULL, updated_at DATE)"
+                f"CREATE TABLE {schema}.orders (id INTEGER, name VARCHAR(255) NOT NULL, updated_at {updated_at_type})"
             )
             conn.execute(
                 f"CREATE TABLE {schema}.order_items (id INTEGER, order_id INTEGER NOT NULL, subname VARCHAR(255) NOT NULL)"
@@ -2147,18 +2522,47 @@ def replace(source_connection_url, dest_connection_url):
 
         assert result.exit_code == 0
 
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if dest_connection_url.startswith("cratedb://"):
+            get_query_result(dest_connection_url_read, f"REFRESH TABLE {schema}.output")
+
         res = get_query_result(
-            dest_connection_url,
+            dest_connection_url_read,
             f"select id, order_id, subname, updated_at from {schema}.output order by id asc",
         )
 
         assert len(res) == 4
-        assert res[0] == (1, 1, "Item 1 for First Order", as_datetime("2024-01-01"))
-        assert res[1] == (2, 1, "Item 2 for First Order", as_datetime("2024-01-01"))
-        assert res[2] == (3, 2, "Item 1 for Second Order", as_datetime("2024-01-01"))
-        assert res[3] == (4, 3, "Item 1 for Third Order", as_datetime("2024-01-01"))
+        if dest_connection_url.startswith("cratedb://"):
+            assert (1, 1, "Item 1 for First Order", 1704067200000) in res
+            assert (2, 1, "Item 2 for First Order", 1704067200000) in res
+            assert (3, 2, "Item 1 for Second Order", 1704067200000) in res
+            assert (4, 3, "Item 1 for Third Order", 1704067200000) in res
+        else:
+            assert res[0] == (1, 1, "Item 1 for First Order", as_datetime("2024-01-01"))
+            assert res[1] == (2, 1, "Item 2 for First Order", as_datetime("2024-01-01"))
+            assert res[2] == (
+                3,
+                2,
+                "Item 1 for Second Order",
+                as_datetime("2024-01-01"),
+            )
+            assert res[3] == (4, 3, "Item 1 for Third Order", as_datetime("2024-01-01"))
+
+    def merge(
+        source_connection_url,
+        dest_connection_url,
+        dest_connection_url_read: str,
+    ):
+        if dest_connection_url.startswith("cratedb://"):
+            pytest.skip(
+                "CrateDB support for 'merge' strategy pending, "
+                "see https://github.com/crate/dlt-cratedb/issues/14"
+            )
 
-    def merge(source_connection_url, dest_connection_url):
+        # CrateDB: Compensate for "Type `date` does not support storage".
+        updated_at_type = "DATE"
+        if dest_connection_url.startswith("cratedb://"):
+            updated_at_type = "TIMESTAMP"
         schema = f"testschema_merge_cust_{get_random_string(5)}"
         source_engine = sqlalchemy.create_engine(
             source_connection_url, poolclass=NullPool
@@ -2167,7 +2571,7 @@ def merge(source_connection_url, dest_connection_url):
             conn.execute(f"DROP SCHEMA IF EXISTS {schema}")
             conn.execute(f"CREATE SCHEMA {schema}")
             conn.execute(
-                f"CREATE TABLE {schema}.orders (id INTEGER, name VARCHAR(255) NOT NULL, updated_at DATE)"
+                f"CREATE TABLE {schema}.orders (id INTEGER, name VARCHAR(255) NOT NULL, updated_at {updated_at_type})"
             )
             conn.execute(
                 f"CREATE TABLE {schema}.order_items (id INTEGER, order_id INTEGER NOT NULL, subname VARCHAR(255) NOT NULL)"
@@ -2200,8 +2604,12 @@ def run():
 
         # Initial run to get all data
         run()
 
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if dest_connection_url.startswith("cratedb://"):
+            get_query_result(dest_connection_url_read, f"REFRESH TABLE {schema}.output")
+
         res = get_query_result(
-            dest_connection_url,
+            dest_connection_url_read,
             f"select id, order_id, subname, updated_at, _dlt_load_id from {schema}.output order by id asc",
         )
 
@@ -2239,8 +2647,13 @@ def run():
 
         # Run again - should get same load_id since no changes
         run()
+
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if dest_connection_url.startswith("cratedb://"):
+            get_query_result(dest_connection_url_read, f"REFRESH TABLE {schema}.output")
+
         res = get_query_result(
-            dest_connection_url,
+            dest_connection_url_read,
             f"select id, order_id, subname, updated_at, _dlt_load_id from {schema}.output order by id asc",
         )
         assert len(res) == 4
@@ -2257,8 +2670,13 @@ def run():
 
         # Run again - should see updated data with new load_id
         run()
+
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if dest_connection_url.startswith("cratedb://"):
+            get_query_result(dest_connection_url_read, f"REFRESH TABLE {schema}.output")
+
         res = get_query_result(
-            dest_connection_url,
+            dest_connection_url_read,
             f"select id, order_id, subname, updated_at, _dlt_load_id from {schema}.output order by id asc",
         )
 
@@ -2309,7 +2727,8 @@ def test_custom_query(testcase, source, dest):
         dest_future = executor.submit(dest.start)
         source_uri = source_future.result()
         dest_uri = dest_future.result()
-        testcase(source_uri, dest_uri)
+        dest_uri_read = get_uri_read(dest_uri, dest)
+        testcase(source_uri, dest_uri, dest_uri_read)
     source.stop()
     dest.stop()
 
 
@@ -2320,6 +2739,7 @@ def test_custom_query(testcase, source, dest):
 )
 def test_github_to_duckdb(dest):
     dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
 
     source_uri = "github://?owner=bruin-data&repo=ingestr"
     source_table = "repo_events"
@@ -2327,7 +2747,11 @@ def test_github_to_duckdb(dest):
 
     res = invoke_ingest_command(source_uri, source_table, dest_uri, dest_table)
     assert res.exit_code == 0
-    dest_engine = sqlalchemy.create_engine(dest_uri, poolclass=NullPool)
+
+    dest_engine = sqlalchemy.create_engine(dest_uri_read, poolclass=NullPool)
+    # CrateDB needs an explicit flush to make data available for reads immediately.
+    if dest_engine.dialect.name == "crate":
+        dest_engine.execute(f"REFRESH TABLE {dest_table}")
     res = dest_engine.execute(f"select count(*) from {dest_table}").fetchall()
     dest_engine.dispose()
     assert len(res) > 0
@@ -2361,7 +2785,7 @@ def create_mock_response(data: str) -> requests.Response:
         res.raw = buffer
         return res
 
-    def test_no_report_instances_found(dest_uri):
+    def test_no_report_instances_found(dest_uri, dest_uri_read):
         """
         When there are no report instances for the given date range,
         NoReportsError should be raised.
         """
@@ -2428,7 +2852,7 @@ def test_no_report_instances_found(dest_uri):
         )
         assert has_exception(result.exception, NoReportsFoundError)
 
-    def test_no_ongoing_reports_found(dest_uri):
+    def test_no_ongoing_reports_found(dest_uri, dest_uri_read):
         """
         when there are no ongoing reports, or ongoing reports that have been
         stopped due to inactivity, NoOngoingReportRequestsFoundError should
         be raised.
         """
@@ -2471,7 +2895,7 @@ def test_no_ongoing_reports_found(dest_uri):
         )
         assert has_exception(result.exception, NoOngoingReportRequestsFoundError)
 
-    def test_no_such_report(dest_uri):
+    def test_no_such_report(dest_uri, dest_uri_read):
         """
         when there is no report with the given name, NoSuchReportError should be raised.
""" @@ -2514,10 +2938,17 @@ def test_no_such_report(dest_uri): ) assert has_exception(result.exception, NoSuchReportError) - def test_successful_ingestion(dest_uri): + def test_successful_ingestion(dest_uri, dest_uri_read): """ When there are report instances for the given date range, the data should be ingested """ + + if dest_uri.startswith("cratedb://"): + pytest.skip( + "CrateDB type mapping does not support `DATE` yet, " + "see https://github.com/crate-workbench/ingestr/issues/4" + ) + client = MagicMock() client.list_analytics_report_requests = MagicMock( return_value=AnalyticsReportRequestsResponse( @@ -2603,17 +3034,26 @@ def test_successful_ingestion(dest_uri): assert result.exit_code == 0 - dest_engine = sqlalchemy.create_engine(dest_uri) + dest_engine = sqlalchemy.create_engine(dest_uri_read) + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_engine.dialect.name == "crate": + dest_engine.execute(f"REFRESH TABLE {dest_table}") count = dest_engine.execute(f"select count(*) from {dest_table}").fetchone()[0] dest_engine.dispose() assert count == 3 - def test_incremental_ingestion(dest_uri): + def test_incremental_ingestion(dest_uri, dest_uri_read): """ when the pipeline is run till a specific end date, the next ingestion should load data from the last processing date, given that last_date is not provided """ + if dest_uri.startswith("cratedb://"): + pytest.skip( + "CrateDB type mapping does not support `DATE` yet, " + "see https://github.com/crate-workbench/ingestr/issues/4" + ) + client = MagicMock() client.list_analytics_report_requests = MagicMock( return_value=AnalyticsReportRequestsResponse( @@ -2705,7 +3145,10 @@ def test_incremental_ingestion(dest_uri): assert result.exit_code == 0 - dest_engine = sqlalchemy.create_engine(dest_uri) + dest_engine = sqlalchemy.create_engine(dest_uri_read) + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_engine.dialect.name == "crate": + dest_engine.execute(f"REFRESH TABLE {dest_table}") count = dest_engine.execute(f"select count(*) from {dest_table}").fetchone()[0] dest_engine.dispose() assert count == 3 @@ -2731,7 +3174,10 @@ def test_incremental_ingestion(dest_uri): assert result.exit_code == 0 - dest_engine = sqlalchemy.create_engine(dest_uri) + dest_engine = sqlalchemy.create_engine(dest_uri_read) + # CrateDB needs an explicit flush to make data available for reads immediately. + if dest_engine.dialect.name == "crate": + dest_engine.execute(f"REFRESH TABLE {dest_table}") count = dest_engine.execute(f"select count(*) from {dest_table}").fetchone()[0] assert count == 6 assert ( @@ -2758,7 +3204,9 @@ def test_incremental_ingestion(dest_uri): ) @pytest.mark.parametrize("test_case", appstore_test_cases()) def test_appstore(dest, test_case): - test_case(dest.start()) + dest_uri = dest.start() + dest_uri_read = get_uri_read(dest_uri, dest) + test_case(dest_uri, dest_uri_read) dest.stop() @@ -2829,12 +3277,15 @@ def glob_files_override(fs_client, _, file_glob): def assert_rows(dest_uri, dest_table, n): engine = sqlalchemy.create_engine(dest_uri) with engine.connect() as conn: + # CrateDB needs an explicit flush to make data available for reads immediately. 
+            if engine.dialect.name == "crate":
+                conn.execute(f"REFRESH TABLE {dest_table}")
             rows = conn.execute(f"select count(*) from {dest_table}").fetchall()
             assert len(rows) == 1
             assert rows[0] == (n,)
         engine.dispose()
 
-    def test_empty_source_uri(dest_uri):
+    def test_empty_source_uri(dest_uri, dest_uri_read):
         """
         When the source URI is empty, an error should be raised.
         """
@@ -2848,7 +3299,7 @@ def test_empty_source_uri(dest_uri, dest_uri_read):
         )
         assert has_exception(result.exception, InvalidBlobTableError)
 
-    def test_unsupported_file_format(dest_uri):
+    def test_unsupported_file_format(dest_uri, dest_uri_read):
         """
         When the source file is not one of [csv, parquet, jsonl]
         it should raise an exception
         """
@@ -2869,7 +3320,7 @@ def test_unsupported_file_format(dest_uri, dest_uri_read):
         assert result.exit_code != 0
         assert has_exception(result.exception, ValueError)
 
-    def test_missing_credentials(dest_uri):
+    def test_missing_credentials(dest_uri, dest_uri_read):
         """
         When the credentials are missing, an error should be raised.
         """
@@ -2884,7 +3335,7 @@ def test_missing_credentials(dest_uri, dest_uri_read):
         )
         assert result.exit_code != 0
 
-    def test_csv_load(dest_uri):
+    def test_csv_load(dest_uri, dest_uri_read):
         """
         When the source URI is a CSV file, the data should be ingested.
         """
@@ -2902,9 +3353,9 @@ def test_csv_load(dest_uri, dest_uri_read):
             dest_table,
         )
         assert result.exit_code == 0
-        assert_rows(dest_uri, dest_table, 5)
+        assert_rows(dest_uri_read, dest_table, 5)
 
-    def test_csv_gz_load(dest_uri):
+    def test_csv_gz_load(dest_uri, dest_uri_read):
         """When the source URI is a gzipped CSV file, the data should be ingested."""
         with (
             patch(target_fs) as target_fs_mock,
@@ -2920,9 +3371,9 @@ def test_csv_gz_load(dest_uri, dest_uri_read):
             dest_table,
         )
         assert result.exit_code == 0
-        assert_rows(dest_uri, dest_table, 5)
+        assert_rows(dest_uri_read, dest_table, 5)
 
-    def test_parquet_load(dest_uri):
+    def test_parquet_load(dest_uri, dest_uri_read):
         """
         When the source URI is a Parquet file, the data should be ingested.
         """
@@ -2940,9 +3391,9 @@ def test_parquet_load(dest_uri, dest_uri_read):
             dest_table,
         )
         assert result.exit_code == 0
-        assert_rows(dest_uri, dest_table, 5)
+        assert_rows(dest_uri_read, dest_table, 5)
 
-    def test_parquet_gz_load(dest_uri):
+    def test_parquet_gz_load(dest_uri, dest_uri_read):
         """When the source URI is a gzipped Parquet file, the data should be ingested."""
         with (
            patch(target_fs) as target_fs_mock,
@@ -2958,9 +3409,9 @@ def test_parquet_gz_load(dest_uri, dest_uri_read):
             dest_table,
         )
         assert result.exit_code == 0
-        assert_rows(dest_uri, dest_table, 5)
+        assert_rows(dest_uri_read, dest_table, 5)
 
-    def test_jsonl_load(dest_uri):
+    def test_jsonl_load(dest_uri, dest_uri_read):
         """
         When the source URI is a JSONL file, the data should be ingested.
""" @@ -2978,9 +3429,9 @@ def test_jsonl_load(dest_uri): dest_table, ) assert result.exit_code == 0 - assert_rows(dest_uri, dest_table, 5) + assert_rows(dest_uri_read, dest_table, 5) - def test_jsonl_gz_load(dest_uri): + def test_jsonl_gz_load(dest_uri, dest_uri_read): """When the source URI is a gzipped JSONL file, the data should be ingested.""" with ( patch(target_fs) as target_fs_mock, @@ -2996,9 +3447,9 @@ def test_jsonl_gz_load(dest_uri): dest_table, ) assert result.exit_code == 0 - assert_rows(dest_uri, dest_table, 5) + assert_rows(dest_uri_read, dest_table, 5) - def test_glob_load(dest_uri): + def test_glob_load(dest_uri, dest_uri_read): """ When the source URI is a glob pattern, all files matching the pattern should be ingested """ @@ -3016,9 +3467,9 @@ def test_glob_load(dest_uri): dest_table, ) assert result.exit_code == 0 - assert_rows(dest_uri, dest_table, 6) + assert_rows(dest_uri_read, dest_table, 6) - def test_compound_table_name(dest_uri): + def test_compound_table_name(dest_uri, dest_uri_read): """ When table contains both the bucket name and the file glob, loads should be successful. @@ -3037,9 +3488,9 @@ def test_compound_table_name(dest_uri): dest_table, ) assert result.exit_code == 0 - assert_rows(dest_uri, dest_table, 6) + assert_rows(dest_uri_read, dest_table, 6) - def test_uri_precedence(dest_uri): + def test_uri_precedence(dest_uri, dest_uri_read): """ When file glob is present in both URI and Source Table, the URI glob should be used @@ -3059,7 +3510,7 @@ def test_uri_precedence(dest_uri): dest_table, ) assert result.exit_code == 0 - assert_rows(dest_uri, dest_table, 6) + assert_rows(dest_uri_read, dest_table, 6) return [ test_empty_source_uri, @@ -3089,7 +3540,9 @@ def test_uri_precedence(dest_uri): ), ) def test_gcs(dest, test_case): - test_case(dest.start()) + dest_uri = dest.start() + dest_uri_read = get_uri_read(dest_uri, dest) + test_case(dest_uri, dest_uri_read) dest.stop() @@ -3105,7 +3558,9 @@ def test_gcs(dest, test_case): ), ) def test_s3(dest, test_case): - test_case(dest.start()) + dest_uri = dest.start() + dest_uri_read = get_uri_read(dest_uri, dest) + test_case(dest_uri, dest_uri_read) dest.stop() @@ -3144,7 +3599,7 @@ def test_applovin_source(testcase): def frankfurter_test_cases() -> Iterable[Callable]: - def invalid_source_table(dest_uri): + def invalid_source_table(dest_uri, dest_uri_read): schema = f"testschema_frankfurter_{get_random_string(5)}" dest_table = f"{schema}.frankfurter_{get_random_string(5)}" result = invoke_ingest_command( @@ -3156,7 +3611,7 @@ def invalid_source_table(dest_uri): assert result.exit_code != 0 assert has_exception(result.exception, UnsupportedResourceError) - def interval_start_does_not_exceed_interval_end(dest_uri): + def interval_start_does_not_exceed_interval_end(dest_uri, dest_uri_read): schema = f"testschema_frankfurter_{get_random_string(5)}" dest_table = f"{schema}.frankfurter_{get_random_string(5)}" result = invoke_ingest_command( @@ -3171,7 +3626,12 @@ def interval_start_does_not_exceed_interval_end(dest_uri): assert has_exception(result.exception, ValueError) assert "Interval-end cannot be before interval-start." 
 
-    def interval_start_can_equal_interval_end(dest_uri):
+    def interval_start_can_equal_interval_end(dest_uri, dest_uri_read):
+        if dest_uri.startswith("cratedb://"):
+            pytest.skip(
+                "CrateDB support for 'merge' strategy pending, "
+                "see https://github.com/crate/dlt-cratedb/issues/14"
+            )
         schema = f"testschema_frankfurter_{get_random_string(5)}"
         dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
         result = invoke_ingest_command(
@@ -3184,7 +3644,7 @@ def interval_start_can_equal_interval_end(dest_uri, dest_uri_read):
         )
         assert result.exit_code == 0
 
-    def interval_start_does_not_exceed_current_date(dest_uri):
+    def interval_start_does_not_exceed_current_date(dest_uri, dest_uri_read):
         schema = f"testschema_frankfurter_{get_random_string(5)}"
         dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
         start_date = pendulum.now().add(days=1).format("YYYY-MM-DD")
@@ -3199,7 +3659,7 @@ def interval_start_does_not_exceed_current_date(dest_uri, dest_uri_read):
         )
         assert has_exception(result.exception, ValueError)
         assert "Interval-start cannot be in the future." in str(result.exception)
 
-    def interval_end_does_not_exceed_current_date(dest_uri):
+    def interval_end_does_not_exceed_current_date(dest_uri, dest_uri_read):
         schema = f"testschema_frankfurter_{get_random_string(5)}"
         dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
         start_date = pendulum.now().subtract(days=1).format("YYYY-MM-DD")
@@ -3216,7 +3676,12 @@ def interval_end_does_not_exceed_current_date(dest_uri, dest_uri_read):
         )
         assert has_exception(result.exception, ValueError)
         assert "Interval-end cannot be in the future." in str(result.exception)
 
-    def exchange_rate_on_specific_date(dest_uri):
+    def exchange_rate_on_specific_date(dest_uri, dest_uri_read):
+        if dest_uri.startswith("cratedb://"):
+            pytest.skip(
+                "CrateDB support for 'merge' strategy pending, "
+                "see https://github.com/crate/dlt-cratedb/issues/14"
+            )
         schema = f"testschema_frankfurter_{get_random_string(5)}"
         dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
         start_date = "2025-01-03"
@@ -3231,10 +3696,12 @@ def exchange_rate_on_specific_date(dest_uri, dest_uri_read):
         )
         assert result.exit_code == 0, f"Ingestion failed: {result.output}"
 
-        dest_engine = sqlalchemy.create_engine(dest_uri)
+        dest_engine = sqlalchemy.create_engine(dest_uri_read)
 
         query = f"SELECT rate FROM {dest_table} WHERE currency_code = 'GBP'"
         with dest_engine.connect() as conn:
+            if dest_engine.dialect.name == "crate":
+                conn.execute(f"REFRESH TABLE {dest_table}")
             rows = conn.execute(query).fetchall()
         dest_engine.dispose()
 
@@ -3259,7 +3726,9 @@ def exchange_rate_on_specific_date(dest_uri, dest_uri_read):
 )
 @pytest.mark.parametrize("test_case", frankfurter_test_cases())
 def test_frankfurter(dest, test_case):
-    test_case(dest.start())
+    dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    test_case(dest_uri, dest_uri_read)
     dest.stop()
 
 
@@ -3329,7 +3798,11 @@ def test_mysql_zero_dates(source, dest):
 
     assert result.exit_code == 0
 
-    dest_engine = sqlalchemy.create_engine(dest_uri)
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    dest_engine = sqlalchemy.create_engine(dest_uri_read)
+    # CrateDB needs an explicit flush to make data available for reads immediately.
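+    # (On CrateDB, the zero dates coerced to '1970-01-01 00:00:00' read back
+    # as epoch milliseconds, i.e. 0; see the cratedb:// branch below.)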
+    if dest_engine.dialect.name == "crate":
+        dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
     res = dest_engine.execute(f"select * from {schema_rand_prefix}.output").fetchall()
     dest_engine.dispose()
 
@@ -3352,11 +3825,18 @@ def test_mysql_zero_dates(source, dest):
     ]
 
     assert len(res) == 5
-    assert res[0] == ("Row 1", "1970-01-01 00:00:00")
-    assert res[1] == ("Row 2", "2024-01-01 12:00:00")
-    assert res[2] == ("Row 3", "1970-01-01 00:00:00")
-    assert res[3] == ("Row 4", "2025-04-05 08:30:00")
-    assert res[4] == ("Row 5", "1970-01-01 00:00:00")
+    if dest_uri.startswith("cratedb://"):
+        assert ("Row 1", 0) in res
+        assert ("Row 2", 1704110400000) in res
+        assert ("Row 3", 0) in res
+        assert ("Row 4", 1743841800000) in res
+        assert ("Row 5", 0) in res
+    else:
+        assert res[0] == ("Row 1", "1970-01-01 00:00:00")
+        assert res[1] == ("Row 2", "2024-01-01 12:00:00")
+        assert res[2] == ("Row 3", "1970-01-01 00:00:00")
+        assert res[3] == ("Row 4", "2025-04-05 08:30:00")
+        assert res[4] == ("Row 5", "1970-01-01 00:00:00")
 
     # Clean up
     source.stop()
@@ -3368,7 +3848,7 @@ def appsflyer_test_cases():
         "INGESTR_TEST_APPSFLYER_TOKEN", ""
     )
 
-    def creatives(dest_uri: str):
+    def creatives(dest_uri: str, dest_uri_read: str):
         schema_rand_prefix = f"testschema_appsflyer_{get_random_string(5)}"
         result = invoke_ingest_command(
             source_uri,
@@ -3381,7 +3861,10 @@ def creatives(dest_uri: str):
         )
         assert result.exit_code == 0
 
-        with sqlalchemy.create_engine(dest_uri).connect() as conn:
+        with sqlalchemy.create_engine(dest_uri_read).connect() as conn:
+            # CrateDB needs an explicit flush to make data available for reads immediately.
+            if conn.dialect.name == "crate":
+                conn.execute(f"REFRESH TABLE {schema_rand_prefix}.creatives")
             res = conn.execute(
                 f"select * from {schema_rand_prefix}.creatives"
             ).fetchall()
@@ -3414,7 +3897,7 @@ def creatives(dest_uri: str):
         ]
         assert sorted(columns) == sorted(expected_columns)
 
-    def campaigns(dest_uri: str):
+    def campaigns(dest_uri: str, dest_uri_read: str):
         schema_rand_prefix = f"testschema_appsflyer_{get_random_string(5)}"
         result = invoke_ingest_command(
             source_uri,
@@ -3427,7 +3910,10 @@ def campaigns(dest_uri: str):
         )
         assert result.exit_code == 0
 
-        with sqlalchemy.create_engine(dest_uri).connect() as conn:
+        with sqlalchemy.create_engine(dest_uri_read).connect() as conn:
+            # CrateDB needs an explicit flush to make data available for reads immediately.
+            if conn.dialect.name == "crate":
+                conn.execute(f"REFRESH TABLE {schema_rand_prefix}.campaigns")
             res = conn.execute(
                 f"select * from {schema_rand_prefix}.campaigns"
             ).fetchall()
@@ -3468,7 +3954,7 @@ def campaigns(dest_uri: str):
         ]
         assert sorted(columns) == sorted(expected_columns)
 
-    def custom(dest_uri: str):
+    def custom(dest_uri: str, dest_uri_read: str):
         schema_rand_prefix = f"testschema_appsflyer_{get_random_string(5)}"
         result = invoke_ingest_command(
             source_uri,
@@ -3481,7 +3967,10 @@ def custom(dest_uri: str):
         )
         assert result.exit_code == 0
 
-        with sqlalchemy.create_engine(dest_uri).connect() as conn:
+        with sqlalchemy.create_engine(dest_uri_read).connect() as conn:
+            # CrateDB needs an explicit flush to make data available for reads immediately.
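+            # (The "crate" dialect name corresponds to the crate:// read URL
+            # produced by get_uri_read; the cratedb:// write URL speaks the
+            # PostgreSQL protocol instead.)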
+            if conn.dialect.name == "crate":
+                conn.execute(f"REFRESH TABLE {schema_rand_prefix}.custom")
             res = conn.execute(f"select * from {schema_rand_prefix}.custom").fetchall()
             assert len(res) > 0
             columns = [
@@ -3519,12 +4008,14 @@
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_appsflyer_source(testcase, dest):
-    testcase(dest.start())
+    dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    testcase(dest_uri, dest_uri_read)
     dest.stop()
 
 
 def airtable_test_cases():
-    def table_with_base_id(dest_uri: str):
+    def table_with_base_id(dest_uri: str, dest_uri_read: str):
         source_uri = "airtable://?access_token=" + os.environ.get(
             "INGESTR_TEST_AIRTABLE_TOKEN", ""
         )
@@ -3543,7 +4034,10 @@ def table_with_base_id(dest_uri: str):
 
         assert result.exit_code == 0
 
-        with sqlalchemy.create_engine(dest_uri).connect() as conn:
+        with sqlalchemy.create_engine(dest_uri_read).connect() as conn:
+            # CrateDB needs an explicit flush to make data available for reads immediately.
+            if conn.dialect.name == "crate":
+                conn.execute(f"REFRESH TABLE {dest_table}")
             res = conn.execute(f"select count(*) from {dest_table}").fetchall()
             assert len(res) > 0
             assert res[0][0] > 0
@@ -3561,7 +4055,9 @@
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_airtable_source(testcase, dest):
-    testcase(dest.start())
+    dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    testcase(dest_uri, dest_uri_read)
     dest.stop()
 
 
@@ -3575,6 +4071,12 @@ def pp(x):
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_mongodb_source(dest):
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB is not supported for this test, "
+            "see https://github.com/crate-workbench/ingestr/issues/5"
+        )
+
     mongo = MongoDbContainer("mongo:7.0.7")
     mongo.start()
@@ -3641,6 +4143,7 @@ def test_mongodb_source(dest):
     )
 
     dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
 
     try:
         invoke_ingest_command(
@@ -3650,7 +4153,10 @@ def test_mongodb_source(dest):
             "raw.test_collection",
         )
 
-        with sqlalchemy.create_engine(dest_uri).connect() as conn:
+        with sqlalchemy.create_engine(dest_uri_read).connect() as conn:
+            # CrateDB needs an explicit flush to make data available for reads immediately.
+            if conn.dialect.name == "crate":
+                conn.execute("REFRESH TABLE raw.test_collection")
             res = conn.execute(
                 "select id, name, nested_parent__key1, nested_parent__key2, nested_parent__key3, key4, value from raw.test_collection"
             ).fetchall()
@@ -3818,7 +4324,13 @@ def test_stripe_source_incremental(stripe_table):
         pass
 
 
-def trustpilot_test_case(dest_uri):
+def trustpilot_test_case(dest_uri, dest_uri_read):
+    if dest_uri.startswith("cratedb://"):
+        pytest.skip(
+            "CrateDB support for 'merge' strategy pending, "
+            "see https://github.com/crate/dlt-cratedb/issues/14"
+        )
+
     sample_response = {
         "links": [
             {
@@ -3926,8 +4438,11 @@ def trustpilot_test_case(dest_uri):
 
     assert result.exit_code == 0
 
-    engine = sqlalchemy.create_engine(dest_uri)
+    engine = sqlalchemy.create_engine(dest_uri_read)
     with engine.connect() as conn:
+        # CrateDB needs an explicit flush to make data available for reads immediately.
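+        # (Without this refresh, a SELECT issued immediately after the load may
+        # see zero rows and trip the assertion below.)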
+        if conn.dialect.name == "crate":
+            conn.execute(f"REFRESH TABLE {dest_table}")
         rows = conn.execute(f"SELECT * FROM {dest_table}").fetchall()
         assert len(rows) > 0, "No data ingested into the destination"
     engine.dispose()
 
@@ -3937,11 +4452,13 @@
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_trustpilot(dest):
-    trustpilot_test_case(dest.start())
+    dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    trustpilot_test_case(dest_uri, dest_uri_read)
     dest.stop()
 
 
-def pinterest_test_case(dest_uri):
+def pinterest_test_case(dest_uri, dest_uri_read):
     sample_response = {
         "items": [
             {
@@ -4027,8 +4544,11 @@ def pinterest_test_case(dest_uri):
 
     assert result.exit_code == 0
 
-    engine = sqlalchemy.create_engine(dest_uri)
+    engine = sqlalchemy.create_engine(dest_uri_read)
     with engine.connect() as conn:
+        # CrateDB needs an explicit flush to make data available for reads immediately.
+        if conn.dialect.name == "crate":
+            conn.execute(f"REFRESH TABLE {dest_table}")
         rows = conn.execute(f"SELECT * FROM {dest_table}").fetchall()
         assert len(rows) > 0, "No data ingested into the destination"
     engine.dispose()
 
@@ -4038,5 +4558,12 @@ def pinterest_test_case(dest_uri):
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
 def test_pinterest_test_case(dest):
-    pinterest_test_case(dest.start())
+    if isinstance(dest, CrateDbDockerImage):
+        pytest.skip(
+            "CrateDB support for 'merge' strategy pending, "
+            "see https://github.com/crate/dlt-cratedb/issues/14"
+        )
+    dest_uri = dest.start()
+    dest_uri_read = get_uri_read(dest_uri, dest)
+    pinterest_test_case(dest_uri, dest_uri_read)
     dest.stop()
diff --git a/ingestr/src/destinations_test.py b/ingestr/src/destinations_test.py
index f04dad619..992d1b20f 100644
--- a/ingestr/src/destinations_test.py
+++ b/ingestr/src/destinations_test.py
@@ -4,9 +4,11 @@
 
 import dlt
 import pytest
+from sqlglot.helper import classproperty
 
 from ingestr.src.destinations import (
     BigQueryDestination,
+    CrateDBDestination,
     DatabricksDestination,
     DuckDBDestination,
     MsSQLDestination,
@@ -111,3 +113,11 @@ class MsSQLDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
 class DatabricksDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
     destination = DatabricksDestination()
     expected_class = dlt.destinations.databricks
+
+
+class CrateDBDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
+    destination = CrateDBDestination()
+
+    @classproperty
+    def expected_class(cls):
+        return dlt.destinations.cratedb  # type: ignore[attr-defined]
diff --git a/ingestr/src/kafka/helpers.py b/ingestr/src/kafka/helpers.py
index 7a42a0619..a04528db7 100644
--- a/ingestr/src/kafka/helpers.py
+++ b/ingestr/src/kafka/helpers.py
@@ -46,7 +46,7 @@ def default_msg_processor(msg: Message) -> Dict[str, Any]:
             },
             "data": msg.value().decode("utf-8"),
         },
-        "_kafka_msg_id": digest128(topic + str(partition) + str(key)),
+        "_kafka__msg_id": digest128(topic + str(partition) + str(key)),
     }
diff --git a/requirements-dev.txt b/requirements-dev.txt
index f3d80f433..de40fe80c 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,5 +1,6 @@
 -r requirements.txt
 
+cratedb-toolkit[testing]==0.0.37
 mypy==1.15.0
 pytest-cov==4.1.0
 pytest==8.3.3
@@ -11,4 +12,5 @@ twine==6.0.1
 testcontainers[postgres,mysql]==4.8.2
 pytest-xdist[psutil]==3.6.1
 pkginfo==1.12.0
-pytest-repeat==0.9.3
\ No newline at end of file
+pytest-repeat==0.9.3
+yarl<2