Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/sentry/incidents/endpoints/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,41 @@
from sentry.api.bases.project import ProjectAlertRulePermission, ProjectEndpoint
from sentry.api.exceptions import ResourceDoesNotExist
from sentry.incidents.models.alert_rule import AlertRule
from sentry.models.organization import Organization


class OrganizationAlertRuleBaseEndpoint(OrganizationEndpoint):
"""
Base endpoint for organization-scoped alert rule creation.
Provides permission checking for alert creation that handles both
org-level permissions and team admin project-scoped permissions.
"""

def check_can_create_alert(self, request: Request, organization: Organization) -> None:
"""
Determine if the requesting user has access to alert creation. If the request does not have the "alerts:write"
permission, then we must verify that the user is a team admin with "alerts:write" access to the project(s)
in their request.
"""

# if the requesting user has any of these org-level permissions, then they can create an alert
if (
request.access.has_scope("alerts:write")
or request.access.has_scope("org:admin")
or request.access.has_scope("org:write")
):
return

# team admins should be able to create alerts for the projects they have access to
projects = self.get_projects(request, organization)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this get all projects the user has access to, or just the projects in the current url params selection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns all project ids that the user can access within this organization:
https://github.com/getsentry/sentry/blob/master/src/sentry/api/bases/organization.py#L355-L369

Btw this is not the new code, it was moved from OrganizationAlertRuleIndexEndpoint.check_can_create_alert
https://github.com/getsentry/sentry/pull/104171/changes#diff-7ce0c5c5a5e204143274734b7e9a288cd70a869fa5d460e99eee830e44cda682L627-L628

# team admins will have alerts:write scoped to their projects, members will not
team_admin_has_access = all(
[request.access.has_project_scope(project, "alerts:write") for project in projects]
)
# all() returns True for empty list, so include a check for it
if not team_admin_has_access or not projects:
raise PermissionDenied
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Permission check validates all projects, not the requested one

Medium Severity

The check_can_create_alert method calls self.get_projects(request, organization) without passing the target project, which returns ALL accessible projects rather than the project specified in the POST body. The subsequent check verifies alerts:write permission on ALL returned projects using all(). This causes users with mixed team permissions (team admin on some projects, regular member on others) to be incorrectly denied when trying to create a monitor for a project they DO have admin access to, because the permission check fails on the other projects where they only have member access.

Additional Locations (1)

Fix in Cursor Fix in Web



class ProjectAlertRuleEndpoint(ProjectEndpoint):
Expand Down
63 changes: 24 additions & 39 deletions src/sentry/incidents/endpoints/organization_alert_rule_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.http.response import HttpResponseBase
from drf_spectacular.utils import extend_schema, extend_schema_serializer
from rest_framework import serializers, status
from rest_framework.exceptions import ParseError, PermissionDenied, ValidationError
from rest_framework.exceptions import ParseError, ValidationError
from rest_framework.request import Request
from rest_framework.response import Response

