Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ CHANGELOG
7.0.7 (unreleased)
------------------

- Nothing changed yet.
- feat: create reCAPTCHA validation utility
Comment thread
rboixaderg marked this conversation as resolved.
[rboixaderg]
Comment thread
rboixaderg marked this conversation as resolved.


7.0.6 (2025-10-10)
Expand Down
5 changes: 5 additions & 0 deletions guillotina/_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
"factory": "guillotina.async_util.AsyncJobPool",
"settings": {"max_size": 5},
},
"guillotina.recaptcha": {
"provides": "guillotina.interfaces.IRecaptchaValidationUtility",
"factory": "guillotina.auth.recaptcha.RecaptchaValidator",
"settings": {},
},
},
"store_json": True,
"pickle_protocol": pickle.HIGHEST_PROTOCOL,
Expand Down
29 changes: 16 additions & 13 deletions guillotina/api/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from guillotina import configure
from guillotina.api.service import Service
from guillotina.auth import authenticate_user
from guillotina.auth.recaptcha import RecaptchaValidator
from guillotina.auth.utils import find_user
from guillotina.component import get_utility
from guillotina.component import query_utility
Expand All @@ -16,6 +15,7 @@
from guillotina.interfaces import IAuthValidationUtility
from guillotina.interfaces import IContainer
from guillotina.interfaces import ISessionManagerUtility
from guillotina.interfaces.async_util import IRecaptchaValidationUtility
from guillotina.response import HTTPNotAcceptable
from guillotina.response import HTTPPreconditionFailed
from guillotina.response import HTTPUnauthorized
Expand Down Expand Up @@ -240,10 +240,11 @@ async def __call__(self):
raise HTTPNotAcceptable()
else:
# We validate with recaptcha
validator = RecaptchaValidator()
status = await validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})
recaptcha_validator = query_utility(IRecaptchaValidationUtility)
if recaptcha_validator is not None:
status = await recaptcha_validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})

# We need to validate is a valid user
user = await find_user({"id": user_id})
Expand Down Expand Up @@ -338,10 +339,11 @@ async def __call__(self):
if allowed is False:
raise HTTPUnauthorized(content={"text": "Not allowed registration"})

validator = RecaptchaValidator()
status = await validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})
recaptcha_validator = query_utility(IRecaptchaValidationUtility)
if recaptcha_validator is not None:
status = await recaptcha_validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})

payload = await self.request.json()

Expand Down Expand Up @@ -398,10 +400,11 @@ async def __call__(self):
)
class InfoAccess(Service):
async def __call__(self):
validator = RecaptchaValidator()
status = await validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})
recaptcha_validator = query_utility(IRecaptchaValidationUtility)
if recaptcha_validator is not None:
status = await recaptcha_validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})

auth_providers = app_settings.get("auth_providers", {})
providers = []
Expand Down
1 change: 0 additions & 1 deletion guillotina/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .groups import GroupsUtility # noqa
from .recaptcha import RecaptchaValidator # noqa
from .utils import authenticate_request # noqa
from .utils import authenticate_user # noqa
from .utils import find_user # noqa
Expand Down
13 changes: 12 additions & 1 deletion guillotina/auth/recaptcha.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from guillotina import app_settings
from guillotina.interfaces.async_util import IRecaptchaValidationUtility
from guillotina.utils import get_current_request
from zope.interface import implementer

import logging

Expand All @@ -10,10 +12,19 @@
VALIDATION_HEADER = "X-VALIDATION-G"


@implementer(IRecaptchaValidationUtility)
class RecaptchaValidator:
# Not valid to generate a user
for_validators = ()

async def initialize(self, app=None):
"""Initialize the utility (no-op for reCAPTCHA)."""
pass

async def finalize(self, app=None):
"""Finalize the utility (no-op for reCAPTCHA)."""
pass

async def validate(self):
request = get_current_request()
token = request.headers.get(VALIDATION_HEADER)
Expand All @@ -35,7 +46,7 @@ async def validate(self):
data = await resp.json()
except Exception: # pragma: no cover
logger.warning("Did not get json response", exc_info=True)
return
return False
try:
return data["success"]
except Exception: # pragma: no cover
Expand Down
1 change: 1 addition & 0 deletions guillotina/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .async_util import ICacheUtility # noqa
from .async_util import IPubSubUtility # noqa
from .async_util import IQueueUtility # noqa
from .async_util import IRecaptchaValidationUtility # noqa
from .async_util import ISessionManagerUtility # noqa
from .behaviors import IAsyncBehavior # noqa
from .behaviors import IBehavior # noqa
Expand Down
5 changes: 5 additions & 0 deletions guillotina/interfaces/async_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,8 @@ async def refresh_session(ident: str, session: str) -> str:
"""
Refresh an actual session
"""


class IRecaptchaValidationUtility(IAsyncUtility):
Comment thread
rboixaderg marked this conversation as resolved.
async def validate() -> bool:
Comment thread
rboixaderg marked this conversation as resolved.
pass
190 changes: 190 additions & 0 deletions guillotina/tests/test_recaptcha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from guillotina import app_settings
from guillotina.auth.recaptcha import RECAPTCHA_VALIDATION_URL
from guillotina.auth.recaptcha import RecaptchaValidator
from guillotina.auth.recaptcha import VALIDATION_HEADER
from guillotina.component import query_utility
from guillotina.interfaces.async_util import IRecaptchaValidationUtility
from guillotina.tests import utils
from unittest.mock import AsyncMock
from unittest.mock import MagicMock
from unittest.mock import patch

