diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 00d7cccdb..06d512a5f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,8 @@ CHANGELOG 7.0.7 (unreleased) ------------------ +- feat: create reCAPTCHA validation utility +[rboixaderg] - Docs: Update documentation and configuration settings [rboixaderg] diff --git a/guillotina/_settings.py b/guillotina/_settings.py index be8b05a5b..b50222248 100644 --- a/guillotina/_settings.py +++ b/guillotina/_settings.py @@ -32,6 +32,11 @@ "factory": "guillotina.async_util.AsyncJobPool", "settings": {"max_size": 5}, }, + "guillotina.recaptcha": { + "provides": "guillotina.interfaces.IRecaptchaValidationUtility", + "factory": "guillotina.auth.recaptcha.RecaptchaValidator", + "settings": {}, + }, }, "store_json": True, "pickle_protocol": pickle.HIGHEST_PROTOCOL, diff --git a/guillotina/api/login.py b/guillotina/api/login.py index f0dbebdef..484c031af 100644 --- a/guillotina/api/login.py +++ b/guillotina/api/login.py @@ -5,7 +5,6 @@ from guillotina import configure from guillotina.api.service import Service from guillotina.auth import authenticate_user -from guillotina.auth.recaptcha import RecaptchaValidator from guillotina.auth.utils import find_user from guillotina.component import get_utility from guillotina.component import query_utility @@ -16,6 +15,7 @@ from guillotina.interfaces import IAuthValidationUtility from guillotina.interfaces import IContainer from guillotina.interfaces import ISessionManagerUtility +from guillotina.interfaces.async_util import IRecaptchaValidationUtility from guillotina.response import HTTPNotAcceptable from guillotina.response import HTTPPreconditionFailed from guillotina.response import HTTPUnauthorized @@ -240,10 +240,11 @@ async def __call__(self): raise HTTPNotAcceptable() else: # We validate with recaptcha - validator = RecaptchaValidator() - status = await validator.validate() - if status is False: - raise HTTPUnauthorized(content={"text": "Invalid validation"}) + recaptcha_validator = query_utility(IRecaptchaValidationUtility) + if recaptcha_validator is not None: + status = await recaptcha_validator.validate() + if status is False: + raise HTTPUnauthorized(content={"text": "Invalid validation"}) # We need to validate is a valid user user = await find_user({"id": user_id}) @@ -338,10 +339,11 @@ async def __call__(self): if allowed is False: raise HTTPUnauthorized(content={"text": "Not allowed registration"}) - validator = RecaptchaValidator() - status = await validator.validate() - if status is False: - raise HTTPUnauthorized(content={"text": "Invalid validation"}) + recaptcha_validator = query_utility(IRecaptchaValidationUtility) + if recaptcha_validator is not None: + status = await recaptcha_validator.validate() + if status is False: + raise HTTPUnauthorized(content={"text": "Invalid validation"}) payload = await self.request.json() @@ -398,10 +400,11 @@ async def __call__(self): ) class InfoAccess(Service): async def __call__(self): - validator = RecaptchaValidator() - status = await validator.validate() - if status is False: - raise HTTPUnauthorized(content={"text": "Invalid validation"}) + recaptcha_validator = query_utility(IRecaptchaValidationUtility) + if recaptcha_validator is not None: + status = await recaptcha_validator.validate() + if status is False: + raise HTTPUnauthorized(content={"text": "Invalid validation"}) auth_providers = app_settings.get("auth_providers", {}) providers = [] diff --git a/guillotina/auth/__init__.py b/guillotina/auth/__init__.py index ff68e1a12..e1c790ef6 100644 --- a/guillotina/auth/__init__.py +++ b/guillotina/auth/__init__.py @@ -1,5 +1,4 @@ from .groups import GroupsUtility # noqa -from .recaptcha import RecaptchaValidator # noqa from .utils import authenticate_request # noqa from .utils import authenticate_user # noqa from .utils import find_user # noqa diff --git a/guillotina/auth/recaptcha.py b/guillotina/auth/recaptcha.py index 0b76e7726..8c44c2345 100644 --- a/guillotina/auth/recaptcha.py +++ b/guillotina/auth/recaptcha.py @@ -1,5 +1,7 @@ from guillotina import app_settings +from guillotina.interfaces.async_util import IRecaptchaValidationUtility from guillotina.utils import get_current_request +from zope.interface import implementer import logging @@ -10,10 +12,19 @@ VALIDATION_HEADER = "X-VALIDATION-G" +@implementer(IRecaptchaValidationUtility) class RecaptchaValidator: # Not valid to generate a user for_validators = () + async def initialize(self, app=None): + """Initialize the utility (no-op for reCAPTCHA).""" + pass + + async def finalize(self, app=None): + """Finalize the utility (no-op for reCAPTCHA).""" + pass + async def validate(self): request = get_current_request() token = request.headers.get(VALIDATION_HEADER) @@ -35,7 +46,7 @@ async def validate(self): data = await resp.json() except Exception: # pragma: no cover logger.warning("Did not get json response", exc_info=True) - return + return False try: return data["success"] except Exception: # pragma: no cover diff --git a/guillotina/interfaces/__init__.py b/guillotina/interfaces/__init__.py index ca32a8fdf..0bbf2b948 100644 --- a/guillotina/interfaces/__init__.py +++ b/guillotina/interfaces/__init__.py @@ -5,6 +5,7 @@ from .async_util import ICacheUtility # noqa from .async_util import IPubSubUtility # noqa from .async_util import IQueueUtility # noqa +from .async_util import IRecaptchaValidationUtility # noqa from .async_util import ISessionManagerUtility # noqa from .behaviors import IAsyncBehavior # noqa from .behaviors import IBehavior # noqa diff --git a/guillotina/interfaces/async_util.py b/guillotina/interfaces/async_util.py index 1043c3510..4dcb09ed9 100644 --- a/guillotina/interfaces/async_util.py +++ b/guillotina/interfaces/async_util.py @@ -74,3 +74,8 @@ async def refresh_session(ident: str, session: str) -> str: """ Refresh an actual session """ + + +class IRecaptchaValidationUtility(IAsyncUtility): + async def validate() -> bool: + pass diff --git a/guillotina/tests/test_recaptcha.py b/guillotina/tests/test_recaptcha.py new file mode 100644 index 000000000..8ea25830b --- /dev/null +++ b/guillotina/tests/test_recaptcha.py @@ -0,0 +1,225 @@ +from guillotina import app_settings +from guillotina.auth.recaptcha import RECAPTCHA_VALIDATION_URL +from guillotina.auth.recaptcha import RecaptchaValidator +from guillotina.auth.recaptcha import VALIDATION_HEADER +from guillotina.component import query_utility +from guillotina.interfaces.async_util import IRecaptchaValidationUtility +from guillotina.tests import utils +from unittest.mock import AsyncMock +from unittest.mock import MagicMock +from unittest.mock import patch + +import json +import pytest + + +pytestmark = pytest.mark.asyncio + +FAKE_RECAPTCHA = "FAKE_RECAPTCHA" + + +class TestRecaptchaValidator: + """Test the RecaptchaValidator utility class directly.""" + + async def test_initialize_and_finalize(self): + """Test that initialize and finalize methods exist and work.""" + validator = RecaptchaValidator() + await validator.initialize() + await validator.finalize() + + async def test_validate_with_fake_recaptcha(self): + """Test validation with fake recaptcha token.""" + original_value = app_settings.get("_fake_recaptcha_") + try: + app_settings["_fake_recaptcha_"] = FAKE_RECAPTCHA + request = utils.get_mocked_request(headers={VALIDATION_HEADER: FAKE_RECAPTCHA}) + utils.task_vars.request.set(request) + + validator = RecaptchaValidator() + result = await validator.validate() + assert result is True + finally: + if original_value is not None: + app_settings["_fake_recaptcha_"] = original_value + else: + app_settings.pop("_fake_recaptcha_", None) + + async def test_validate_without_configuration(self): + """Test validation when recaptcha is not configured (graceful degradation).""" + original_value = app_settings.get("recaptcha") + try: + app_settings.pop("recaptcha", None) + request = utils.get_mocked_request(headers={VALIDATION_HEADER: "some-token"}) + utils.task_vars.request.set(request) + + validator = RecaptchaValidator() + result = await validator.validate() + # Should return True when not configured (graceful degradation) + assert result is True + finally: + if original_value is not None: + app_settings["recaptcha"] = original_value + else: + app_settings.pop("recaptcha", None) + + async def test_validate_success(self): + """Test successful validation with mocked HTTP response.""" + original_value = app_settings.get("recaptcha") + try: + app_settings["recaptcha"] = {"private": "test-secret-key"} + request = utils.get_mocked_request(headers={VALIDATION_HEADER: "valid-token"}) + utils.task_vars.request.set(request) + + # Mock aiohttp response + mock_response = AsyncMock() + mock_response.json = AsyncMock(return_value={"success": True}) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + + mock_post = AsyncMock(return_value=mock_response) + mock_session = MagicMock() + mock_session.post = mock_post + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + with patch("aiohttp.ClientSession", return_value=mock_session): + validator = RecaptchaValidator() + result = await validator.validate() + + assert result is True + mock_post.assert_called_once() + call_args = mock_post.call_args + assert call_args[0][0] == RECAPTCHA_VALIDATION_URL + assert call_args[1]["data"]["secret"] == "test-secret-key" + assert call_args[1]["data"]["response"] == "valid-token" + finally: + if original_value is not None: + app_settings["recaptcha"] = original_value + else: + app_settings.pop("recaptcha", None) + + async def test_validate_failure(self): + """Test failed validation with mocked HTTP response.""" + original_value = app_settings.get("recaptcha") + try: + app_settings["recaptcha"] = {"private": "test-secret-key"} + request = utils.get_mocked_request(headers={VALIDATION_HEADER: "invalid-token"}) + utils.task_vars.request.set(request) + + # Mock aiohttp response + mock_response = AsyncMock() + mock_response.json = AsyncMock(return_value={"success": False}) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + + mock_post = AsyncMock(return_value=mock_response) + mock_session = MagicMock() + mock_session.post = mock_post + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + with patch("aiohttp.ClientSession", return_value=mock_session): + validator = RecaptchaValidator() + result = await validator.validate() + + assert result is False + finally: + if original_value is not None: + app_settings["recaptcha"] = original_value + else: + app_settings.pop("recaptcha", None) + + async def test_validate_error_handling(self): + """Test validation error handling (JSON decode error, missing success key).""" + original_value = app_settings.get("recaptcha") + try: + app_settings["recaptcha"] = {"private": "test-secret-key"} + request = utils.get_mocked_request(headers={VALIDATION_HEADER: "some-token"}) + utils.task_vars.request.set(request) + + # Test JSON decode error + mock_response = AsyncMock() + mock_response.json = AsyncMock(side_effect=ValueError("Invalid JSON")) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + + mock_post = AsyncMock(return_value=mock_response) + mock_session = MagicMock() + mock_session.post = mock_post + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + with patch("aiohttp.ClientSession", return_value=mock_session): + validator = RecaptchaValidator() + result = await validator.validate() + assert result is False + + # Test missing success key + mock_response.json = AsyncMock(return_value={}) + with patch("aiohttp.ClientSession", return_value=mock_session): + validator = RecaptchaValidator() + result = await validator.validate() + assert result is False + finally: + if original_value is not None: + app_settings["recaptcha"] = original_value + else: + app_settings.pop("recaptcha", None) + + +class TestRecaptchaUtilityIntegration: + """Test the utility pattern integration.""" + + async def test_utility_registered_and_implements_interface(self, guillotina_main): + """Test that the utility is registered and implements the interface.""" + from zope.interface.verify import verifyObject + + # guillotina_main fixture sets up the application and registers utilities + assert guillotina_main is not None # Ensure app is initialized + + utility = query_utility(IRecaptchaValidationUtility) + assert utility is not None + assert isinstance(utility, RecaptchaValidator) + assert verifyObject(IRecaptchaValidationUtility, utility) + + +class TestRecaptchaEndpointIntegration: + """Test endpoints that use reCAPTCHA validation.""" + + @pytest.mark.app_settings({"_fake_recaptcha_": FAKE_RECAPTCHA}) + async def test_endpoint_rejects_invalid_recaptcha(self, container_requester): + """Test that endpoints reject requests with invalid reCAPTCHA.""" + async with container_requester as requester: + # Mock the utility to return False + utility = query_utility(IRecaptchaValidationUtility) + original_validate = utility.validate + utility.validate = AsyncMock(return_value=False) + + try: + # Test @info endpoint + _, status = await requester( + "GET", + "/db/guillotina/@info", + authenticated=False, + headers={VALIDATION_HEADER: "invalid-token"}, + ) + assert status == 401 + + # Test @users registration endpoint + _, status = await requester( + "POST", + "/db/guillotina/@users", + data=json.dumps( + { + "id": "testuser@example.com", + "email": "testuser@example.com", + "password": "testpassword", + "fullname": "Test User", + } + ), + authenticated=False, + headers={VALIDATION_HEADER: "invalid-token"}, + ) + assert status == 401 + finally: + utility.validate = original_validate