Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ CHANGELOG
7.0.7 (unreleased)
------------------

- feat: create reCAPTCHA validation utility
Comment thread
rboixaderg marked this conversation as resolved.
[rboixaderg]
Comment thread
rboixaderg marked this conversation as resolved.
- Docs: Update documentation and configuration settings
[rboixaderg]

Expand Down
5 changes: 5 additions & 0 deletions guillotina/_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
"factory": "guillotina.async_util.AsyncJobPool",
"settings": {"max_size": 5},
},
"guillotina.recaptcha": {
"provides": "guillotina.interfaces.IRecaptchaValidationUtility",
"factory": "guillotina.auth.recaptcha.RecaptchaValidator",
"settings": {},
},
},
"store_json": True,
"pickle_protocol": pickle.HIGHEST_PROTOCOL,
Expand Down
29 changes: 16 additions & 13 deletions guillotina/api/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from guillotina import configure
from guillotina.api.service import Service
from guillotina.auth import authenticate_user
from guillotina.auth.recaptcha import RecaptchaValidator
from guillotina.auth.utils import find_user
from guillotina.component import get_utility
from guillotina.component import query_utility
Expand All @@ -16,6 +15,7 @@
from guillotina.interfaces import IAuthValidationUtility
from guillotina.interfaces import IContainer
from guillotina.interfaces import ISessionManagerUtility
from guillotina.interfaces.async_util import IRecaptchaValidationUtility
from guillotina.response import HTTPNotAcceptable
from guillotina.response import HTTPPreconditionFailed
from guillotina.response import HTTPUnauthorized
Expand Down Expand Up @@ -240,10 +240,11 @@ async def __call__(self):
raise HTTPNotAcceptable()
else:
# We validate with recaptcha
validator = RecaptchaValidator()
status = await validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})
recaptcha_validator = query_utility(IRecaptchaValidationUtility)
if recaptcha_validator is not None:
status = await recaptcha_validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})

# We need to validate is a valid user
user = await find_user({"id": user_id})
Expand Down Expand Up @@ -338,10 +339,11 @@ async def __call__(self):
if allowed is False:
raise HTTPUnauthorized(content={"text": "Not allowed registration"})

validator = RecaptchaValidator()
status = await validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})
recaptcha_validator = query_utility(IRecaptchaValidationUtility)
if recaptcha_validator is not None:
status = await recaptcha_validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})

payload = await self.request.json()

Expand Down Expand Up @@ -398,10 +400,11 @@ async def __call__(self):
)
class InfoAccess(Service):
async def __call__(self):
validator = RecaptchaValidator()
status = await validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})
recaptcha_validator = query_utility(IRecaptchaValidationUtility)
if recaptcha_validator is not None:
status = await recaptcha_validator.validate()
if status is False:
raise HTTPUnauthorized(content={"text": "Invalid validation"})

auth_providers = app_settings.get("auth_providers", {})
providers = []
Expand Down
1 change: 0 additions & 1 deletion guillotina/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .groups import GroupsUtility # noqa
from .recaptcha import RecaptchaValidator # noqa
from .utils import authenticate_request # noqa
from .utils import authenticate_user # noqa
from .utils import find_user # noqa
Expand Down
13 changes: 12 additions & 1 deletion guillotina/auth/recaptcha.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from guillotina import app_settings
from guillotina.interfaces.async_util import IRecaptchaValidationUtility
from guillotina.utils import get_current_request
from zope.interface import implementer

import logging

Expand All @@ -10,10 +12,19 @@
VALIDATION_HEADER = "X-VALIDATION-G"


@implementer(IRecaptchaValidationUtility)
class RecaptchaValidator:
# Not valid to generate a user
for_validators = ()

async def initialize(self, app=None):
"""Initialize the utility (no-op for reCAPTCHA)."""
pass

async def finalize(self, app=None):
"""Finalize the utility (no-op for reCAPTCHA)."""
pass

async def validate(self):
request = get_current_request()
token = request.headers.get(VALIDATION_HEADER)
Expand All @@ -35,7 +46,7 @@ async def validate(self):
data = await resp.json()
except Exception: # pragma: no cover
logger.warning("Did not get json response", exc_info=True)
return
return False
try:
return data["success"]
except Exception: # pragma: no cover
Expand Down
1 change: 1 addition & 0 deletions guillotina/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .async_util import ICacheUtility # noqa
from .async_util import IPubSubUtility # noqa
from .async_util import IQueueUtility # noqa
from .async_util import IRecaptchaValidationUtility # noqa
from .async_util import ISessionManagerUtility # noqa
from .behaviors import IAsyncBehavior # noqa
from .behaviors import IBehavior # noqa
Expand Down
5 changes: 5 additions & 0 deletions guillotina/interfaces/async_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,8 @@ async def refresh_session(ident: str, session: str) -> str:
"""
Refresh an actual session
"""


class IRecaptchaValidationUtility(IAsyncUtility):
Comment thread
rboixaderg marked this conversation as resolved.
async def validate() -> bool:
Comment thread
rboixaderg marked this conversation as resolved.
pass
225 changes: 225 additions & 0 deletions guillotina/tests/test_recaptcha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
from guillotina import app_settings
from guillotina.auth.recaptcha import RECAPTCHA_VALIDATION_URL
from guillotina.auth.recaptcha import RecaptchaValidator
from guillotina.auth.recaptcha import VALIDATION_HEADER
from guillotina.component import query_utility
from guillotina.interfaces.async_util import IRecaptchaValidationUtility
from guillotina.tests import utils
from unittest.mock import AsyncMock
from unittest.mock import MagicMock
from unittest.mock import patch

import json
import pytest


