Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 64 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,68 @@ pip install pytest-capquery

## Quick Start

The `capquery` fixture captures all SQLAlchemy statements executed by your code. The best practice
is to use the `capture()` context manager to isolate specific execution phases.
The plugin does not provide a default database fixture, as it is designed to adapt to your specific
SQLAlchemy topology. You **must** define a global fixture in your `conftest.py` to bind
`pytest-capquery` to your project's database engine.

### 1. Documenting with SQL Snapshots (Recommended)
### 1. Setting Up Your Fixture (`conftest.py`)

To intercept queries from your custom engine, use the `CapQueryWrapper` and inject the
`capquery_context` fixture (which automatically handles snapshot file resolution behind the scenes).

#### Standard Synchronous Engines

```python
import pytest
from pytest_capquery.plugin import CapQueryWrapper

@pytest.fixture(scope="function")
def postgres_capquery(postgres_engine, capquery_context):
"""Binds capquery to a custom PostgreSQL testing engine."""
with CapQueryWrapper(postgres_engine, snapshot_manager=capquery_context) as captured:
yield captured
```

#### Asynchronous Engines (`AsyncEngine`)

If your project uses SQLAlchemy's `AsyncEngine` (e.g., with `asyncpg` or `aiomysql`), you **must**
attach the wrapper to the underlying synchronous engine. SQLAlchemy does not support event listeners
directly on async engine proxies.

```python
import pytest
from pytest_capquery.plugin import CapQueryWrapper

@pytest.fixture(scope="function")
def async_pg_capquery(async_pg_engine, capquery_context):
"""
Binds capquery to an AsyncEngine by intercepting the underlying .sync_engine.
This prevents 'NotImplementedError: asynchronous events are not implemented' errors.
"""
with CapQueryWrapper(async_pg_engine.sync_engine, snapshot_manager=capquery_context) as captured:
yield captured
```

By following this pattern, your custom fixtures automatically inherit the full snapshot lifecycle,
error tracking, and CLI flags (`--capquery-update`) without needing to manually map test paths or
instantiate `SnapshotManager` objects.

### 2. Documenting with SQL Snapshots (Recommended)

The most efficient way to document and protect your queries is by utilizing physical snapshots. This
automatically compares execution behavior against tracked `.sql` files stored in a
`__capquery_snapshots__` directory.

Use the custom fixture you defined (e.g., `postgres_capquery`) and the `capture()` context manager
to isolate specific execution phases.

```python
def test_update_user_status(sqlite_session, capquery):
def test_update_user_status(postgres_session, postgres_capquery):
# Enable assert_snapshot to verify execution against the disk
with capquery.capture(assert_snapshot=True):
user = sqlite_session.query(User).filter_by(id=1).first()
with postgres_capquery.capture(assert_snapshot=True):
user = postgres_session.query(User).filter_by(id=1).first()
user.status = "active"
sqlite_session.commit()
postgres_session.commit()
```

**Workflow:** When writing a new test or updating existing query logic, run Pytest with the update
Expand All @@ -76,17 +122,17 @@ pytest --capquery-update
Future runs without the flag will strictly assert that the runtime queries perfectly match the
generated `.sql` file.

### 2. Manual Explicit Assertions (Verbose)
### 3. Manual Explicit Assertions (Verbose)

If you prefer to explicitly document the executed SQL directly inside your test cases, you can use
strict manual assertions.

