Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/sentry/migrations/1019_add_issued_grant_type_to_apitoken.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 5.2.8 on 2025-01-15

from django.db import migrations, models

from sentry.new_migrations.migrations import CheckedMigration


class Migration(CheckedMigration):
# This flag is used to mark that a migration shouldn't be automatically run in production.
# This should only be used for operations where it's safe to run the migration after your
# code has deployed. So this should not be used for most operations that alter the schema
# of a table.
# Here are some things that make sense to mark as post deployment:
# - Large data migrations. Typically we want these to be run manually so that they can be
# monitored and not block the deploy for a long period of time while they run.
# - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to
# run this outside deployments so that we don't block them. Note that while adding an index
# is a schema change, it's completely safe to run the operation after the code has deployed.
# Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment

is_post_deployment = False

dependencies = [
("sentry", "1018_encrypt_integration_metadata"),
]

operations = [
migrations.AddField(
model_name="apitoken",
name="issued_grant_type",
field=models.CharField(max_length=50, null=True, blank=True),
),
]
11 changes: 10 additions & 1 deletion src/sentry/models/apitoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
hashed_refresh_token = models.CharField(max_length=128, unique=True, null=True)
expires_at = models.DateTimeField(null=True, default=default_expiration)
date_added = models.DateTimeField(default=timezone.now)
# Track how the token was originally issued (e.g., "device_code", "authorization_code").
# Used to determine refresh behavior per RFC 6749 §6:
# - Tokens issued via device_code (public client per RFC 8628 §5.6) can refresh without client_secret
# - Tokens issued via authorization_code (confidential client) MUST provide client_secret for refresh
# Null for tokens created before this field was added (treated as confidential for backward compatibility).
issued_grant_type = models.CharField(max_length=50, null=True, blank=True)

objects: ClassVar[ApiTokenManager] = ApiTokenManager(cache_fields=("token",))

Expand Down Expand Up @@ -439,6 +445,7 @@ def from_grant(
user=grant.user,
scope_list=grant.get_scopes(),
scoping_organization_id=grant.organization_id,
issued_grant_type="authorization_code",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use/expand the existing GrantType enum instead of adding magic strings?

)

# Remove the ApiGrant from the database to prevent reuse of the same
Expand Down Expand Up @@ -580,7 +587,9 @@ def default_flush(self, value: bool) -> None:
self._default_flush = value


def is_api_token_auth(auth: object) -> TypeGuard[AuthenticatedToken | ApiToken | ApiTokenReplica]:
def is_api_token_auth(
auth: object,
) -> TypeGuard[AuthenticatedToken | ApiToken | ApiTokenReplica]:
""":returns True when an API token is hitting the API."""
from sentry.auth.services.auth import AuthenticatedToken
from sentry.hybridcloud.models.apitokenreplica import ApiTokenReplica
Expand Down
72 changes: 69 additions & 3 deletions src/sentry/web/frontend/oauth_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,49 @@
reason=reason,
status=401,
)
elif grant_type == GrantTypes.REFRESH:
# Refresh token: client authentication depends on how the token was issued.
# Per RFC 6749 §6: "If the client type is confidential or the client was issued
# client credentials, the client MUST authenticate."
# We determine client type by checking the token's issued_grant_type.
if not client_id:
return self.error(
request=request,
name="invalid_client",
reason="missing client_id",
status=401,
)

# Build query - validate secret only if provided
query: dict[str, str] = {"client_id": client_id}
if client_secret:
query["client_secret"] = client_secret

try:
application = ApiApplication.objects.get(**query)
except ApiApplication.DoesNotExist:
metrics.incr("oauth_token.post.invalid", sample_rate=1.0)
if client_secret:
logger.warning(
"Invalid client_id / secret pair",
extra={"client_id": client_id},
)
reason = "invalid client_id or client_secret"
else:
logger.warning("Invalid client_id", extra={"client_id": client_id})
reason = "invalid client_id"
return self.error(
request=request,
name="invalid_client",
reason=reason,
status=401,
)

# For refresh_token, we need to verify the client authentication matches
# how the token was issued. This check happens in get_refresh_token().
# Pass whether client_secret was provided so we can enforce RFC 6749 §6.
else:
# Other grant types require confidential client authentication
# authorization_code grant requires confidential client authentication
if not client_id or not client_secret:
return self.error(
request=request,
Expand Down Expand Up @@ -273,7 +314,11 @@
elif grant_type == GrantTypes.DEVICE_CODE:
return self.handle_device_code_grant(request=request, application=application)
else:
token_data = self.get_refresh_token(request=request, application=application)
token_data = self.get_refresh_token(
request=request,
application=application,
client_secret_provided=bool(client_secret),
)
if "error" in token_data:
return self.error(
request=request,
Expand Down Expand Up @@ -418,7 +463,12 @@

return token_data

def get_refresh_token(self, request: Request, application: ApiApplication) -> dict:
def get_refresh_token(
self,
request: Request,
application: ApiApplication,
client_secret_provided: bool = True,
) -> dict:
refresh_token_code = request.POST.get("refresh_token")
scope = request.POST.get("scope")

Expand All @@ -435,6 +485,21 @@
)
except ApiToken.DoesNotExist:
return {"error": "invalid_grant", "reason": "invalid request"}

# RFC 6749 §6: "If the client type is confidential or the client was issued
# client credentials, the client MUST authenticate."
#
# Per RFC 8628 §5.6, device flow clients are public clients.
# Tokens issued via device_code can be refreshed without client_secret.
# All other tokens (authorization_code, or legacy null) require client_secret.
if not client_secret_provided:
if refresh_token.issued_grant_type != "device_code":
# Confidential client token - client_secret is required
return {
"error": "invalid_client",
"reason": "client_secret required for this token",
}

refresh_token.refresh()

return {"token": refresh_token}
Expand Down Expand Up @@ -587,6 +652,7 @@
user_id=device_code.user.id,
scope_list=device_code.scope_list,
scoping_organization_id=device_code.organization_id,
issued_grant_type="device_code",
)