Expand All @@ -33,6 +33,7 @@
from sentry.constants import ObjectStatus
from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.exceptions import InvalidParams
from sentry.incidents.endpoints.bases import OrganizationAlertRuleBaseEndpoint
from sentry.incidents.endpoints.serializers.alert_rule import (
AlertRuleSerializer,
AlertRuleSerializerResponse,
Expand Down Expand Up @@ -157,7 +158,14 @@ def create_metric_alert(
return Response(serialize(alert_rule, request.user), status=status.HTTP_201_CREATED)


class AlertRuleIndexMixin(Endpoint):
class AlertRuleFetchMixin(Endpoint):
"""
Mixin providing fetch functionality for metric alert rules.

This mixin requires access to paginate() method from Endpoint.
Can be used with any endpoint base class (OrganizationEndpoint, ProjectEndpoint, etc).
"""

def fetch_metric_alert(
self, request: Request, organization: Organization, alert_rules: BaseQuerySet[AlertRule]
) -> HttpResponseBase:
Expand Down Expand Up @@ -590,50 +598,14 @@ class OrganizationAlertRuleIndexPostSerializer(serializers.Serializer):

@extend_schema(tags=["Alerts"])
@region_silo_endpoint
class OrganizationAlertRuleIndexEndpoint(OrganizationEndpoint, AlertRuleIndexMixin):
class OrganizationAlertRuleIndexEndpoint(OrganizationAlertRuleBaseEndpoint, AlertRuleFetchMixin):
owner = ApiOwner.ISSUES
publish_status = {
"GET": ApiPublishStatus.PUBLIC,
"POST": ApiPublishStatus.PUBLIC,
}
permission_classes = (OrganizationAlertRulePermission,)

def check_can_create_alert(self, request: Request, organization: Organization) -> None:
"""
Determine if the requesting user has access to alert creation. If the request does not have the "alerts:write"
permission, then we must verify that the user is a team admin with "alerts:write" access to the project(s)
in their request.
"""
if features.has(
"organizations:workflow-engine-metric-detector-limit", organization, actor=request.user
):
alert_limit = quotas.backend.get_metric_detector_limit(organization.id)
alert_count = AlertRule.objects.fetch_for_organization(organization=organization)
# filter out alert rules without any projects
alert_count = alert_count.filter(projects__isnull=False).distinct().count()

if alert_limit >= 0 and alert_count >= alert_limit:
raise ValidationError(
f"You may not exceed {alert_limit} metric alerts on your current plan."
)

# if the requesting user has any of these org-level permissions, then they can create an alert
if (
request.access.has_scope("alerts:write")
or request.access.has_scope("org:admin")
or request.access.has_scope("org:write")
):
return
# team admins should be able to create alerts for the projects they have access to
projects = self.get_projects(request, organization)
# team admins will have alerts:write scoped to their projects, members will not
team_admin_has_access = all(
[request.access.has_project_scope(project, "alerts:write") for project in projects]
)
# all() returns True for empty list, so include a check for it
if not team_admin_has_access or not projects:
raise PermissionDenied

@extend_schema(
operation_id="List an Organization's Metric Alert Rules",
parameters=[GlobalParams.ORG_ID_OR_SLUG],
Expand Down Expand Up @@ -827,5 +799,18 @@ def post(self, request: Request, organization: Organization) -> HttpResponseBase
}
```
"""
if features.has(
"organizations:workflow-engine-metric-detector-limit", organization, actor=request.user
):
alert_limit = quotas.backend.get_metric_detector_limit(organization.id)
alert_count = AlertRule.objects.fetch_for_organization(organization=organization)
# filter out alert rules without any projects
alert_count = alert_count.filter(projects__isnull=False).distinct().count()

if alert_limit >= 0 and alert_count >= alert_limit:
raise ValidationError(
f"You may not exceed {alert_limit} metric alerts on your current plan."
)

self.check_can_create_alert(request, organization)
return create_metric_alert(request, organization)
4 changes: 2 additions & 2 deletions src/sentry/incidents/endpoints/project_alert_rule_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from sentry.api.base import region_silo_endpoint
from sentry.api.bases.project import ProjectAlertRulePermission, ProjectEndpoint
from sentry.incidents.endpoints.organization_alert_rule_index import (
AlertRuleIndexMixin,
AlertRuleFetchMixin,
create_metric_alert,
)
from sentry.incidents.models.alert_rule import AlertRule
from sentry.workflow_engine.utils.legacy_metric_tracking import track_alert_endpoint_execution


@region_silo_endpoint
class ProjectAlertRuleIndexEndpoint(ProjectEndpoint, AlertRuleIndexMixin):
class ProjectAlertRuleIndexEndpoint(ProjectEndpoint, AlertRuleFetchMixin):
owner = ApiOwner.ISSUES
publish_status = {
"GET": ApiPublishStatus.EXPERIMENTAL,
Expand Down
7 changes: 5 additions & 2 deletions src/sentry/monitors/endpoints/organization_monitor_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import region_silo_endpoint
from sentry.api.bases import NoProjects
from sentry.api.bases.organization import OrganizationAlertRulePermission, OrganizationEndpoint
from sentry.api.bases.organization import OrganizationAlertRulePermission
from sentry.api.helpers.teams import get_teams
from sentry.api.paginator import OffsetPaginator
from sentry.api.serializers import serialize
Expand All @@ -32,6 +32,7 @@
from sentry.apidocs.utils import inline_sentry_response_serializer
from sentry.constants import DataCategory, ObjectStatus
from sentry.db.models.query import in_iexact
from sentry.incidents.endpoints.bases import OrganizationAlertRuleBaseEndpoint
from sentry.models.environment import Environment
from sentry.models.organization import Organization
from sentry.monitors.models import (
Expand Down Expand Up @@ -72,7 +73,7 @@ def flip_sort_direction(sort_field: str) -> str:

@region_silo_endpoint
@extend_schema(tags=["Crons"])
class OrganizationMonitorIndexEndpoint(OrganizationEndpoint):
class OrganizationMonitorIndexEndpoint(OrganizationAlertRuleBaseEndpoint):
publish_status = {
"GET": ApiPublishStatus.PUBLIC,
"POST": ApiPublishStatus.PUBLIC,
Expand Down Expand Up @@ -276,6 +277,8 @@ def post(self, request: AuthenticatedHttpRequest, organization) -> Response:
"""
Create a new monitor.
"""
self.check_can_create_alert(request, organization)

This comment was marked as outdated.


validator = MonitorValidator(
data=request.data,
context={"organization": organization, "access": request.access, "request": request},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sentry import audit_log
from sentry.analytics.events.cron_monitor_created import CronMonitorCreated, FirstCronMonitorCreated
from sentry.constants import DataCategory, ObjectStatus
from sentry.models.projectteam import ProjectTeam
from sentry.models.rule import Rule, RuleSource
from sentry.monitors.models import Monitor, MonitorStatus, ScheduleType, is_monitor_muted
from sentry.monitors.utils import get_detector_for_monitor
Expand Down Expand Up @@ -664,6 +665,58 @@ def test_invalid_schedule(self) -> None:
response = self.get_error_response(self.organization.slug, **data, status_code=400)
assert response.data["config"]["schedule"][0] == "Schedule is invalid"

def test_team_admin_create(self) -> None:
team_admin_user = self.create_user()
team = self.create_team(organization=self.organization)
self.create_member(
team_roles=[(team, "admin")],
user=team_admin_user,
role="member",
organization=self.organization,
)
# Associate the team with the project
ProjectTeam.objects.create(project=self.project, team=team)

member_user = self.create_user()
self.create_member(
user=member_user, organization=self.organization, role="member", teams=[team]
)

self.organization.update_option("sentry:alerts_member_write", False)
self.login_as(team_admin_user)

data = {
"project": self.project.slug,
"name": "Team Admin Monitor",
"type": "cron_job",
"config": {"schedule_type": "crontab", "schedule": "@daily"},
}
resp = self.get_success_response(self.organization.slug, **data)
assert resp.status_code == 201

# verify that a team admin cannot create a monitor for a project their team doesn't own
other_org = self.create_organization()
other_project = self.create_project(organization=other_org)
data_invalid = {
"project": other_project.slug,
"name": "Invalid Monitor",
"type": "cron_job",
"config": {"schedule_type": "crontab", "schedule": "@daily"},
}
resp = self.get_error_response(self.organization.slug, status_code=400, **data_invalid)
assert resp.data["project"][0] == "Invalid project"

# verify that a regular team member cannot create a monitor
self.login_as(member_user)
data_member = {
"project": self.project.slug,
"name": "Member Monitor",
"type": "cron_job",
"config": {"schedule_type": "crontab", "schedule": "@daily"},
}
resp = self.get_response(self.organization.slug, **data_member)
assert resp.status_code == 403


class BulkEditOrganizationMonitorTest(MonitorTestCase):
endpoint = "sentry-api-0-organization-monitor-index"
Expand Down
Loading