import json
import pytest


pytestmark = pytest.mark.asyncio

FAKE_RECAPTCHA = "FAKE_RECAPTCHA"


class TestRecaptchaValidator:
"""Test the RecaptchaValidator utility class directly."""

async def test_initialize_and_finalize(self):
"""Test that initialize and finalize methods exist and work."""
validator = RecaptchaValidator()
await validator.initialize()
await validator.finalize()

async def test_validate_with_fake_recaptcha(self):
"""Test validation with fake recaptcha token."""
app_settings["_fake_recaptcha_"] = FAKE_RECAPTCHA
request = utils.get_mocked_request(headers={VALIDATION_HEADER: FAKE_RECAPTCHA})
utils.task_vars.request.set(request)

validator = RecaptchaValidator()
result = await validator.validate()
assert result is True
Comment thread
rboixaderg marked this conversation as resolved.
Outdated

async def test_validate_without_configuration(self):
"""Test validation when recaptcha is not configured (graceful degradation)."""
app_settings.pop("recaptcha", None)
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "some-token"})
utils.task_vars.request.set(request)

validator = RecaptchaValidator()
result = await validator.validate()
# Should return True when not configured (graceful degradation)
assert result is True

async def test_validate_success(self):
"""Test successful validation with mocked HTTP response."""
app_settings["recaptcha"] = {"private": "test-secret-key"}
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "valid-token"})
utils.task_vars.request.set(request)

# Mock aiohttp response
mock_response = AsyncMock()
mock_response.json = AsyncMock(return_value={"success": True})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)

mock_post = AsyncMock(return_value=mock_response)
mock_session = MagicMock()
mock_session.post = mock_post
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)

with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()

assert result is True
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args[0][0] == RECAPTCHA_VALIDATION_URL
assert call_args[1]["data"]["secret"] == "test-secret-key"
assert call_args[1]["data"]["response"] == "valid-token"

async def test_validate_failure(self):
"""Test failed validation with mocked HTTP response."""
app_settings["recaptcha"] = {"private": "test-secret-key"}
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "invalid-token"})
utils.task_vars.request.set(request)

# Mock aiohttp response
mock_response = AsyncMock()
mock_response.json = AsyncMock(return_value={"success": False})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)

mock_post = AsyncMock(return_value=mock_response)
mock_session = MagicMock()
mock_session.post = mock_post
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)

with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()

assert result is False
Comment thread
rboixaderg marked this conversation as resolved.
Outdated

Comment thread
rboixaderg marked this conversation as resolved.
Outdated
async def test_validate_error_handling(self):
"""Test validation error handling (JSON decode error, missing success key)."""
app_settings["recaptcha"] = {"private": "test-secret-key"}
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "some-token"})
utils.task_vars.request.set(request)

# Test JSON decode error
mock_response = AsyncMock()
mock_response.json = AsyncMock(side_effect=ValueError("Invalid JSON"))
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)

mock_post = AsyncMock(return_value=mock_response)
mock_session = MagicMock()
mock_session.post = mock_post
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)

with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()
assert result is False

# Test missing success key
mock_response.json = AsyncMock(return_value={})
with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()
assert result is False
Comment thread
rboixaderg marked this conversation as resolved.
Outdated


class TestRecaptchaUtilityIntegration:
"""Test the utility pattern integration."""

async def test_utility_registered_and_implements_interface(self, guillotina_main):
"""Test that the utility is registered and implements the interface."""
from zope.interface.verify import verifyObject

# guillotina_main fixture sets up the application and registers utilities
assert guillotina_main is not None # Ensure app is initialized

utility = query_utility(IRecaptchaValidationUtility)
assert utility is not None
assert isinstance(utility, RecaptchaValidator)
assert verifyObject(IRecaptchaValidationUtility, utility)


class TestRecaptchaEndpointIntegration:
"""Test endpoints that use reCAPTCHA validation."""

@pytest.mark.app_settings({"_fake_recaptcha_": FAKE_RECAPTCHA})
async def test_endpoint_rejects_invalid_recaptcha(self, container_requester):
"""Test that endpoints reject requests with invalid reCAPTCHA."""
async with container_requester as requester:
# Mock the utility to return False
utility = query_utility(IRecaptchaValidationUtility)
original_validate = utility.validate
utility.validate = AsyncMock(return_value=False)

try:
# Test @info endpoint
_, status = await requester(
"GET",
"/db/guillotina/@info",
authenticated=False,
headers={VALIDATION_HEADER: "invalid-token"},
)
assert status == 401

# Test @users registration endpoint
_, status = await requester(
"POST",
"/db/guillotina/@users",
data=json.dumps(
{
"id": "testuser@example.com",
"email": "testuser@example.com",
"password": "testpassword",
"fullname": "Test User",
}
),
authenticated=False,
headers={VALIDATION_HEADER: "invalid-token"},
)
assert status == 401
finally:
utility.validate = original_validate
Loading