diff --git a/oidc/viewsets.py b/oidc/viewsets.py index eff99dd..35625ba 100644 --- a/oidc/viewsets.py +++ b/oidc/viewsets.py @@ -3,7 +3,9 @@ """ import importlib +import logging import re +import traceback from typing import Optional, Tuple from django.conf import settings @@ -37,6 +39,8 @@ default_config = getattr(default, "OPENID_CONNECT_VIEWSET_CONFIG", {}) SSO_COOKIE_NAME = "SSO" +logger = logging.getLogger(__name__) + class BaseOpenIDConnectViewset(viewsets.ViewSet): """ @@ -201,6 +205,7 @@ def validate_fields(self, data: dict) -> dict: field_validation_regex = self.field_validation_regex[k] regex = re.compile(field_validation_regex.get("regex")) if regex and not regex.search(data[k]): + logger.info(f"Invalid `{k}` value `{data[k]}`") raise ValueError( field_validation_regex.get("help_text") or f"Invalid `{k}` value `{data[k]}`" @@ -240,27 +245,21 @@ def _clean_user_data(self, user_data) -> Tuple[dict, Optional[list]]: ) and "email" in user_data: username = user_data["email"].split("@")[0] if ( - self.user_model.objects.filter(username__iexact=username).count() - == 0 + self.replaceable_username_characters + and self.username_char_replacement ): - username = user_data["email"].split("@")[0] - if ( - self.replaceable_username_characters - and self.username_char_replacement - ): - for char in list(self.replaceable_username_characters): - username = username.replace( - char, self.username_char_replacement - ) + for char in list(self.replaceable_username_characters): + username = username.replace( + char, self.username_char_replacement + ) - # Validate retrieved username matches regex - if ( - "username" in self.field_validation_regex - and username_regex.search(username) - ): - user_data["username"] = username - if "username" in missing_fields: - missing_fields.remove("username") + # Validate retrieved username matches regex + if "username" in self.field_validation_regex and username_regex.search( + username + ): + user_data["username"] = username + if "username" in missing_fields: + missing_fields.remove("username") return user_data, missing_fields @@ -272,6 +271,7 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq if client: user_data = request.POST.dict() id_token = user_data.get("id_token") + provided_username = user_data.get("username") if not id_token and user_data.get("code"): try: @@ -299,6 +299,8 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq redirect_after = decoded_token.pop(REDIRECT_AFTER_AUTH) user_data.update(decoded_token) user_data = self.map_claims_to_model_field(user_data) + if provided_username: + user_data.update({"username": provided_username}) filter_kwargs = None if "email" in user_data: @@ -321,11 +323,13 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq and list(missing_fields)[0] == "username" ): data = {"id_token": id_token} + logger.info("missing_fields: ", missing_fields) return Response( data, template_name="oidc/oidc_user_data_entry.html" ) else: missing_fields = ", ".join(missing_fields) + logger.error(f"missing fields: {missing_fields}") return Response( { "error": _( @@ -343,6 +347,7 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq "id_token": id_token, "error": f"{field.capitalize()} field is already in use.", } + logger.info(data) return Response( data, template_name="oidc/oidc_user_data_entry.html" ) @@ -356,6 +361,9 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq user = self.create_login_user(create_data) except ValueError as e: + stack_trace = traceback.format_exc() + logger.info("ValueError") + logger.info(stack_trace) return Response( {"error": str(e), "id_token": id_token}, status=status.HTTP_400_BAD_REQUEST, diff --git a/tests/test_viewsets.py b/tests/test_viewsets.py index c94e1c0..0253f2e 100644 --- a/tests/test_viewsets.py +++ b/tests/test_viewsets.py @@ -35,6 +35,7 @@ "MAP_CLAIM_TO_MODEL": { "given_name": "first_name", "family_name": "last_name", + "preferred_username": "username", "sub": "email", }, "USER_DEFAULTS": { @@ -87,6 +88,85 @@ def test_returns_data_entry_template_on_missing_username_claim(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html") + @override_settings(OPENID_CONNECT_VIEWSET_CONFIG=OPENID_CONNECT_VIEWSET_CONFIG) + def test_create_user_providing_id_token_in_form(self): + """ + Trying to create a user that already exists will ask you to chose a different username + """ + view = UserModelOpenIDConnectViewset.as_view({"post": "callback"}) + with patch( + "oidc.viewsets.OpenIDClient.verify_and_decode_id_token" + ) as mock_func: + mock_func.return_value = { + "email_verified": False, + "name": "Alice User", + "preferred_username": "useralice@gmail.com", + "given_name": "user", + "family_name": "Alice", + "email": "useralice@gmail.com", + } + + data = { + "id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia", + } + request = self.factory.post("/", data=data) + response = view(request, auth_server="default") + self.assertEqual(response.status_code, 302) + user = User.objects.get(username="useralice") + self.assertEqual(user.email, "useralice@gmail.com") + + # when email username is already in use erorr message should reflect that + with patch( + "oidc.viewsets.OpenIDClient.verify_and_decode_id_token" + ) as mock_func: + mock_func.return_value = { + "email_verified": False, + "name": "Alice User", + "preferred_username": "useralice@ona.io", + "given_name": "user", + "family_name": "Alice", + "email": "useralice@ona.io", + } + + data = { + "id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia", + } + request = self.factory.post("/", data=data) + response = view(request, auth_server="default") + + self.assertEqual(user.email, "useralice@gmail.com") + self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html") + self.assertEqual(response.status_code, 200) + self.assertEqual( + response.data["error"], + "Username field is already in use.", + ) + + # when preferred username is provided, use the preferred username + with patch( + "oidc.viewsets.OpenIDClient.verify_and_decode_id_token" + ) as mock_func: + mock_func.return_value = { + "email_verified": False, + "name": "Alice User", + "preferred_username": "useralice@ona.io", + "given_name": "user", + "family_name": "Alice", + "email": "useralice@ona.io", + } + + data = { + "id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia", + "username": "preferredusername", + } + request = self.factory.post("/", data=data) + response = view(request, auth_server="default") + + self.assertEqual(user.email, "useralice@gmail.com") + self.assertEqual(response.status_code, 302) + user = User.objects.get(username="preferredusername") + self.assertEqual(user.email, "useralice@ona.io") + @override_settings(OPENID_CONNECT_VIEWSET_CONFIG=OPENID_CONNECT_VIEWSET_CONFIG) def test_recreating_already_existing_user(self): """