Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 42 additions & 36 deletions src/sentry/incidents/endpoints/organization_alert_rule_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,35 @@ def create_metric_alert(


class AlertRuleIndexMixin(Endpoint):
def check_can_create_alert(self, request: Request, organization: Organization) -> None:
"""
Determine if the requesting user has access to alert creation. If the request does not have the "alerts:write"
permission, then we must verify that the user is a team admin with "alerts:write" access to the project(s)
in their request.
"""

# if the requesting user has any of these org-level permissions, then they can create an alert
if (
request.access.has_scope("alerts:write")
or request.access.has_scope("org:admin")
or request.access.has_scope("org:write")
):
return

# team admins should be able to create alerts for the projects they have access to
# Verify that get_projects is available (requires OrganizationEndpoint)
if not hasattr(self, "get_projects"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we record a sentry error here? Since presumably if this happens it means we’ve implemented things wrong (not extending organization endpoint)

raise PermissionDenied

projects = self.get_projects(request, organization)
# team admins will have alerts:write scoped to their projects, members will not
team_admin_has_access = all(
[request.access.has_project_scope(project, "alerts:write") for project in projects]
)
# all() returns True for empty list, so include a check for it
if not team_admin_has_access or not projects:
raise PermissionDenied

def fetch_metric_alert(
self, request: Request, organization: Organization, alert_rules: BaseQuerySet[AlertRule]
) -> HttpResponseBase:
Expand Down Expand Up @@ -595,42 +624,6 @@ class OrganizationAlertRuleIndexEndpoint(OrganizationEndpoint, AlertRuleIndexMix
}
permission_classes = (OrganizationAlertRulePermission,)

def check_can_create_alert(self, request: Request, organization: Organization) -> None:
"""
Determine if the requesting user has access to alert creation. If the request does not have the "alerts:write"
permission, then we must verify that the user is a team admin with "alerts:write" access to the project(s)
in their request.
"""
if features.has(
"organizations:workflow-engine-metric-detector-limit", organization, actor=request.user
):
alert_limit = quotas.backend.get_metric_detector_limit(organization.id)
alert_count = AlertRule.objects.fetch_for_organization(organization=organization)
# filter out alert rules without any projects
alert_count = alert_count.filter(projects__isnull=False).distinct().count()

if alert_limit >= 0 and alert_count >= alert_limit:
raise ValidationError(
f"You may not exceed {alert_limit} metric alerts on your current plan."
)

# if the requesting user has any of these org-level permissions, then they can create an alert
if (
request.access.has_scope("alerts:write")
or request.access.has_scope("org:admin")
or request.access.has_scope("org:write")
):
return
# team admins should be able to create alerts for the projects they have access to
projects = self.get_projects(request, organization)
# team admins will have alerts:write scoped to their projects, members will not
team_admin_has_access = all(
[request.access.has_project_scope(project, "alerts:write") for project in projects]
)
# all() returns True for empty list, so include a check for it
if not team_admin_has_access or not projects:
raise PermissionDenied

@extend_schema(
operation_id="List an Organization's Metric Alert Rules",
parameters=[GlobalParams.ORG_ID_OR_SLUG],
Expand Down Expand Up @@ -824,5 +817,18 @@ def post(self, request: Request, organization: Organization) -> HttpResponseBase
}
```
"""
if features.has(
"organizations:workflow-engine-metric-detector-limit", organization, actor=request.user
):
alert_limit = quotas.backend.get_metric_detector_limit(organization.id)
alert_count = AlertRule.objects.fetch_for_organization(organization=organization)
# filter out alert rules without any projects
alert_count = alert_count.filter(projects__isnull=False).distinct().count()

if alert_limit >= 0 and alert_count >= alert_limit:
raise ValidationError(
f"You may not exceed {alert_limit} metric alerts on your current plan."
)

self.check_can_create_alert(request, organization)
return create_metric_alert(request, organization)
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from sentry.apidocs.utils import inline_sentry_response_serializer
from sentry.constants import DataCategory, ObjectStatus
from sentry.db.models.query import in_iexact
from sentry.incidents.endpoints.organization_alert_rule_index import AlertRuleIndexMixin
from sentry.models.environment import Environment
from sentry.models.organization import Organization
from sentry.monitors.models import (
Expand Down Expand Up @@ -72,7 +73,7 @@ def flip_sort_direction(sort_field: str) -> str:

@region_silo_endpoint
@extend_schema(tags=["Crons"])
class OrganizationMonitorIndexEndpoint(OrganizationEndpoint):
class OrganizationMonitorIndexEndpoint(OrganizationEndpoint, AlertRuleIndexMixin):
publish_status = {
"GET": ApiPublishStatus.PUBLIC,
"POST": ApiPublishStatus.PUBLIC,
Expand Down Expand Up @@ -276,6 +277,8 @@ def post(self, request: AuthenticatedHttpRequest, organization) -> Response:
"""
Create a new monitor.
"""
self.check_can_create_alert(request, organization)

This comment was marked as outdated.


validator = MonitorValidator(
data=request.data,
context={"organization": organization, "access": request.access, "request": request},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sentry import audit_log
from sentry.analytics.events.cron_monitor_created import CronMonitorCreated, FirstCronMonitorCreated
from sentry.constants import DataCategory, ObjectStatus
from sentry.models.projectteam import ProjectTeam
from sentry.models.rule import Rule, RuleSource
from sentry.monitors.models import Monitor, MonitorStatus, ScheduleType, is_monitor_muted
from sentry.monitors.utils import get_detector_for_monitor
Expand Down Expand Up @@ -664,6 +665,58 @@ def test_invalid_schedule(self) -> None:
response = self.get_error_response(self.organization.slug, **data, status_code=400)
assert response.data["config"]["schedule"][0] == "Schedule is invalid"

def test_team_admin_create(self) -> None:
team_admin_user = self.create_user()
team = self.create_team(organization=self.organization)
self.create_member(
team_roles=[(team, "admin")],
user=team_admin_user,
role="member",
organization=self.organization,
)
# Associate the team with the project
ProjectTeam.objects.create(project=self.project, team=team)

member_user = self.create_user()
self.create_member(
user=member_user, organization=self.organization, role="member", teams=[team]
)

self.organization.update_option("sentry:alerts_member_write", False)
self.login_as(team_admin_user)

data = {
"project": self.project.slug,
"name": "Team Admin Monitor",
"type": "cron_job",
"config": {"schedule_type": "crontab", "schedule": "@daily"},
}
resp = self.get_success_response(self.organization.slug, **data)
assert resp.status_code == 201

# verify that a team admin cannot create a monitor for a project their team doesn't own
other_org = self.create_organization()
other_project = self.create_project(organization=other_org)
data_invalid = {
"project": other_project.slug,
"name": "Invalid Monitor",
"type": "cron_job",
"config": {"schedule_type": "crontab", "schedule": "@daily"},
}
resp = self.get_error_response(self.organization.slug, status_code=400, **data_invalid)
assert resp.data["project"][0] == "Invalid project"

# verify that a regular team member cannot create a monitor
self.login_as(member_user)
data_member = {
"project": self.project.slug,
"name": "Member Monitor",
"type": "cron_job",
"config": {"schedule_type": "crontab", "schedule": "@daily"},
}
resp = self.get_response(self.organization.slug, **data_member)
assert resp.status_code == 403


class BulkEditOrganizationMonitorTest(MonitorTestCase):
endpoint = "sentry-api-0-organization-monitor-index"
Expand Down
Loading