diff --git a/README.md b/README.md index 9e16f9c..f2c5ad2 100644 --- a/README.md +++ b/README.md @@ -48,22 +48,68 @@ pip install pytest-capquery ## Quick Start -The `capquery` fixture captures all SQLAlchemy statements executed by your code. The best practice -is to use the `capture()` context manager to isolate specific execution phases. +The plugin does not provide a default database fixture, as it is designed to adapt to your specific +SQLAlchemy topology. You **must** define a global fixture in your `conftest.py` to bind +`pytest-capquery` to your project's database engine. -### 1. Documenting with SQL Snapshots (Recommended) +### 1. Setting Up Your Fixture (`conftest.py`) + +To intercept queries from your custom engine, use the `CapQueryWrapper` and inject the +`capquery_context` fixture (which automatically handles snapshot file resolution behind the scenes). + +#### Standard Synchronous Engines + +```python +import pytest +from pytest_capquery.plugin import CapQueryWrapper + +@pytest.fixture(scope="function") +def postgres_capquery(postgres_engine, capquery_context): + """Binds capquery to a custom PostgreSQL testing engine.""" + with CapQueryWrapper(postgres_engine, snapshot_manager=capquery_context) as captured: + yield captured +``` + +#### Asynchronous Engines (`AsyncEngine`) + +If your project uses SQLAlchemy's `AsyncEngine` (e.g., with `asyncpg` or `aiomysql`), you **must** +attach the wrapper to the underlying synchronous engine. SQLAlchemy does not support event listeners +directly on async engine proxies. + +```python +import pytest +from pytest_capquery.plugin import CapQueryWrapper + +@pytest.fixture(scope="function") +def async_pg_capquery(async_pg_engine, capquery_context): + """ + Binds capquery to an AsyncEngine by intercepting the underlying .sync_engine. + This prevents 'NotImplementedError: asynchronous events are not implemented' errors. + """ + with CapQueryWrapper(async_pg_engine.sync_engine, snapshot_manager=capquery_context) as captured: + yield captured +``` + +By following this pattern, your custom fixtures automatically inherit the full snapshot lifecycle, +error tracking, and CLI flags (`--capquery-update`) without needing to manually map test paths or +instantiate `SnapshotManager` objects. + +### 2. Documenting with SQL Snapshots (Recommended) The most efficient way to document and protect your queries is by utilizing physical snapshots. This automatically compares execution behavior against tracked `.sql` files stored in a `__capquery_snapshots__` directory. +Use the custom fixture you defined (e.g., `postgres_capquery`) and the `capture()` context manager +to isolate specific execution phases. + ```python -def test_update_user_status(sqlite_session, capquery): +def test_update_user_status(postgres_session, postgres_capquery): # Enable assert_snapshot to verify execution against the disk - with capquery.capture(assert_snapshot=True): - user = sqlite_session.query(User).filter_by(id=1).first() + with postgres_capquery.capture(assert_snapshot=True): + user = postgres_session.query(User).filter_by(id=1).first() user.status = "active" - sqlite_session.commit() + postgres_session.commit() ``` **Workflow:** When writing a new test or updating existing query logic, run Pytest with the update @@ -76,17 +122,17 @@ pytest --capquery-update Future runs without the flag will strictly assert that the runtime queries perfectly match the generated `.sql` file. -### 2. Manual Explicit Assertions (Verbose) +### 3. Manual Explicit Assertions (Verbose) If you prefer to explicitly document the executed SQL directly inside your test cases, you can use strict manual assertions. ```python -def test_update_user_status(sqlite_session, capquery): - with capquery.capture() as phase: - user = sqlite_session.query(User).filter_by(id=1).first() +def test_update_user_status(postgres_session, postgres_capquery): + with postgres_capquery.capture() as phase: + user = postgres_session.query(User).filter_by(id=1).first() user.status = "active" - sqlite_session.commit() + postgres_session.commit() # Verify the precise chronological timeline of the transaction phase.assert_executed_queries( @@ -95,13 +141,13 @@ def test_update_user_status(sqlite_session, capquery): """ SELECT users.id, users.status FROM users - WHERE users.id = ? + WHERE users.id = %s """, (1,) ), ( """ - UPDATE users SET status=? WHERE users.id = ? + UPDATE users SET status=%s WHERE users.id = %s """, ("active", 1) ), @@ -114,17 +160,17 @@ and the assertion fails, `pytest-capquery` will intercept the failure and drop t assertion block directly into your terminal's stdout. Simply copy and paste the block from your terminal directly into your test to instantly fix the regression! -### 3. Preventing N+1 Queries (Loose Assertion) +### 4. Preventing N+1 Queries (Loose Assertion) If you want to protect a block of code against N+1 regressions without hardcoding exact SQL strings, you can enforce a strict expected query count at the context boundary: ```python -def test_fetch_users(sqlite_session, capquery): +def test_fetch_users(postgres_session, postgres_capquery): # Enforce that exactly 1 query is executed inside this block. # If a lazy-loading loop triggers extra queries, this will raise an AssertionError. - with capquery.capture(expected_count=1): - users = sqlite_session.query(User).all() + with postgres_capquery.capture(expected_count=1): + users = postgres_session.query(User).all() for user in users: _ = user.address ``` diff --git a/pyproject.toml b/pyproject.toml index a97878a..198cbbc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pytest-capquery" -version = "0.3.0" +version = "0.3.1" description = "A pytest fixture for high-precision SQL testing in SQLAlchemy." readme = "README.md" requires-python = ">=3.13" diff --git a/src/pytest_capquery/plugin.py b/src/pytest_capquery/plugin.py index e54a12f..ed01f82 100644 --- a/src/pytest_capquery/plugin.py +++ b/src/pytest_capquery/plugin.py @@ -7,7 +7,9 @@ between execution logs over to assertions models natively surfacing the capquery local features. """ -import ast +import datetime +import decimal +import uuid from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple, Union @@ -168,7 +170,17 @@ def _deserialize_snapshot(self, content: str) -> List[List[Union[str, Tuple[str, if not query_str: continue - params = ast.literal_eval(params_str) + eval_globals = { + "__builtins__": None, + "datetime": datetime.datetime, + "date": datetime.date, + "time": datetime.time, + "timedelta": datetime.timedelta, + "FakeDatetime": datetime.datetime, + "Decimal": decimal.Decimal, + "UUID": uuid.UUID, + } + params = eval(params_str, eval_globals) item = query_str if params is None else (query_str, params) while len(phases) < phase_num: @@ -198,21 +210,16 @@ def pytest_addoption(parser: pytest.Parser) -> None: @pytest.fixture -def capquery(request: pytest.FixtureRequest, sqlite_engine: Engine) -> CapQueryWrapper: - """High-level standard testing interface securely delivering functional interception wrappers. - This fixture is specifically configured natively defaulting to standard SQLite validation. +def capquery_context(request: pytest.FixtureRequest) -> SnapshotManager: + """Dynamically resolves snapshot disk mapping parameters transparently intercepting CLI flags. Args: request (pytest.FixtureRequest): The test execution meta properties injected natively. - sqlite_engine (Engine): The dynamically provisioned SQLite integration execution engine instance. Returns: - CapQueryWrapper: An initialized interception footprint resolving assertions intelligently. + SnapshotManager: An initialized and locally configured snapshot filesystem driver. """ - update_mode = request.config.getoption("--capquery-update") - snapshot_manager = SnapshotManager( - nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode - ) + update_mode = request.config.getoption("--capquery-update", default=False) + test_path = Path(request.node.path) if hasattr(request.node, "path") else Path.cwd() - with CapQueryWrapper(sqlite_engine, snapshot_manager=snapshot_manager) as captured: - yield captured + return SnapshotManager(nodeid=request.node.nodeid, test_path=test_path, update_mode=update_mode) diff --git a/tests/conftest.py b/tests/conftest.py index ab6c1bc..798e849 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,6 @@ """ import gc -from pathlib import Path from typing import Generator import pytest @@ -67,16 +66,12 @@ def sqlite_session(sqlite_engine: Engine) -> Generator[Session, None, None]: @pytest.fixture(scope="function") def sqlite_capquery( - request: pytest.FixtureRequest, sqlite_engine: Engine + sqlite_engine: Engine, capquery_context: SnapshotManager ) -> Generator[CapQueryWrapper, None, None]: """Function-scoped fixture providing a CapQuery wrapper bound to the SQLite engine. Automatically intercepts and captures SQL statements and transaction events dispatched from the - provided SQLite engine context. + provided SQLite engine context, leveraging the globally provided snapshot context. """ - update_mode = request.config.getoption("--capquery-update", default=False) - snapshot_manager = SnapshotManager( - nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode - ) - with CapQueryWrapper(sqlite_engine, snapshot_manager=snapshot_manager) as captured: + with CapQueryWrapper(sqlite_engine, snapshot_manager=capquery_context) as captured: yield captured diff --git a/tests/e2e/mysql/conftest.py b/tests/e2e/mysql/conftest.py index a2048b0..c6f217f 100644 --- a/tests/e2e/mysql/conftest.py +++ b/tests/e2e/mysql/conftest.py @@ -5,7 +5,6 @@ invalidated during teardown to maintain environment integrity and suppress system resource warnings. """ -from pathlib import Path from typing import Generator import pytest @@ -59,15 +58,12 @@ def mysql_session(mysql_engine: Engine) -> Generator[Session, None, None]: @pytest.fixture(scope="function") def mysql_capquery( - request: pytest.FixtureRequest, mysql_engine: Engine + mysql_engine: Engine, capquery_context: SnapshotManager ) -> Generator[CapQueryWrapper, None, None]: """Provide an engine-bound CapQuery interception interface for MySQL. - Catches and tracks PyMySQL Dialect queries executed through the provisioned engine. + Catches and tracks PyMySQL Dialect queries executed through the provisioned engine, leveraging + the globally provided snapshot context. """ - update_mode = request.config.getoption("--capquery-update", default=False) - snapshot_manager = SnapshotManager( - nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode - ) - with CapQueryWrapper(mysql_engine, snapshot_manager=snapshot_manager) as captured: + with CapQueryWrapper(mysql_engine, snapshot_manager=capquery_context) as captured: yield captured diff --git a/tests/e2e/postgres/conftest.py b/tests/e2e/postgres/conftest.py index 39b7913..4b1239d 100644 --- a/tests/e2e/postgres/conftest.py +++ b/tests/e2e/postgres/conftest.py @@ -6,7 +6,6 @@ resource warnings. """ -from pathlib import Path from typing import Generator import pytest @@ -55,15 +54,12 @@ def postgres_session(postgres_engine: Engine) -> Generator[Session, None, None]: @pytest.fixture(scope="function") def postgres_capquery( - request: pytest.FixtureRequest, postgres_engine: Engine + postgres_engine: Engine, capquery_context: SnapshotManager ) -> Generator[CapQueryWrapper, None, None]: """Provide an engine-bound CapQuery interception interface for PostgreSQL. - Catches and tracks psycopg2 Dialect queries executed through the provisioned engine. + Catches and tracks psycopg2 Dialect queries executed through the provisioned engine, leveraging + the globally provided snapshot context. """ - update_mode = request.config.getoption("--capquery-update", default=False) - snapshot_manager = SnapshotManager( - nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode - ) - with CapQueryWrapper(postgres_engine, snapshot_manager=snapshot_manager) as captured: + with CapQueryWrapper(postgres_engine, snapshot_manager=capquery_context) as captured: yield captured diff --git a/tests/unit/test_asserter.py b/tests/unit/test_asserter.py index 54e0c43..34f39fe 100644 --- a/tests/unit/test_asserter.py +++ b/tests/unit/test_asserter.py @@ -13,26 +13,26 @@ from pytest_capquery.snapshot import SnapshotManager -def test_single_query(capquery, sqlite_engine): +def test_single_query(sqlite_capquery, sqlite_engine): """Validates that a singular executed query is successfully verified, correctly identified within its ambient transaction boundary lifecycle tracking.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT :x"), {"x": 1}) - capquery.assert_executed_queries("BEGIN", ("SELECT ?", (1,)), "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", ("SELECT ?", (1,)), "ROLLBACK") -def test_multiple_queries(capquery, sqlite_engine): +def test_multiple_queries(sqlite_capquery, sqlite_engine): """Validates that multiple linearly executed queries successfully form chronological ordered sequences verifiable under the assertion framework.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT 1")) conn.execute(text("SELECT 2")) - capquery.assert_executed_queries("BEGIN", "SELECT 1", "SELECT 2", "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT 1", "SELECT 2", "ROLLBACK") -def test_commit(capquery, sqlite_engine): +def test_commit(sqlite_capquery, sqlite_engine): """Confirms an explicit commit lifecycle generates matching BEGIN and COMMIT events alongside correctly registered nested SAVEPOINT interactions.""" with sqlite_engine.connect() as conn: @@ -41,7 +41,7 @@ def test_commit(capquery, sqlite_engine): with conn.begin_nested(): conn.execute(text("SELECT 2")) - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( "BEGIN", "SELECT 1", "SAVEPOINT sa_savepoint_1", @@ -51,7 +51,7 @@ def test_commit(capquery, sqlite_engine): ) -def test_nested_transaction_rollback(capquery, sqlite_engine): +def test_nested_transaction_rollback(sqlite_capquery, sqlite_engine): """Confirms nested transaction rollback behavior suppresses standard RELEASE markers and legitimately emits ROLLBACK TO SAVEPOINT signals perfectly matching runtime.""" with sqlite_engine.connect() as conn: @@ -61,7 +61,7 @@ def test_nested_transaction_rollback(capquery, sqlite_engine): conn.execute(text("SELECT 2")) nested.rollback() - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( "BEGIN", "SELECT 1", "SAVEPOINT sa_savepoint_1", @@ -71,7 +71,7 @@ def test_nested_transaction_rollback(capquery, sqlite_engine): ) -def test_complex_nested_transactions(capquery, sqlite_engine): +def test_complex_nested_transactions(sqlite_capquery, sqlite_engine): """Proves comprehensive multi-layered deeply nested executions including successful releases alongside rolling back targeted sub-phases resolve meticulously.""" with sqlite_engine.connect() as conn: @@ -84,7 +84,7 @@ def test_complex_nested_transactions(capquery, sqlite_engine): conn.execute(text("SELECT 3")) nested.rollback() - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( "BEGIN", "SELECT 1", "SAVEPOINT sa_savepoint_1", @@ -97,16 +97,16 @@ def test_complex_nested_transactions(capquery, sqlite_engine): ) -def test_transaction_begin_commit(capquery, sqlite_engine): +def test_transaction_begin_commit(sqlite_capquery, sqlite_engine): """Ensures baseline engine context manager invocations yield standard simple commit boundaries appropriately.""" with sqlite_engine.begin() as conn: conn.execute(text("SELECT 1")) - capquery.assert_executed_queries("BEGIN", "SELECT 1", "COMMIT") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT 1", "COMMIT") -def test_transaction_rollback(capquery, sqlite_engine): +def test_transaction_rollback(sqlite_capquery, sqlite_engine): """Ensures manual connection closures following specific execution rollback loops are mapped reliably without hanging connections.""" conn = sqlite_engine.connect() @@ -115,10 +115,10 @@ def test_transaction_rollback(capquery, sqlite_engine): trans.rollback() conn.close() - capquery.assert_executed_queries("BEGIN", "SELECT 1", "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT 1", "ROLLBACK") -def test_nested_partial_rollback(capquery, sqlite_engine): +def test_nested_partial_rollback(sqlite_capquery, sqlite_engine): """Verifies that subsequent statements seamlessly append into the global transaction chain post- recovery of a nested sub-transaction rollback.""" with sqlite_engine.connect() as conn: @@ -129,7 +129,7 @@ def test_nested_partial_rollback(capquery, sqlite_engine): nested.rollback() conn.execute(text("SELECT 3")) - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( "BEGIN", "SELECT 1", "SAVEPOINT sa_savepoint_1", @@ -140,7 +140,7 @@ def test_nested_partial_rollback(capquery, sqlite_engine): ) -def test_assertion_error_count_mismatch(capquery, sqlite_engine): +def test_assertion_error_count_mismatch(sqlite_capquery, sqlite_engine): """Ensures the assertion engine correctly flags a mismatch in the raw chronological length of executed tuples compared to expectations.""" with sqlite_engine.connect() as conn: @@ -148,17 +148,17 @@ def test_assertion_error_count_mismatch(capquery, sqlite_engine): conn.execute(text("SELECT 2")) with pytest.raises(AssertionError, match=r"Expected 3 queries, but found 4\."): - capquery.assert_executed_queries("BEGIN", "SELECT 1", "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT 1", "ROLLBACK") -def test_assertion_error_sql_mismatch(capquery, sqlite_engine): +def test_assertion_error_sql_mismatch(sqlite_capquery, sqlite_engine): """Guarantees structural variance in SQL statement sequences triggers precise and actionably diffed validation error blocks exposing actual against expected outputs.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT 1")) with pytest.raises(AssertionError) as exc_info: - capquery.assert_executed_queries("BEGIN", "SELECT 2", "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT 2", "ROLLBACK") error_msg = str(exc_info.value) assert "Expected SQL:" in error_msg @@ -167,55 +167,57 @@ def test_assertion_error_sql_mismatch(capquery, sqlite_engine): assert "SELECT 1" in error_msg -def test_assertion_error_parameter_mismatch(capquery, sqlite_engine): +def test_assertion_error_parameter_mismatch(sqlite_capquery, sqlite_engine): """Identifies bounds parameter failures dynamically, providing a strict failure should actual execution bound variables misalign with their hardcoded specification.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT :x"), {"x": 1}) with pytest.raises(AssertionError) as exc_info: - capquery.assert_executed_queries("BEGIN", ("SELECT ?", (2,)), "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", ("SELECT ?", (2,)), "ROLLBACK") error_msg = str(exc_info.value) assert "Expected Params:" in error_msg assert "Actual Params:" in error_msg -def test_assertion_error_unexpected_parameters(capquery, sqlite_engine): +def test_assertion_error_unexpected_parameters(sqlite_capquery, sqlite_engine): """Provides developer coverage warning if a developer explicitly asserts an empty parameter signature but underlying SQL triggers runtime injections anyway.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT :x"), {"x": 1}) with pytest.raises(AssertionError) as exc_info: - capquery.assert_executed_queries("BEGIN", "SELECT ?", "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT ?", "ROLLBACK") error_msg = str(exc_info.value) assert "Expected Params to be empty or None, but got:" in error_msg -def test_assertion_error_missing_executed_query(capquery, sqlite_engine): +def test_assertion_error_missing_executed_query(sqlite_capquery, sqlite_engine): """Verifies timeline overrun expectations correctly fail when developers declare more testing conditions than actual timeline triggers resolve during assertion sequence.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT 1")) with pytest.raises(AssertionError) as exc_info: - capquery.assert_executed_queries("BEGIN", "SELECT 1", "ROLLBACK", "SELECT 2", strict=False) + sqlite_capquery.assert_executed_queries( + "BEGIN", "SELECT 1", "ROLLBACK", "SELECT 2", strict=False + ) error_msg = str(exc_info.value) assert "Mismatch at index 3" in error_msg assert "Expected query or event but no more statements were recorded" in error_msg -def test_assertion_error_generates_stdout_copy_paste_block(capquery, sqlite_engine, capsys): +def test_assertion_error_generates_stdout_copy_paste_block(sqlite_capquery, sqlite_engine, capsys): """Validates that a failed assertion outputs a strictly and perfectly formatted Python block to standard out facilitating rapid copy-pasting for regression fixes.""" with sqlite_engine.connect() as conn: conn.execute(text("SELECT 1")) with pytest.raises(AssertionError): - capquery.assert_executed_queries("BEGIN", "SELECT 2", "ROLLBACK") + sqlite_capquery.assert_executed_queries("BEGIN", "SELECT 2", "ROLLBACK") captured = capsys.readouterr() stdout = captured.out @@ -243,16 +245,16 @@ def test_assertion_error_generates_stdout_copy_paste_block(capquery, sqlite_engi assert stdout == expected_stdout -def test_assertion_error_copy_paste_block_no_params(capquery, capsys): +def test_assertion_error_copy_paste_block_no_params(sqlite_capquery, capsys): """Ensures robust generation parameter combinations are handled seamlessly for the terminal output block ensuring deep coverage logic triggers nicely.""" - capquery.statements.append(TxEvent("BEGIN")) + sqlite_capquery.statements.append(TxEvent("BEGIN")) long_sql = "SELECT column_a, column_b, column_c FROM some_very_long_table_name" - capquery.statements.append(TxEvent(statement=long_sql, parameters=None)) + sqlite_capquery.statements.append(TxEvent(statement=long_sql, parameters=None)) with pytest.raises(AssertionError): - capquery.assert_executed_queries("EXPECTED_SOMETHING_ELSE") + sqlite_capquery.assert_executed_queries("EXPECTED_SOMETHING_ELSE") stdout = capsys.readouterr().out @@ -260,14 +262,14 @@ def test_assertion_error_copy_paste_block_no_params(capquery, capsys): assert "FROM some_very_long_table_name" in stdout -def test_assertion_error_copy_paste_block_empty_query(capquery, capsys): +def test_assertion_error_copy_paste_block_empty_query(sqlite_capquery, capsys): """Ensures complete formatting stability preventing unexpected terminal panics if capturing perfectly empty queries internally.""" empty_stmt = TxEvent(statement="", parameters=None) - capquery.statements.append(empty_stmt) + sqlite_capquery.statements.append(empty_stmt) with pytest.raises(AssertionError): - capquery.assert_executed_queries("EXPECTED_SOMETHING_ELSE") + sqlite_capquery.assert_executed_queries("EXPECTED_SOMETHING_ELSE") stdout = capsys.readouterr().out diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py index 7b3a9a6..543c6b7 100644 --- a/tests/unit/test_models.py +++ b/tests/unit/test_models.py @@ -10,7 +10,7 @@ from tests.models import AlarmPanel, Sensor -def test_orm_insert(sqlite_session, capquery): +def test_orm_insert(sqlite_session, sqlite_capquery): """Ensures that standard sequential class-based object insertions resolve distinctly to correct parameter tuple bounds perfectly matching standard SQL generation paths.""" panel = AlarmPanel(mac_address="00:11:22:33:44:55", is_online=True) @@ -20,10 +20,10 @@ def test_orm_insert(sqlite_session, capquery): sqlite_session.add(panel) - capquery.statements.clear() + sqlite_capquery.statements.clear() sqlite_session.flush() - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( "BEGIN", ( """ @@ -49,19 +49,19 @@ def test_orm_insert(sqlite_session, capquery): ) -def test_orm_update(sqlite_session, capquery): +def test_orm_update(sqlite_session, sqlite_capquery): """Confirms targeted object parameter replacement automatically delegates strict SQL update strings precisely bounded against verified schema definitions continuously.""" panel = AlarmPanel(mac_address="AA:BB:CC:DD:EE:FF", is_online=False) sqlite_session.add(panel) sqlite_session.flush() - capquery.statements.clear() + sqlite_capquery.statements.clear() panel = sqlite_session.query(AlarmPanel).filter_by(mac_address="AA:BB:CC:DD:EE:FF").first() panel.is_online = True sqlite_session.flush() - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( ( """ SELECT @@ -85,7 +85,7 @@ def test_orm_update(sqlite_session, capquery): ) -def test_orm_delete(sqlite_session, capquery): +def test_orm_delete(sqlite_session, sqlite_capquery): """Validates cascaded removal triggers corresponding object deletion queries cleanly leaving exact session markers fully transparent for regression checking layers.""" panel = AlarmPanel(mac_address="11:22:33:44:55:66", is_online=True) @@ -94,13 +94,13 @@ def test_orm_delete(sqlite_session, capquery): sqlite_session.add(panel) sqlite_session.flush() - capquery.statements.clear() + sqlite_capquery.statements.clear() sensor_to_delete = sqlite_session.query(Sensor).filter_by(name="Back Door").first() sqlite_session.delete(sensor_to_delete) sqlite_session.flush() - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( ( """ SELECT @@ -124,21 +124,21 @@ def test_orm_delete(sqlite_session, capquery): ) -def test_orm_select(sqlite_session, capquery): +def test_orm_select(sqlite_session, sqlite_capquery): """Assures simple declarative fetching routines bypass unnecessary engine overheads, directly exposing concise limiting offset targets deterministically properly.""" panel = AlarmPanel(mac_address="22:33:44:55:66:77", is_online=False) sqlite_session.add(panel) sqlite_session.flush() - capquery.statements.clear() + sqlite_capquery.statements.clear() fetched_panel = ( sqlite_session.query(AlarmPanel).filter_by(mac_address="22:33:44:55:66:77").first() ) assert fetched_panel is not None - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( ( """ SELECT @@ -154,7 +154,7 @@ def test_orm_select(sqlite_session, capquery): ) -def test_avoid_n_plus_one_queries(sqlite_session, capquery): +def test_avoid_n_plus_one_queries(sqlite_session, sqlite_capquery): """Simulates optimized bulk ingestion loading architectures, effectively asserting joined-load configuration successfully prevents N+1 execution disasters logically.""" for i in range(3): @@ -165,7 +165,7 @@ def test_avoid_n_plus_one_queries(sqlite_session, capquery): sqlite_session.flush() sqlite_session.expunge_all() - capquery.statements.clear() + sqlite_capquery.statements.clear() panels = sqlite_session.query(AlarmPanel).options(joinedload(AlarmPanel.sensors)).all() @@ -174,7 +174,7 @@ def test_avoid_n_plus_one_queries(sqlite_session, capquery): for sensor in panel.sensors: _ = sensor.name - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( """ SELECT alarm_panels.id AS alarm_panels_id, @@ -191,7 +191,7 @@ def test_avoid_n_plus_one_queries(sqlite_session, capquery): ) -def test_demonstrate_n_plus_one_problem(sqlite_session, capquery): +def test_demonstrate_n_plus_one_problem(sqlite_session, sqlite_capquery): """Establishes an intentional N+1 failure proving the execution tracer successfully chronicles repeating redundant queries reliably exposing bad optimization architectures.""" for i in range(3): @@ -202,7 +202,7 @@ def test_demonstrate_n_plus_one_problem(sqlite_session, capquery): sqlite_session.flush() sqlite_session.expunge_all() - capquery.statements.clear() + sqlite_capquery.statements.clear() panels = sqlite_session.query(AlarmPanel).all() @@ -211,7 +211,7 @@ def test_demonstrate_n_plus_one_problem(sqlite_session, capquery): for sensor in panel.sensors: _ = sensor.name - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( """ SELECT alarm_panels.id AS alarm_panels_id, diff --git a/tests/unit/test_plugin.py b/tests/unit/test_plugin.py index 40cbb37..1d26c54 100644 --- a/tests/unit/test_plugin.py +++ b/tests/unit/test_plugin.py @@ -13,14 +13,14 @@ from pytest_capquery.plugin import CapQueryWrapper -def test_capture_block_isolation(sqlite_engine, capquery): +def test_capture_block_isolation(sqlite_engine, sqlite_capquery): """Ensures that queries executed outside the specific capture context block are not visible inside its internal ledger, but the parent global wrapper maintains omnipresent scope tracking.""" with sqlite_engine.begin() as conn: conn.execute(text("SELECT 1")) - with capquery.capture() as phase: + with sqlite_capquery.capture() as phase: conn.execute(text("SELECT 2")) conn.execute(text("SELECT 3")) @@ -28,48 +28,50 @@ def test_capture_block_isolation(sqlite_engine, capquery): phase.assert_executed_queries("SELECT 2", "SELECT 3", strict=True) - capquery.assert_executed_queries( + sqlite_capquery.assert_executed_queries( "BEGIN", "SELECT 1", "SELECT 2", "SELECT 3", "SELECT 4", "COMMIT", strict=True ) -def test_multiple_sequential_captures(sqlite_engine, capquery): +def test_multiple_sequential_captures(sqlite_engine, sqlite_capquery): """Verifies that declaring sequence-based parallel capture boundaries safely establishes strict chronological segmentation logic across all involved instances reliably.""" with sqlite_engine.begin() as conn: - with capquery.capture() as phase_one: + with sqlite_capquery.capture() as phase_one: conn.execute(text("SELECT 'A'")) - with capquery.capture() as phase_two: + with sqlite_capquery.capture() as phase_two: conn.execute(text("SELECT 'B'")) conn.execute(text("SELECT 'C'")) phase_one.assert_executed_queries("SELECT 'A'") phase_two.assert_executed_queries("SELECT 'B'", "SELECT 'C'") - capquery.assert_executed_queries("BEGIN", "SELECT 'A'", "SELECT 'B'", "SELECT 'C'", "COMMIT") + sqlite_capquery.assert_executed_queries( + "BEGIN", "SELECT 'A'", "SELECT 'B'", "SELECT 'C'", "COMMIT" + ) -def test_capture_expected_count_success(sqlite_engine, capquery): +def test_capture_expected_count_success(sqlite_engine, sqlite_capquery): """Validates that the expected statement quantity constraints silently yield correctly when boundary lengths align structurally to the strict metric assigned directly.""" with sqlite_engine.begin() as conn: - with capquery.capture(expected_count=2): + with sqlite_capquery.capture(expected_count=2): conn.execute(text("SELECT 1")) conn.execute(text("SELECT 2")) -def test_capture_expected_count_failure(sqlite_engine, capquery): +def test_capture_expected_count_failure(sqlite_engine, sqlite_capquery): """Establishes that exceeding execution limits promptly evaluates the block termination logic yielding critical validation exception sequences dynamically notifying the layer.""" with sqlite_engine.begin() as conn: with pytest.raises(AssertionError, match="Expected 1 queries, but found 2"): - with capquery.capture(expected_count=1): + with sqlite_capquery.capture(expected_count=1): conn.execute(text("SELECT 1")) conn.execute(text("SELECT 2")) -def test_capture_expected_count_bypassed_on_exception(sqlite_engine, capquery): +def test_capture_expected_count_bypassed_on_exception(sqlite_engine, sqlite_capquery): """Protects user experience ensuring unexpected business logic framework crashes bypass the secondary assertions internally preserving parent traceability stack tracks natively.""" @@ -78,16 +80,16 @@ class BusinessLogicError(Exception): with pytest.raises(BusinessLogicError): with sqlite_engine.begin() as conn: - with capquery.capture(expected_count=2): + with sqlite_capquery.capture(expected_count=2): conn.execute(text("SELECT 1")) raise BusinessLogicError("Something went wrong in the app") -def test_capture_active_state_assertions(sqlite_engine, capquery): +def test_capture_active_state_assertions(sqlite_engine, sqlite_capquery): """Asserts validation handlers successfully maintain execution verification capabilities while running functionally mid-transaction continuously across the scope stack directly.""" with sqlite_engine.begin() as conn: - with capquery.capture() as active_phase: + with sqlite_capquery.capture() as active_phase: conn.execute(text("SELECT 1")) active_phase.assert_executed_queries("SELECT 1") diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index b476527..0f95d4b 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -13,7 +13,7 @@ from tests.models import AlarmPanel, Sensor -def test_user_business_logic(sqlite_session, capquery): +def test_user_business_logic(sqlite_session, sqlite_capquery): """Simulates a standard practical snapshot generation sequence, verifying the plugin captures local nested blocks directly resolving them appropriately against implicit automated file allocations automatically tied to the host test invocation.""" @@ -21,13 +21,13 @@ def test_user_business_logic(sqlite_session, capquery): sqlite_session.add(panel) sqlite_session.flush() - with capquery.capture(assert_snapshot=True): + with sqlite_capquery.capture(assert_snapshot=True): sensor = Sensor(name="Front Door", sensor_type="Contact") panel.sensors.append(sensor) sqlite_session.flush() -def test_user_multiple_phases(sqlite_session, capquery): +def test_user_multiple_phases(sqlite_session, sqlite_capquery): """Confirms multiple capture phases inside single transaction tests write and check their relative sequential markers distinctly within the overarching module scope. @@ -35,17 +35,17 @@ def test_user_multiple_phases(sqlite_session, capquery): """ panel = AlarmPanel(mac_address="AA:BB:CC:DD:EE:FF", is_online=True) - with capquery.capture(assert_snapshot=True, alias="Panel Setup Phase"): + with sqlite_capquery.capture(assert_snapshot=True, alias="Panel Setup Phase"): sqlite_session.add(panel) sqlite_session.flush() - with capquery.capture(assert_snapshot=True): + with sqlite_capquery.capture(assert_snapshot=True): sensor_1 = Sensor(name="Living Room", sensor_type="Motion") sensor_2 = Sensor(name="Back Door", sensor_type="Contact") panel.sensors.extend([sensor_1, sensor_2]) sqlite_session.flush() - with capquery.capture(assert_snapshot=True, alias="Status Toggle and Deletion Phase"): + with sqlite_capquery.capture(assert_snapshot=True, alias="Status Toggle and Deletion Phase"): panel.is_online = False panel.sensors.remove(sensor_1) sqlite_session.flush()