pytestmark = pytest.mark.asyncio

FAKE_RECAPTCHA = "FAKE_RECAPTCHA"


class TestRecaptchaValidator:
"""Test the RecaptchaValidator utility class directly."""

async def test_initialize_and_finalize(self):
"""Test that initialize and finalize methods exist and work."""
validator = RecaptchaValidator()
await validator.initialize()
await validator.finalize()

async def test_validate_with_fake_recaptcha(self):
"""Test validation with fake recaptcha token."""
original_value = app_settings.get("_fake_recaptcha_")
try:
app_settings["_fake_recaptcha_"] = FAKE_RECAPTCHA
request = utils.get_mocked_request(headers={VALIDATION_HEADER: FAKE_RECAPTCHA})
utils.task_vars.request.set(request)

validator = RecaptchaValidator()
result = await validator.validate()
assert result is True
finally:
if original_value is not None:
app_settings["_fake_recaptcha_"] = original_value
else:
app_settings.pop("_fake_recaptcha_", None)

async def test_validate_without_configuration(self):
"""Test validation when recaptcha is not configured (graceful degradation)."""
original_value = app_settings.get("recaptcha")
try:
app_settings.pop("recaptcha", None)
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "some-token"})
utils.task_vars.request.set(request)

validator = RecaptchaValidator()
result = await validator.validate()
# Should return True when not configured (graceful degradation)
assert result is True
finally:
if original_value is not None:
app_settings["recaptcha"] = original_value
else:
app_settings.pop("recaptcha", None)

async def test_validate_success(self):
"""Test successful validation with mocked HTTP response."""
original_value = app_settings.get("recaptcha")
try:
app_settings["recaptcha"] = {"private": "test-secret-key"}
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "valid-token"})
utils.task_vars.request.set(request)

# Mock aiohttp response
mock_response = AsyncMock()
mock_response.json = AsyncMock(return_value={"success": True})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)

mock_post = AsyncMock(return_value=mock_response)
mock_session = MagicMock()
mock_session.post = mock_post
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)

with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()

assert result is True
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args[0][0] == RECAPTCHA_VALIDATION_URL
assert call_args[1]["data"]["secret"] == "test-secret-key"
assert call_args[1]["data"]["response"] == "valid-token"
finally:
if original_value is not None:
app_settings["recaptcha"] = original_value
else:
app_settings.pop("recaptcha", None)

async def test_validate_failure(self):
"""Test failed validation with mocked HTTP response."""
original_value = app_settings.get("recaptcha")
try:
app_settings["recaptcha"] = {"private": "test-secret-key"}
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "invalid-token"})
utils.task_vars.request.set(request)

# Mock aiohttp response
mock_response = AsyncMock()
mock_response.json = AsyncMock(return_value={"success": False})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)

mock_post = AsyncMock(return_value=mock_response)
mock_session = MagicMock()
mock_session.post = mock_post
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)

with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()

assert result is False
finally:
if original_value is not None:
app_settings["recaptcha"] = original_value
else:
app_settings.pop("recaptcha", None)

async def test_validate_error_handling(self):
"""Test validation error handling (JSON decode error, missing success key)."""
original_value = app_settings.get("recaptcha")
try:
app_settings["recaptcha"] = {"private": "test-secret-key"}
request = utils.get_mocked_request(headers={VALIDATION_HEADER: "some-token"})
utils.task_vars.request.set(request)

# Test JSON decode error
mock_response = AsyncMock()
mock_response.json = AsyncMock(side_effect=ValueError("Invalid JSON"))
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)

mock_post = AsyncMock(return_value=mock_response)
mock_session = MagicMock()
mock_session.post = mock_post
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)

with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()
assert result is False

# Test missing success key
mock_response.json = AsyncMock(return_value={})
with patch("aiohttp.ClientSession", return_value=mock_session):
validator = RecaptchaValidator()
result = await validator.validate()
assert result is False
finally:
if original_value is not None:
app_settings["recaptcha"] = original_value
else:
app_settings.pop("recaptcha", None)


class TestRecaptchaUtilityIntegration:
"""Test the utility pattern integration."""

async def test_utility_registered_and_implements_interface(self, guillotina_main):
"""Test that the utility is registered and implements the interface."""
from zope.interface.verify import verifyObject

# guillotina_main fixture sets up the application and registers utilities
assert guillotina_main is not None # Ensure app is initialized

utility = query_utility(IRecaptchaValidationUtility)
assert utility is not None
assert isinstance(utility, RecaptchaValidator)
assert verifyObject(IRecaptchaValidationUtility, utility)


class TestRecaptchaEndpointIntegration:
"""Test endpoints that use reCAPTCHA validation."""

@pytest.mark.app_settings({"_fake_recaptcha_": FAKE_RECAPTCHA})
async def test_endpoint_rejects_invalid_recaptcha(self, container_requester):
"""Test that endpoints reject requests with invalid reCAPTCHA."""
async with container_requester as requester:
# Mock the utility to return False
utility = query_utility(IRecaptchaValidationUtility)
original_validate = utility.validate
utility.validate = AsyncMock(return_value=False)

try:
# Test @info endpoint
_, status = await requester(
"GET",
"/db/guillotina/@info",
authenticated=False,
headers={VALIDATION_HEADER: "invalid-token"},
)
assert status == 401

# Test @users registration endpoint
_, status = await requester(
"POST",
"/db/guillotina/@users",
data=json.dumps(
{
"id": "[email protected]",
"email": "[email protected]",
"password": "testpassword",
"fullname": "Test User",
}
),
authenticated=False,
headers={VALIDATION_HEADER: "invalid-token"},
)
assert status == 401
finally:
utility.validate = original_validate