Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ cookiecutter==2.3.0
coverage==7.6.1
crispy-bootstrap5==0.7
cryptography==44.0.3
django==4.2.15
django==4.2.16
django-axes==6.4.0
django-cachalot==2.6.1
django-celery-beat==2.5.0
Expand All @@ -28,6 +28,7 @@ django-smtp-ssl==1.0
django-storages==1.14
django-tenants==3.6.1
djangorestframework==3.15.2
fido2==2.0.0
flake8==6.1.0
flower==2.0.1
GitPython==3.1.43
Expand Down
Empty file.
Empty file.
8 changes: 8 additions & 0 deletions backend/src/zango/api/app_auth/config/v1/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.urls import path

from .views import AppAuthConfigViewAPIV1


urlpatterns = [
path("", AppAuthConfigViewAPIV1.as_view(), name="app-auth-config"),
]
16 changes: 16 additions & 0 deletions backend/src/zango/api/app_auth/config/v1/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from rest_framework.views import APIView

from django.utils.decorators import method_decorator
from django.views.decorators.csrf import ensure_csrf_cookie

from zango.core.api import get_api_response
from zango.core.utils import get_auth_priority


@method_decorator(ensure_csrf_cookie, name="dispatch")
class AppAuthConfigViewAPIV1(APIView):
def get(self, request):
response = {"auth_config": get_auth_priority(request=request)}
status = 200
success = True
return get_api_response(success, response, status)
Empty file.
Empty file.
35 changes: 35 additions & 0 deletions backend/src/zango/api/app_auth/flows/v1/forms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from allauth.headless.internal.restkit import inputs

from django import forms
from django.utils.translation import gettext_lazy as _

from zango.apps.appauth.models import OldPasswords


class BaseSetPasswordForm(forms.Form):
new_password = forms.CharField(
label=_("New password"),
widget=forms.PasswordInput(
attrs={
"class": "form-control",
"placeholder": "Enter your new password",
"autocomplete": "new-password",
}
),
strip=False,
)

def __init__(self, *args, **kwargs):
self.user = kwargs.pop("user")
super().__init__(*args, **kwargs)

def save(self):
self.user.set_password(self.cleaned_data["new_password"])
self.user.save()
obj = OldPasswords.objects.create(user=self.user)
obj.setPasswords(self.user.password)
obj.save()


class PasswordSetForm(BaseSetPasswordForm, inputs.Input):
pass
60 changes: 60 additions & 0 deletions backend/src/zango/api/app_auth/flows/v1/login.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json

from allauth.account.stages import LoginStageController, RoleSelectionStage
from allauth.headless.account.views import LoginView
from allauth.headless.base import response
from allauth.headless.base.views import APIView

from zango.apps.appauth.models import UserRoleModel
from zango.core.api import get_api_response


class AppLoginViewAPIV1(LoginView):
def post(self, request, *args, **kwargs):
resp = super().post(request, *args, **kwargs)
return get_api_response(
success=True,
response_content=json.loads(resp.content.decode("utf-8")),
status=resp.status_code,
)


class UserRoleViewAPIV1(APIView):
stage_class = RoleSelectionStage

def handle(self, request, *args, **kwargs):
self.stage = LoginStageController.enter(request, self.stage_class.key)
if not self.stage:
return response.UnauthorizedResponse(request)
return super().handle(request, *args, **kwargs)

def respond_stage_error(self):
return response.UnauthorizedResponse(self.request)

def respond_next_stage(self):
self.stage.exit()
return response.AuthenticationResponse(self.request)

def post(self, request, *args, **kwargs):
role = request.GET.get("role")
if role is None:
return get_api_response(
success=False,
response_content={"message": "user role not specified"},
status=400,
)
try:
UserRoleModel.objects.get(id=role)
except UserRoleModel.DoesNotExist:
return get_api_response(
success=False,
response_content={"message": "specified user role does not exist"},
status=400,
)
request.session["role_id"] = role
response = self.respond_next_stage()
return get_api_response(
success=True,
response_content=json.loads(response.content.decode("utf-8")),
status=response.status_code,
)
25 changes: 25 additions & 0 deletions backend/src/zango/api/app_auth/flows/v1/login_with_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import json

from allauth.headless.account.views import ConfirmLoginCodeView, RequestLoginCodeView

