Skip to content

fix: Unable to acquire impersonated credentials #2003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/google/adk/tools/apihub_tool/clients/apihub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from abc import ABC
from abc import abstractmethod
import base64
Expand Down Expand Up @@ -324,7 +326,9 @@ def _get_access_token(self) -> str:
raise ValueError(f"Invalid service account JSON: {e}") from e
else:
try:
credentials, _ = default_service_credential()
credentials, _ = default_service_credential(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except:
credentials = None

Expand Down
6 changes: 5 additions & 1 deletion src/google/adk/tools/apihub_tool/clients/secret_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import Optional

Expand Down Expand Up @@ -73,7 +75,9 @@ def __init__(
credentials.refresh(request)
else:
try:
credentials, _ = default_service_credential()
credentials, _ = default_service_credential(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except Exception as e:
raise ValueError(
"'service_account_json' or 'auth_token' are both missing, and"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
import time
from typing import Any
Expand Down Expand Up @@ -810,7 +812,9 @@ def _get_access_token(self) -> str:
)
else:
try:
credentials, _ = default_service_credential()
credentials, _ = default_service_credential(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except:
credentials = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import List
from typing import Optional
Expand Down Expand Up @@ -241,7 +243,9 @@ def _get_access_token(self) -> str:
)
else:
try:
credentials, _ = default_service_credential()
credentials, _ = default_service_credential(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except:
credentials = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Credential fetcher for Google Service Account."""

from __future__ import annotations

from typing import Optional

import google.auth
Expand Down Expand Up @@ -72,7 +74,9 @@ def exchange_credential(

try:
if auth_credential.service_account.use_default_credential:
credentials, _ = google.auth.default()
credentials, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
else:
config = auth_credential.service_account
credentials = service_account.Credentials.from_service_account_info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ def test_get_access_token_use_default_credential(
client = APIHubClient()
token = client._get_access_token()
assert token == "default_token"
# Verify default_service_credential is called with the correct scopes parameter
mock_default_service_credential.assert_called_once_with(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
mock_credential.refresh.assert_called_once()
assert client.credential_cache == mock_credential

Expand Down
195 changes: 195 additions & 0 deletions tests/unittests/tools/apihub_tool/clients/test_secret_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for the SecretManagerClient."""

import json
from unittest.mock import MagicMock
from unittest.mock import patch

from google.adk.tools.apihub_tool.clients.secret_client import SecretManagerClient
import pytest

import google


class TestSecretManagerClient:
"""Tests for the SecretManagerClient class."""

@patch("google.cloud.secretmanager.SecretManagerServiceClient")
@patch(
"google.adk.tools.apihub_tool.clients.secret_client.default_service_credential"
)
def test_init_with_default_credentials(
self, mock_default_service_credential, mock_secret_manager_client
):
"""Test initialization with default credentials."""
# Setup
mock_credentials = MagicMock()
mock_default_service_credential.return_value = (
mock_credentials,
"test-project",
)

# Execute
client = SecretManagerClient()

# Verify
mock_default_service_credential.assert_called_once_with(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
mock_secret_manager_client.assert_called_once_with(
credentials=mock_credentials
)
assert client._credentials == mock_credentials
assert client._client == mock_secret_manager_client.return_value

@patch("google.cloud.secretmanager.SecretManagerServiceClient")
@patch("google.oauth2.service_account.Credentials.from_service_account_info")
def test_init_with_service_account_json(
self, mock_from_service_account_info, mock_secret_manager_client
):
"""Test initialization with service account JSON."""
# Setup
mock_credentials = MagicMock()
mock_from_service_account_info.return_value = mock_credentials
service_account_json = json.dumps({
"type": "service_account",
"project_id": "test-project",
"private_key_id": "key-id",
"private_key": "private-key",
"client_email": "[email protected]",
})

# Execute
client = SecretManagerClient(service_account_json=service_account_json)

# Verify
mock_from_service_account_info.assert_called_once_with(
json.loads(service_account_json)
)
mock_secret_manager_client.assert_called_once_with(
credentials=mock_credentials
)
assert client._credentials == mock_credentials
assert client._client == mock_secret_manager_client.return_value

@patch("google.cloud.secretmanager.SecretManagerServiceClient")
def test_init_with_auth_token(self, mock_secret_manager_client):
"""Test initialization with auth token."""
# Setup
auth_token = "test-token"
mock_credentials = MagicMock()

# Mock the entire credentials creation process
with (
patch("google.auth.credentials.Credentials") as mock_credentials_class,
patch("google.auth.transport.requests.Request") as mock_request,
):
# Configure the mock to return our mock_credentials when instantiated
mock_credentials_class.return_value = mock_credentials

# Execute
client = SecretManagerClient(auth_token=auth_token)

# Verify
mock_credentials.refresh.assert_called_once()
mock_secret_manager_client.assert_called_once_with(
credentials=mock_credentials
)
assert client._credentials == mock_credentials
assert client._client == mock_secret_manager_client.return_value

@patch(
"google.adk.tools.apihub_tool.clients.secret_client.default_service_credential"
)
def test_init_with_default_credentials_error(
self, mock_default_service_credential
):
"""Test initialization with default credentials that fails."""
# Setup
mock_default_service_credential.side_effect = Exception("Auth error")

# Execute and verify
with pytest.raises(
ValueError,
match="error occurred while trying to use default credentials",
):
SecretManagerClient()

def test_init_with_invalid_service_account_json(self):
"""Test initialization with invalid service account JSON."""
# Execute and verify
with pytest.raises(ValueError, match="Invalid service account JSON"):
SecretManagerClient(service_account_json="invalid-json")

@patch("google.cloud.secretmanager.SecretManagerServiceClient")
@patch(
"google.adk.tools.apihub_tool.clients.secret_client.default_service_credential"
)
def test_get_secret(
self, mock_default_service_credential, mock_secret_manager_client
):
"""Test getting a secret."""
# Setup
mock_credentials = MagicMock()
mock_default_service_credential.return_value = (
mock_credentials,
"test-project",
)

mock_client = MagicMock()
mock_secret_manager_client.return_value = mock_client
mock_response = MagicMock()
mock_response.payload.data.decode.return_value = "secret-value"
mock_client.access_secret_version.return_value = mock_response

# Execute - use default credentials instead of auth_token
client = SecretManagerClient()
result = client.get_secret(
"projects/test-project/secrets/test-secret/versions/latest"
)

# Verify
assert result == "secret-value"
mock_client.access_secret_version.assert_called_once_with(
name="projects/test-project/secrets/test-secret/versions/latest"
)
mock_response.payload.data.decode.assert_called_once_with("UTF-8")

@patch("google.cloud.secretmanager.SecretManagerServiceClient")
@patch(
"google.adk.tools.apihub_tool.clients.secret_client.default_service_credential"
)
def test_get_secret_error(
self, mock_default_service_credential, mock_secret_manager_client
):
"""Test getting a secret that fails."""
# Setup
mock_credentials = MagicMock()
mock_default_service_credential.return_value = (
mock_credentials,
"test-project",
)

mock_client = MagicMock()
mock_secret_manager_client.return_value = mock_client
mock_client.access_secret_version.side_effect = Exception("Secret error")

# Execute and verify - use default credentials instead of auth_token
client = SecretManagerClient()
with pytest.raises(Exception, match="Secret error"):
client.get_secret(
"projects/test-project/secrets/test-secret/versions/latest"
)
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,15 @@ def test_get_access_token_with_default_credentials(
mock.patch(
"google.adk.tools.application_integration_tool.clients.connections_client.default_service_credential",
return_value=(mock_credentials, "test_project_id"),
),
) as mock_default_service_credential,
mock.patch.object(mock_credentials, "refresh", return_value=None),
):
token = client._get_access_token()
assert token == "test_token"
# Verify default_service_credential is called with the correct scopes parameter
mock_default_service_credential.assert_called_once_with(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

def test_get_access_token_no_valid_credentials(
self, project, location, connection_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def test_get_access_token_with_default_credentials(
mock.patch(
"google.adk.tools.application_integration_tool.clients.integration_client.default_service_credential",
return_value=(mock_credentials, "test_project_id"),
),
) as mock_default_service_credential,
mock.patch.object(mock_credentials, "refresh", return_value=None),
):
client = IntegrationClient(
Expand All @@ -552,6 +552,10 @@ def test_get_access_token_with_default_credentials(
)
token = client._get_access_token()
assert token == "test_token"
# Verify default_service_credential is called with the correct scopes parameter
mock_default_service_credential.assert_called_once_with(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

def test_get_access_token_no_valid_credentials(
self, project, location, integration_name, triggers, connection_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ def test_exchange_credential_use_default_credential_success(
assert result.auth_type == AuthCredentialTypes.HTTP
assert result.http.scheme == "bearer"
assert result.http.credentials.token == "mock_access_token"
mock_google_auth_default.assert_called_once()
# Verify google.auth.default is called with the correct scopes parameter
mock_google_auth_default.assert_called_once_with(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
mock_credentials.refresh.assert_called_once()


Expand Down