diff --git a/openfga_sdk/credentials.py b/openfga_sdk/credentials.py index c2bcdf2..9e50a6e 100644 --- a/openfga_sdk/credentials.py +++ b/openfga_sdk/credentials.py @@ -30,6 +30,7 @@ class CredentialConfiguration: :param api_token: Bearer token to be sent for authentication :param api_audience: API audience used for OAuth2 :param api_issuer: API issuer used for OAuth2 + :param scopes: OAuth2 scopes to request, can be a list of strings or a space-separated string """ def __init__( @@ -39,12 +40,14 @@ def __init__( api_audience: str | None = None, api_issuer: str | None = None, api_token: str | None = None, + scopes: str | list[str] | None = None, ): self._client_id = client_id self._client_secret = client_secret self._api_audience = api_audience self._api_issuer = api_issuer self._api_token = api_token + self._scopes = scopes @property def client_id(self): @@ -116,6 +119,20 @@ def api_token(self, value): """ self._api_token = value + @property + def scopes(self): + """ + Return the scopes configured + """ + return self._scopes + + @scopes.setter + def scopes(self, value): + """ + Update the scopes + """ + self._scopes = value + class Credentials: """ diff --git a/openfga_sdk/oauth2.py b/openfga_sdk/oauth2.py index 5405286..060acf5 100644 --- a/openfga_sdk/oauth2.py +++ b/openfga_sdk/oauth2.py @@ -79,6 +79,13 @@ async def _obtain_token(self, client): "grant_type": "client_credentials", } + # Add scope parameter if scopes are configured + if configuration.scopes is not None: + if isinstance(configuration.scopes, list): + post_params["scope"] = " ".join(configuration.scopes) + else: + post_params["scope"] = configuration.scopes + headers = urllib3.response.HTTPHeaderDict( { "Accept": "application/json", diff --git a/openfga_sdk/sync/oauth2.py b/openfga_sdk/sync/oauth2.py index 9e3b8b7..ad4336c 100644 --- a/openfga_sdk/sync/oauth2.py +++ b/openfga_sdk/sync/oauth2.py @@ -79,6 +79,13 @@ def _obtain_token(self, client): "grant_type": "client_credentials", } + # Add scope parameter if scopes are configured + if configuration.scopes is not None: + if isinstance(configuration.scopes, list): + post_params["scope"] = " ".join(configuration.scopes) + else: + post_params["scope"] = configuration.scopes + headers = urllib3.response.HTTPHeaderDict( { "Accept": "application/json", diff --git a/test/credentials_test.py b/test/credentials_test.py index 238608f..d3bc446 100644 --- a/test/credentials_test.py +++ b/test/credentials_test.py @@ -107,6 +107,42 @@ def test_configuration_client_credentials(self): credential.validate_credentials_config() self.assertEqual(credential.method, "client_credentials") + def test_configuration_client_credentials_with_scopes_list(self): + """ + Test credential with method client_credentials and scopes as list is valid + """ + credential = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + scopes=["read", "write", "admin"], + ), + ) + credential.validate_credentials_config() + self.assertEqual(credential.method, "client_credentials") + self.assertEqual(credential.configuration.scopes, ["read", "write", "admin"]) + + def test_configuration_client_credentials_with_scopes_string(self): + """ + Test credential with method client_credentials and scopes as string is valid + """ + credential = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + scopes="read write admin", + ), + ) + credential.validate_credentials_config() + self.assertEqual(credential.method, "client_credentials") + self.assertEqual(credential.configuration.scopes, "read write admin") + def test_configuration_client_credentials_missing_config(self): """ Test credential with method client_credentials and configuration is missing diff --git a/test/oauth2_test.py b/test/oauth2_test.py index 0b5adef..ba43806 100644 --- a/test/oauth2_test.py +++ b/test/oauth2_test.py @@ -494,3 +494,117 @@ async def test_get_authentication_add_scheme_and_path(self, mock_request): }, ) await rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + async def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request): + """ + Test getting authentication header when method is client credentials with scopes as list + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + scopes=["read", "write", "admin"], + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = await client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.5", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/oauth/token", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + "scope": "read write admin", + }, + ) + await rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + async def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request): + """ + Test getting authentication header when method is client credentials with scopes as string + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + scopes="read write admin", + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = await client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.5", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/oauth/token", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + "scope": "read write admin", + }, + ) + await rest_client.close() \ No newline at end of file diff --git a/test/sync/oauth2_test.py b/test/sync/oauth2_test.py index 0a872e1..fd4f39c 100644 --- a/test/sync/oauth2_test.py +++ b/test/sync/oauth2_test.py @@ -104,6 +104,119 @@ def test_get_authentication_obtain_client_credentials(self, mock_request): ) rest_client.close() + @patch.object(rest.RESTClientObject, "request") + def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request): + """ + Test getting authentication header when method is client credentials with scopes as list + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + scopes=["read", "write", "admin"], + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.5", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/oauth/token", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + "scope": "read write admin", + }, + ) + rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request): + """ + Test getting authentication header when method is client credentials with scopes as string + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + scopes="read write admin", + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.5", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/oauth/token", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + "scope": "read write admin", + }, + ) + @patch.object(rest.RESTClientObject, "request") def test_get_authentication_obtain_client_credentials_failed(self, mock_request): """