# Delete the device code (one-time use)
Expand Down
178 changes: 178 additions & 0 deletions tests/sentry/web/frontend/test_oauth_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,184 @@ def test_inactive_application_rejects_token_refresh(self) -> None:
assert token_after.refresh_token == self.token.refresh_token
assert token_after.expires_at == self.token.expires_at

def test_device_flow_token_refresh_without_secret(self) -> None:
"""Device flow tokens can be refreshed without client_secret.

Per RFC 8628 §5.6: "device clients... should be treated as public clients"
Per RFC 6749 §6: Public clients don't need to authenticate for refresh.

Tokens issued via device_code grant can be refreshed with just client_id.
"""
self.login_as(self.user)

# Create a token that was issued via device flow
device_flow_token = ApiToken.objects.create(
application=self.application,
user=self.user,
expires_at=timezone.now(),
issued_grant_type="device_code",
)

# Request without client_secret (public client)
resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"client_id": self.application.client_id,
"refresh_token": device_flow_token.refresh_token,
# No client_secret - this is a public client (device flow)
},
)
Comment on lines +689 to +705
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there also be a test covering device_code + client_secret submit?

assert resp.status_code == 200

data = json.loads(resp.content)
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "Bearer"

# Token should be refreshed
token2 = ApiToken.objects.get(id=device_flow_token.id)
assert token2.token != device_flow_token.token
assert token2.refresh_token != device_flow_token.refresh_token

def test_authorization_code_token_requires_secret_for_refresh(self) -> None:
"""Authorization code tokens MUST provide client_secret for refresh.

Per RFC 6749 §6: "If the client type is confidential or the client was
issued client credentials, the client MUST authenticate."

Tokens issued via authorization_code are confidential client tokens.
"""
self.login_as(self.user)

# Create a token that was issued via authorization_code (confidential client)
auth_code_token = ApiToken.objects.create(
application=self.application,
user=self.user,
expires_at=timezone.now(),
issued_grant_type="authorization_code",
)

# Try to refresh without client_secret
resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"client_id": self.application.client_id,
"refresh_token": auth_code_token.refresh_token,
# No client_secret - should fail for confidential token
},
)
assert resp.status_code == 400
assert json.loads(resp.content) == {"error": "invalid_client"}

def test_legacy_token_requires_secret_for_refresh(self) -> None:
"""Legacy tokens (null issued_grant_type) require client_secret for refresh.

Tokens created before the issued_grant_type field was added are treated
as confidential client tokens for backward compatibility.
"""
self.login_as(self.user)

# self.token has issued_grant_type=None (legacy token from setUp)
assert self.token.issued_grant_type is None

# Try to refresh without client_secret
resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"client_id": self.application.client_id,
"refresh_token": self.token.refresh_token,
# No client_secret - should fail for legacy token
},
)
assert resp.status_code == 400
assert json.loads(resp.content) == {"error": "invalid_client"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't know that I need to send my client secret as well by reading this error message.


def test_public_client_refresh_invalid_client_id(self) -> None:
"""Invalid client_id should return invalid_client."""
self.login_as(self.user)

resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"client_id": "nonexistent_client_id",
"refresh_token": self.token.refresh_token,
},
)
assert resp.status_code == 401
assert json.loads(resp.content) == {"error": "invalid_client"}

def test_refresh_missing_client_id(self) -> None:
"""Refresh without client_id should return invalid_client."""
self.login_as(self.user)

resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"refresh_token": self.token.refresh_token,
# No client_id
},
)
assert resp.status_code == 401
assert json.loads(resp.content) == {"error": "invalid_client"}

def test_device_flow_token_wrong_application(self) -> None:
"""Refresh token from different application should return invalid_grant.

Per RFC 6749 §6: "the refresh token is bound to the client to which it
was issued"
"""
self.login_as(self.user)

# Create a device flow token
device_flow_token = ApiToken.objects.create(
application=self.application,
user=self.user,
expires_at=timezone.now(),
issued_grant_type="device_code",
)

# Create a different application
other_app = ApiApplication.objects.create(
owner=self.user, redirect_uris="https://other.com"
)

# Try to refresh with token from self.application but client_id from other_app
resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"client_id": other_app.client_id,
"refresh_token": device_flow_token.refresh_token,
},
)
assert resp.status_code == 400
assert json.loads(resp.content) == {"error": "invalid_grant"}

def test_confidential_client_refresh_wrong_secret_rejected(self) -> None:
"""When client_secret is provided, it must be valid.

Even for device flow tokens, if a client provides client_secret,
we validate it.
"""
self.login_as(self.user)

resp = self.client.post(
self.path,
{
"grant_type": "refresh_token",
"client_id": self.application.client_id,
"refresh_token": self.token.refresh_token,
"client_secret": "wrong_secret",
},
)
assert resp.status_code == 401
assert json.loads(resp.content) == {"error": "invalid_client"}


@control_silo_test
class OAuthTokenOrganizationScopedTest(TestCase):
Expand Down
Loading