```python
def test_update_user_status(sqlite_session, capquery):
with capquery.capture() as phase:
user = sqlite_session.query(User).filter_by(id=1).first()
def test_update_user_status(postgres_session, postgres_capquery):
with postgres_capquery.capture() as phase:
user = postgres_session.query(User).filter_by(id=1).first()
user.status = "active"
sqlite_session.commit()
postgres_session.commit()

# Verify the precise chronological timeline of the transaction
phase.assert_executed_queries(
Expand All @@ -95,13 +141,13 @@ def test_update_user_status(sqlite_session, capquery):
"""
SELECT users.id, users.status
FROM users
WHERE users.id = ?
WHERE users.id = %s
""",
(1,)
),
(
"""
UPDATE users SET status=? WHERE users.id = ?
UPDATE users SET status=%s WHERE users.id = %s
""",
("active", 1)
),
Expand All @@ -114,17 +160,17 @@ and the assertion fails, `pytest-capquery` will intercept the failure and drop t
assertion block directly into your terminal's stdout. Simply copy and paste the block from your
terminal directly into your test to instantly fix the regression!

### 3. Preventing N+1 Queries (Loose Assertion)
### 4. Preventing N+1 Queries (Loose Assertion)

If you want to protect a block of code against N+1 regressions without hardcoding exact SQL strings,
you can enforce a strict expected query count at the context boundary:

```python
def test_fetch_users(sqlite_session, capquery):
def test_fetch_users(postgres_session, postgres_capquery):
# Enforce that exactly 1 query is executed inside this block.
# If a lazy-loading loop triggers extra queries, this will raise an AssertionError.
with capquery.capture(expected_count=1):
users = sqlite_session.query(User).all()
with postgres_capquery.capture(expected_count=1):
users = postgres_session.query(User).all()
for user in users:
_ = user.address
```
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "pytest-capquery"
version = "0.3.0"
version = "0.3.1"
description = "A pytest fixture for high-precision SQL testing in SQLAlchemy."
readme = "README.md"
requires-python = ">=3.13"
Expand Down
33 changes: 20 additions & 13 deletions src/pytest_capquery/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
between execution logs over to assertions models natively surfacing the capquery local features.
"""

import ast
import datetime
import decimal
import uuid
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -168,7 +170,17 @@ def _deserialize_snapshot(self, content: str) -> List[List[Union[str, Tuple[str,
if not query_str:
continue

params = ast.literal_eval(params_str)
eval_globals = {
"__builtins__": None,
"datetime": datetime.datetime,
"date": datetime.date,
"time": datetime.time,
"timedelta": datetime.timedelta,
"FakeDatetime": datetime.datetime,
"Decimal": decimal.Decimal,
"UUID": uuid.UUID,
}
params = eval(params_str, eval_globals)
item = query_str if params is None else (query_str, params)

while len(phases) < phase_num:
Expand Down Expand Up @@ -198,21 +210,16 @@ def pytest_addoption(parser: pytest.Parser) -> None:


@pytest.fixture
def capquery(request: pytest.FixtureRequest, sqlite_engine: Engine) -> CapQueryWrapper:
"""High-level standard testing interface securely delivering functional interception wrappers.
This fixture is specifically configured natively defaulting to standard SQLite validation.
def capquery_context(request: pytest.FixtureRequest) -> SnapshotManager:
"""Dynamically resolves snapshot disk mapping parameters transparently intercepting CLI flags.

Args:
request (pytest.FixtureRequest): The test execution meta properties injected natively.
sqlite_engine (Engine): The dynamically provisioned SQLite integration execution engine instance.

Returns:
CapQueryWrapper: An initialized interception footprint resolving assertions intelligently.
SnapshotManager: An initialized and locally configured snapshot filesystem driver.
"""
update_mode = request.config.getoption("--capquery-update")
snapshot_manager = SnapshotManager(
nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode
)
update_mode = request.config.getoption("--capquery-update", default=False)
test_path = Path(request.node.path) if hasattr(request.node, "path") else Path.cwd()

with CapQueryWrapper(sqlite_engine, snapshot_manager=snapshot_manager) as captured:
yield captured
return SnapshotManager(nodeid=request.node.nodeid, test_path=test_path, update_mode=update_mode)
11 changes: 3 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"""

import gc
from pathlib import Path
from typing import Generator

import pytest
Expand Down Expand Up @@ -67,16 +66,12 @@ def sqlite_session(sqlite_engine: Engine) -> Generator[Session, None, None]:

@pytest.fixture(scope="function")
def sqlite_capquery(
request: pytest.FixtureRequest, sqlite_engine: Engine
sqlite_engine: Engine, capquery_context: SnapshotManager
) -> Generator[CapQueryWrapper, None, None]:
"""Function-scoped fixture providing a CapQuery wrapper bound to the SQLite engine.

Automatically intercepts and captures SQL statements and transaction events dispatched from the
provided SQLite engine context.
provided SQLite engine context, leveraging the globally provided snapshot context.
"""
update_mode = request.config.getoption("--capquery-update", default=False)
snapshot_manager = SnapshotManager(
nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode
)
with CapQueryWrapper(sqlite_engine, snapshot_manager=snapshot_manager) as captured:
with CapQueryWrapper(sqlite_engine, snapshot_manager=capquery_context) as captured:
yield captured
12 changes: 4 additions & 8 deletions tests/e2e/mysql/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
invalidated during teardown to maintain environment integrity and suppress system resource warnings.
"""

from pathlib import Path
from typing import Generator

import pytest
Expand Down Expand Up @@ -59,15 +58,12 @@ def mysql_session(mysql_engine: Engine) -> Generator[Session, None, None]:

@pytest.fixture(scope="function")
def mysql_capquery(
request: pytest.FixtureRequest, mysql_engine: Engine
mysql_engine: Engine, capquery_context: SnapshotManager
) -> Generator[CapQueryWrapper, None, None]:
"""Provide an engine-bound CapQuery interception interface for MySQL.

Catches and tracks PyMySQL Dialect queries executed through the provisioned engine.
Catches and tracks PyMySQL Dialect queries executed through the provisioned engine, leveraging
the globally provided snapshot context.
"""
update_mode = request.config.getoption("--capquery-update", default=False)
snapshot_manager = SnapshotManager(
nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode
)
with CapQueryWrapper(mysql_engine, snapshot_manager=snapshot_manager) as captured:
with CapQueryWrapper(mysql_engine, snapshot_manager=capquery_context) as captured:
yield captured
12 changes: 4 additions & 8 deletions tests/e2e/postgres/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
resource warnings.
"""

from pathlib import Path
from typing import Generator

import pytest
Expand Down Expand Up @@ -55,15 +54,12 @@ def postgres_session(postgres_engine: Engine) -> Generator[Session, None, None]:

@pytest.fixture(scope="function")
def postgres_capquery(
request: pytest.FixtureRequest, postgres_engine: Engine
postgres_engine: Engine, capquery_context: SnapshotManager
) -> Generator[CapQueryWrapper, None, None]:
"""Provide an engine-bound CapQuery interception interface for PostgreSQL.

Catches and tracks psycopg2 Dialect queries executed through the provisioned engine.
Catches and tracks psycopg2 Dialect queries executed through the provisioned engine, leveraging
the globally provided snapshot context.
"""
update_mode = request.config.getoption("--capquery-update", default=False)
snapshot_manager = SnapshotManager(
nodeid=request.node.nodeid, test_path=Path(request.node.path), update_mode=update_mode
)
with CapQueryWrapper(postgres_engine, snapshot_manager=snapshot_manager) as captured:
with CapQueryWrapper(postgres_engine, snapshot_manager=capquery_context) as captured:
yield captured
Loading
Loading