Skip to content

Commit

Permalink
update saml login (#7366)
Browse files Browse the repository at this point in the history
fix login failed as soon as license limit is reached
  • Loading branch information
imwhatiam authored Jan 15, 2025
1 parent b3da26a commit 7225eb8
Showing 1 changed file with 58 additions and 41 deletions.
99 changes: 58 additions & 41 deletions seahub/adfs_auth/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from seahub.base.accounts import User
from seahub.auth.models import SocialAuthUser
from seahub.profile.models import Profile, DetailedProfile
from seahub.utils import render_error
from seahub.utils.licenseparse import user_number_over_limit
from seahub.utils.file_size import get_quota_from_string
from seahub.role_permissions.utils import get_enabled_role_permissions_by_role
Expand Down Expand Up @@ -184,13 +185,18 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
settings.py. The `djangosaml2.backends.Saml2Backend` can be used for this purpose,
though some implementations may instead register their own subclasses of Saml2Backend.
"""

internal_server_error_msg = _('Internal server error. Please contact system administrator.')
login_failed_error_msg = _('Login failed: Bad response from ADFS/SAML service. '
'Please report to your organization (company) administrator.')

org = None
if org_id and int(org_id) > 0:
org_id = int(org_id)
org = ccnet_api.get_org_by_id(org_id)
if not org:
logger.error('Cannot find an organization related to org_id %s.' % org_id)
return HttpResponseBadRequest(_('Internal server error. Please contact system administrator.'))
return render_error(request, internal_server_error_msg)
else:
org_id = -1

Expand All @@ -206,8 +212,7 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return HttpResponseBadRequest(_('Login failed: Bad response from ADFS/SAML service. '
'Please report to your organization (company) administrator.'))
return render_error(request, login_failed_error_msg)

try:
conf = get_config(None, request)
Expand All @@ -223,11 +228,10 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return HttpResponseBadRequest(_('Login failed: ADFS/SAML service error. '
'Please report to your organization (company) administrator.'))
return render_error(request, login_failed_error_msg)
except Exception as e:
logger.error(e)
return HttpResponseBadRequest(_('Internal server error. Please contact system administrator.'))
return render_error(request, internal_server_error_msg)

identity_cache = IdentityCache(request.saml_session)
client = Saml2Client(conf, identity_cache=identity_cache)
Expand All @@ -237,7 +241,9 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre

xmlstr = request.POST['SAMLResponse']
try:
response = client.parse_authn_request_response(xmlstr, BINDING_HTTP_POST, outstanding_queries)
response = client.parse_authn_request_response(xmlstr,
BINDING_HTTP_POST,
outstanding_queries)
except Exception as e:
logger.error('SAMLResponse Error: %s' % e)
# send error msg to admin
Expand All @@ -250,8 +256,8 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return HttpResponseBadRequest(_('Login failed: Bad response from ADFS/SAML service. '
'Please report to your organization (company) administrator.'))
return render_error(request, login_failed_error_msg)

if response is None:
logger.error('Invalid SAML Assertion received.')
# send error msg to admin
Expand All @@ -264,8 +270,7 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return HttpResponseBadRequest(_('Login failed: Bad response from ADFS/SAML service. '
'Please report to your organization (company) administrator.'))
return render_error(request, login_failed_error_msg)

session_id = response.session_id()
oq_cache.delete(session_id)
Expand All @@ -283,13 +288,13 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
name_id = session_info.get('name_id', '')
if not name_id:
logger.error('The name_id is not available. Could not determine user identifier.')
return HttpResponseBadRequest(_('Internal server error. Please contact system administrator.'))
return render_error(request, internal_server_error_msg)

name_id = name_id.text
saml_user = SocialAuthUser.objects.get_by_provider_and_uid(SAML_PROVIDER_IDENTIFIER, name_id)
if saml_user:
logger.error('The SAML user has already been bound to another account.')
return HttpResponseBadRequest(_('Internal server error. Please contact system administrator.'))
return render_error(request, internal_server_error_msg)

# bind saml user
username = request.user.username
Expand All @@ -308,32 +313,44 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre

return HttpResponseRedirect(relay_state)

# check user number limit by license
if user_number_over_limit():
logger.error('The number of users exceeds the license limit.')
# send error msg to admin
error_msg = 'The number of users exceeds the license limit.'
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return HttpResponseForbidden(_('Internal server error. Please contact system administrator.'))

# check user number limit by org member quota
if org:
org_members = len(ccnet_api.get_org_emailusers(org.url_prefix, -1, -1))
if ORG_MEMBER_QUOTA_ENABLED:
from seahub.organizations.models import OrgMemberQuota
org_members_quota = OrgMemberQuota.objects.get_quota(org_id)
if org_members_quota is not None and org_members >= org_members_quota:
logger.error('The number of users exceeds the organization quota.')
# send error msg to admin
error_msg = 'Failed to create new user: the number of users exceeds the organization quota.'
org_admins = get_org_admins(org)
for org_admin in org_admins:
saml_sso_failed.send(sender=None, to_user=org_admin.email, error_msg=error_msg)
return HttpResponseForbidden(_('Failed to create new user: '
'the number of users exceeds the organization quota. '
'Please report to your organization (company) administrator.'))
if not session_info:
return render_error(request, login_failed_error_msg)

name_id = session_info.get('name_id', '')
if not name_id:
return render_error(request, login_failed_error_msg)

uid = name_id.text
if not uid:
return render_error(request, login_failed_error_msg)

if not SocialAuthUser.objects.get_by_provider_and_uid(SAML_PROVIDER_IDENTIFIER,
uid):
# check user number limit by license
if user_number_over_limit():
logger.error('The number of users exceeds the license limit.')
# send error msg to admin
error_msg = 'The number of users exceeds the license limit.'
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return render_error(request, internal_server_error_msg)

# check user number limit by org member quota
if org:
org_members = len(ccnet_api.get_org_emailusers(org.url_prefix, -1, -1))
if ORG_MEMBER_QUOTA_ENABLED:
from seahub.organizations.models import OrgMemberQuota
org_members_quota = OrgMemberQuota.objects.get_quota(org_id)
if org_members_quota is not None and org_members >= org_members_quota:
logger.error('The number of users exceeds the organization quota.')
# send error msg to admin
error_msg = 'The number of users exceeds the organization quota.'
org_admins = get_org_admins(org)
for org_admin in org_admins:
saml_sso_failed.send(sender=None, to_user=org_admin.email,
error_msg=error_msg)
return render_error(request, internal_server_error_msg)

# authenticate the remote user
logger.debug('Trying to authenticate the user')
Expand All @@ -353,8 +370,7 @@ def assertion_consumer_service(request, org_id=None, attribute_mapping=None, cre
admins = User.objects.get_superusers()
for admin in admins:
saml_sso_failed.send(sender=None, to_user=admin.email, error_msg=error_msg)
return HttpResponseForbidden(_('Login failed: failed to create user. '
'Please report to your organization (company) administrator.'))
return render_error(request, login_failed_error_msg)

if not user.is_active:
logger.error('ADFS/SAML single sign-on failed: user %s is deactivated.' % user.username)
Expand Down Expand Up @@ -564,6 +580,7 @@ def adfs_compatible_view(request, url_prefix):
org_id = str(org.org_id)
return HttpResponsePermanentRedirect(request.path.replace(url_prefix, org_id))


class SamlLogoutView(LogoutView):
def do_logout_service(self, request, data, binding, *args, **kwargs):
return super(SamlLogoutView, self).do_logout_service(request, data, binding)

0 comments on commit 7225eb8

Please sign in to comment.