from zango.core.api import get_api_response


class RequestLoginCodeViewAPIV1(RequestLoginCodeView):
def post(self, request, *args, **kwargs):
resp = super().post(request, *args, **kwargs)
return get_api_response(
success=True,
response_content=json.loads(resp.content.decode("utf-8")),
status=resp.status_code,
)


class ConfirmLoginCodeViewAPIV1(ConfirmLoginCodeView):
def post(self, request, *args, **kwargs):
resp = super().post(request, *args, **kwargs)
return get_api_response(
success=True,
response_content=json.loads(resp.content.decode("utf-8")),
status=resp.status_code,
)
15 changes: 15 additions & 0 deletions backend/src/zango/api/app_auth/flows/v1/logout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json

from allauth.headless.account.views import SessionView

from zango.core.api import get_api_response


class AppLogoutViewAPIV1(SessionView):
def delete(self, request, *args, **kwargs):
resp = super().delete(request, *args, **kwargs)
return get_api_response(
success=True,
response_content=json.loads(resp.content.decode("utf-8")),
status=resp.status_code,
)
127 changes: 127 additions & 0 deletions backend/src/zango/api/app_auth/flows/v1/mfa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import json

from allauth.headless.mfa.views import AuthenticateView
from rest_framework.views import APIView

from zango.apps.appauth.tasks import send_otp
from zango.core.api import get_api_response
from zango.core.utils import get_auth_priority, mask_email, mask_phone_number


class GetMFACodeViewAPIV1(APIView):
def get_user(self, username):
from django.db.models import Q

from zango.apps.appauth.models import AppUserModel

try:
return AppUserModel.objects.get(Q(email=username) | Q(mobile=username))
except AppUserModel.DoesNotExist:
return None

def get(self, request, *args, **kwargs):
policy = get_auth_priority(policy="two_factor_auth", request=request)
if not policy.get("required"):
return get_api_response(
success=False,
response_content={"message": "MFA not required"},
status=400,
)
else:
allowed_methods = policy.get("allowed_methods", [])
if len(allowed_methods) == 0:
return get_api_response(
success=False,
response_content={"message": "No MFA methods configured"},
status=400,
)

if len(request.session.get("account_authentication_methods", [])) > 0:
latest_auth_method = request.session["account_authentication_methods"][
0
]
preferred_method = None

request_data = {
"path": request.path,
"params": request.GET,
}

user = None
if latest_auth_method.get("email"):
user = self.get_user(latest_auth_method.get("email"))
if user is None:
return get_api_response(
success=False,
response_content={"message": "User not found"},
status=400,
)
preferred_method = "sms"
if preferred_method not in allowed_methods:
return get_api_response(
success=False,
response_content={"message": "SMS MFA method not allowed"},
status=400,
)
send_otp.delay(
method=preferred_method,
otp_type="two_factor_auth",
user_id=user.id,
tenant_id=request.tenant.id,
request_data=request_data,
user_role_id=request.session.get("role_id"),
message="Your 2FA code is",
)
else:
user = self.get_user(latest_auth_method.get("phone"))
if user is None:
return get_api_response(
success=False,
response_content={"message": "User not found"},
status=400,
)
preferred_method = "email"
if preferred_method not in allowed_methods:
return get_api_response(
success=False,
response_content={
"message": "Email MFA method not allowed"
},
status=400,
)
send_otp.delay(
method=preferred_method,
otp_type="two_factor_auth",
user_id=user.id,
tenant_id=request.tenant.id,
request_data=request_data,
user_role_id=request.session.get("role_id"),
message="Your 2FA code is",
subject="2FA Verification Code",
)
return get_api_response(
success=True,
response_content={
"message": f"Verification code sent to {preferred_method}",
"masked_destination": mask_email(user.email)
if preferred_method == "email"
else mask_phone_number(str(user.mobile)),
},
status=200,
)
else:
return get_api_response(
success=False,
response_content={"message": "User not authenticated"},
status=400,
)


class MFAVerifyViewAPIV1(AuthenticateView):
def post(self, request, *args, **kwargs):
resp = super().post(request, *args, **kwargs)
return get_api_response(
success=True,
response_content=json.loads(resp.content.decode("utf-8")),
status=resp.status_code,
)
Loading
Loading