-
-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
44e2be8
commit 342b79f
Showing
7 changed files
with
182 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
Empty file.
33 changes: 33 additions & 0 deletions
33
bothub/authorizations/consumers/authorizations_consumer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import amqp | ||
from sentry_sdk import capture_exception | ||
|
||
from bothub.event_driven.parsers import JSONParser | ||
from bothub.event_driven.consumer.consumers import EDAConsumer | ||
from bothub.authorizations.usecases import AuthorizationsUsecase | ||
from bothub.authorizations.usecases.dto import OrgAuthDTO | ||
|
||
|
||
class OrgAuthConsumer(EDAConsumer): # pragma: no cover | ||
def consume(self, message: amqp.Message): | ||
print(f"[ProjectConsumer] - Consuming a message. Body: {message.body}") | ||
|
||
try: | ||
body = JSONParser.parse(message.body) | ||
|
||
action = body.get("action") | ||
|
||
org_auth_dto = OrgAuthDTO( | ||
user=body.get("user_email"), | ||
org_id=body.get("organization_intelligence"), | ||
role=body.get("role") | ||
) | ||
|
||
usecase = AuthorizationsUsecase() | ||
usecase.dispatch(action)(org_auth_dto) | ||
|
||
message.channel.basic_ack(message.delivery_tag) | ||
|
||
except Exception as exception: | ||
capture_exception(exception) | ||
message.channel.basic_reject(message.delivery_tag, requeue=False) | ||
print(f"[ProjectConsumer] - Message rejected by: {exception}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from bothub.authorizations.usecases.usecase import AuthorizationsUsecase |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass | ||
class OrgAuthDTO: | ||
user: str | ||
role: int | ||
org_id: int |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
from django.test import TestCase, RequestFactory | ||
|
||
from bothub.common.models import Organization, RepositoryCategory, OrganizationAuthorization | ||
from bothub.api.v2.tests.utils import create_user_and_token | ||
|
||
|
||
from bothub.authorizations.usecases.dto import OrgAuthDTO | ||
from bothub.authorizations.usecases import AuthorizationsUsecase | ||
|
||
|
||
class UseCaseTestCase(TestCase): | ||
def setUp(self) -> None: | ||
self.factory = RequestFactory() | ||
self.moduser, self.moduser_token = create_user_and_token("module", module=True) | ||
self.user, self.user_token = create_user_and_token("user") | ||
self.category = RepositoryCategory.objects.create(name="Category 1") | ||
self.org = Organization.objects.create( | ||
name="Organization 1", nickname="organization1" | ||
) | ||
self.usecase = AuthorizationsUsecase() | ||
|
||
def test_create(self): | ||
role = 3 | ||
org_id = self.org.id | ||
user_email = self.user.email | ||
org_auth_dto = OrgAuthDTO( | ||
user=user_email, | ||
role=role, | ||
org_id=org_id | ||
) | ||
|
||
created: bool | ||
org_auth: OrganizationAuthorization | ||
|
||
created, org_auth = self.usecase.dispatch("create")(org_auth_dto) | ||
|
||
self.assertIsInstance(org_auth, OrganizationAuthorization) | ||
self.assertTrue(created) | ||
self.assertEquals(org_auth.user.email, user_email) | ||
|
||
def test_update(self): | ||
OrganizationAuthorization.objects.create( | ||
organization=self.org, | ||
user=self.user, | ||
role=3 | ||
) | ||
|
||
role = 2 | ||
org_id = self.org.id | ||
user_email = self.user.email | ||
org_auth_dto = OrgAuthDTO( | ||
user=user_email, | ||
role=role, | ||
org_id=org_id | ||
) | ||
org_auth: OrganizationAuthorization | ||
|
||
org_auth = self.usecase.dispatch("update")(org_auth_dto) | ||
|
||
self.assertIsInstance(org_auth, OrganizationAuthorization) | ||
self.assertEquals(org_auth.role, role) | ||
|
||
def test_delete(self): | ||
self.org.organization_authorizations.create(user=self.user, role=3) | ||
role = 2 | ||
org_id = self.org.id | ||
user_email = self.user.email | ||
org_auth_dto = OrgAuthDTO( | ||
user=user_email, | ||
role=role, | ||
org_id=org_id | ||
) | ||
|
||
self.usecase.dispatch("delete")(org_auth_dto) | ||
|
||
with self.assertRaises(OrganizationAuthorization.DoesNotExist): | ||
OrganizationAuthorization.objects.get(user=self.user, organization=self.org) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
from typing import Tuple | ||
|
||
from bothub.common.models import Organization, OrganizationAuthorization, User | ||
from bothub.authorizations.usecases.dto import OrgAuthDTO | ||
|
||
class UserDoesNotExist(BaseException): | ||
pass | ||
|
||
class OrgDoesNotExist(BaseException): | ||
pass | ||
|
||
|
||
class AuthorizationsUsecase: | ||
def dispatch(self, action: str): | ||
if action == "create": | ||
return self.get_or_create_organization_authorization | ||
if action == "update": | ||
return self.update_organization_authorization | ||
if action == "delete": | ||
return self.delete_organization_authorization | ||
|
||
def __get_user(self, user_email: str) -> User: | ||
try: | ||
return User.objects.get(email=user_email) | ||
except User.DoesNotExist: | ||
raise UserDoesNotExist(f"User with email {user_email} does not exist") | ||
|
||
def __get_org(self, org_id: int) -> Organization: | ||
try: | ||
return Organization.objects.get(id=org_id) | ||
except Organization.DoesNotExist: | ||
raise OrgDoesNotExist(f"Org with id {org_id} does not exist") | ||
|
||
def get_or_create_organization_authorization(self, org_auth_dto: OrgAuthDTO) -> Tuple[bool, OrganizationAuthorization]: | ||
org: Organization = self.__get_org(org_auth_dto.org_id) | ||
user: User = self.__get_user(org_auth_dto.user) | ||
|
||
try: | ||
org_auth: OrganizationAuthorization = org.organization_authorizations.get(user=user) | ||
return False, org_auth | ||
except OrganizationAuthorization.DoesNotExist: | ||
org_auth = org.organization_authorizations.create( | ||
user=user, | ||
role=org_auth_dto.role | ||
) | ||
return True, org_auth | ||
|
||
def update_organization_authorization(self, org_auth_dto: OrgAuthDTO) -> OrganizationAuthorization: | ||
created, org_auth = self.get_or_create_organization_authorization(org_auth_dto) | ||
|
||
if created: | ||
return org_auth | ||
|
||
org_auth.role = org_auth_dto.role | ||
org_auth.save(update_fields=["role"]) | ||
|
||
return org_auth | ||
|
||
def delete_organization_authorization(self, org_auth_dto: OrgAuthDTO) -> None: | ||
_, org_auth = self.get_or_create_organization_authorization(org_auth_dto) | ||
org_auth.delete() | ||
return |