Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions openfga_sdk/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CredentialConfiguration:
:param api_token: Bearer token to be sent for authentication
:param api_audience: API audience used for OAuth2
:param api_issuer: API issuer used for OAuth2
:param scopes: OAuth2 scopes to request, can be a list of strings or a space-separated string
"""

def __init__(
Expand All @@ -39,12 +40,14 @@ def __init__(
api_audience: str | None = None,
api_issuer: str | None = None,
api_token: str | None = None,
scopes: str | list[str] | None = None,
):
self._client_id = client_id
self._client_secret = client_secret
self._api_audience = api_audience
self._api_issuer = api_issuer
self._api_token = api_token
self._scopes = scopes

@property
def client_id(self):
Expand Down Expand Up @@ -116,6 +119,20 @@ def api_token(self, value):
"""
self._api_token = value

@property
def scopes(self):
"""
Return the scopes configured
"""
return self._scopes

@scopes.setter
def scopes(self, value):
"""
Update the scopes
"""
self._scopes = value


class Credentials:
"""
Expand Down
7 changes: 7 additions & 0 deletions openfga_sdk/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ async def _obtain_token(self, client):
"grant_type": "client_credentials",
}

# Add scope parameter if scopes are configured
if configuration.scopes is not None:
if isinstance(configuration.scopes, list):
post_params["scope"] = " ".join(configuration.scopes)
else:
post_params["scope"] = configuration.scopes

Comment on lines +82 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Add async tests for scopes handling

Sync path has good coverage; async path does not appear to be tested for scope serialization. Please add a mirror test in test/oauth2_test.py that asserts scope is form-encoded as a space-delimited string for list and string inputs.


🏁 Script executed:

#!/bin/bash
# Look for async OAuth2 tests that exercise scope serialization
rg -n -C2 -P 'from\s+openfga_sdk\.oauth2\s+import\s+OAuth2Client' test || true
# Check whether scope assertions exist in async tests
rg -n -C3 -P '\bscope\b' test/oauth2_test.py || true

Length of output: 415


Add synchronous and asynchronous tests for OAuth2Client scope serialization

A search of test/oauth2_test.py found no existing assertions for the “scope” parameter. Please extend that file with:

  • Sync tests
    • Verify that passing configuration.scopes as a list yields a space-delimited "scope" form field
    • Verify that passing it as a single string is encoded unchanged
  • Async tests (pytest.mark.asyncio)
    • Mirror the two sync tests against the async token-request method

headers = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
Expand Down
7 changes: 7 additions & 0 deletions openfga_sdk/sync/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def _obtain_token(self, client):
"grant_type": "client_credentials",
}

# Add scope parameter if scopes are configured
if configuration.scopes is not None:
if isinstance(configuration.scopes, list):
post_params["scope"] = " ".join(configuration.scopes)
else:
post_params["scope"] = configuration.scopes

headers = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
Expand Down
36 changes: 36 additions & 0 deletions test/credentials_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,42 @@ def test_configuration_client_credentials(self):
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")

def test_configuration_client_credentials_with_scopes_list(self):
"""
Test credential with method client_credentials and scopes as list is valid
"""
credential = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")
self.assertEqual(credential.configuration.scopes, ["read", "write", "admin"])

def test_configuration_client_credentials_with_scopes_string(self):
"""
Test credential with method client_credentials and scopes as string is valid
"""
credential = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")
self.assertEqual(credential.configuration.scopes, "read write admin")

def test_configuration_client_credentials_missing_config(self):
"""
Test credential with method client_credentials and configuration is missing
Expand Down
114 changes: 114 additions & 0 deletions test/oauth2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,117 @@ async def test_get_authentication_add_scheme_and_path(self, mock_request):
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as list
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as string
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
await rest_client.close()
113 changes: 113 additions & 0 deletions test/sync/oauth2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,119 @@ def test_get_authentication_obtain_client_credentials(self, mock_request):
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as list
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as string
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_failed(self, mock_request):
"""
Expand Down