-
Notifications
You must be signed in to change notification settings - Fork 30
feat(a2a): Add A2A authentication middleware with TIP token propagation support #304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
1. **Runner run_processor support** - Add `run_processor` parameter to Runner.__init__() and Runner.run() - Support multiple sources with priority: run() arg > __init__ arg > agent.run_processor > NoOpRunProcessor 2. **Fix IdentityClient region initialization** - Change default region from hardcoded "cn-beijing" to None in WorkloadTokenManager - Auto-detect region using _get_default_region() when not specified - Import _get_default_region from auth_config module Changes: - veadk/runner.py: Add run_processor support with priority chain - veadk/integrations/ve_identity/token_manager.py: Fix region initialization - veadk/integrations/ve_identity/auth_mixins.py: Minor formatting fix
This commit introduces comprehensive authentication support for A2A (Agent-to-Agent) communication, enabling secure credential management and automatic token injection. Key Changes: 1. **VeCredentialStore** (veadk/a2a/credentials.py) - Implement custom credential store with user ID and session ID support - Support both synchronous and asynchronous credential operations - Prioritize user ID over session ID for credential retrieval 2. **AuthenticatedA2ARequestConverter** (veadk/a2a/ve_request_converter.py) - Extract JWT tokens from Authorization headers - Parse user ID from JWT payload (sub field) 3. **RemoteVeAgent** (veadk/a2a/remote_ve_agent.py) - Add credential_service parameter to constructor - Implement _run_async_impl with automatic auth token injection - Inject Bearer tokens into httpx client headers before requests - Add comprehensive error handling and logging 4. **VeA2AServer** (veadk/a2a/ve_a2a_server.py) - Add credential_service parameter to constructor - Integrate with AuthenticatedA2ARequestConverter 5. **Unit Tests** (tests/) - Add comprehensive test coverage for VeCredentialStore - Add tests for AuthenticatedA2ARequestConverter - All tests passing with proper fixtures and mocking Benefits: - Seamless authentication for remote agent calls - Automatic credential propagation across agent boundaries - Support for both session-based and user-based authentication - Clean separation of concerns with dedicated credential service Breaking Changes: - None (backward compatible - credential_service is optional) Related: A2A authentication and secure agent communication
…pport - Add A2AAuthMiddleware for extracting auth tokens from requests - Support both Authorization header and query string authentication methods - Implement TIP (Trust Identity Propagation) token exchange via IdentityClient - Add VeCredentialService integration for credential storage and retrieval - Support workload token generation and propagation in request scope - Add RemoteVeAgent with automatic credential injection from context - Enhance credential service with ADK BaseCredentialService interface - Add comprehensive test coverage for middleware and credential service Key features: * Extract JWT tokens and delegation chains from incoming requests * Exchange TIP tokens for workload access tokens using IdentityClient * Store credentials in credential service with app_name and user_id scoping * Inject authentication tokens into remote agent HTTP clients at runtime * Support multiple authentication methods (header/querystring) This enables secure A2A communication with automatic credential propagation across the Volcengine Agent runtimes.
veadk/a2a/ve_a2a_server.py
Outdated
| url: str, | ||
| app_name: str, | ||
| short_term_memory: ShortTermMemory, | ||
| credential_service: BaseCredentialService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
break change for existing a2a app.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import time
import requests
from google.adk.run import RemoteAgent
OAUTH_TOKEN_URL = "https://your-idp.com/oauth/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
class OAuthSession:
def init(self):
self.access_token = None
self.refresh_token = None
self.expires_at = 0
def authenticate(self):
resp = requests.post(OAUTH_TOKEN_URL, data={
"grant_type": "password",
"username": "[email protected]",
"password": "pass",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}).json()
self.access_token = resp["access_token"]
self.refresh_token = resp["refresh_token"]
self.expires_at = time.time() + resp["expires_in"]
def refresh(self):
resp = requests.post(OAUTH_TOKEN_URL, data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}).json()
self.access_token = resp["access_token"]
self.expires_at = time.time() + resp["expires_in"]
def get_token(self):
if time.time() >= self.expires_at:
self.refresh()
return self.access_token
oauth = OAuthSession()
oauth.authenticate()
def build_agent():
return RemoteAgent(
url="https://remote-agent.company.com/mcp",
headers={
"Authorization": f"Bearer {oauth.get_token()}"
}
)
agent = build_agent()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
break change for existing a2a app.
good point, have fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces comprehensive A2A (Agent-to-Agent) authentication middleware for the Volcengine Agent ecosystem, enabling secure service-to-service communication with TIP (Trust Identity Propagation) token support.
Key Changes:
- New authentication utilities for JWT parsing, delegation chain extraction, and flexible auth config building
- A2A authentication middleware that extracts credentials from requests and exchanges TIP tokens for workload access tokens
- VeCredentialService for app/user-scoped credential management extending ADK's BaseCredentialService
- Enhanced RemoteVeAgent with automatic credential injection from InvocationContext
- Global singleton IdentityClient pattern for improved connection pooling and credential caching
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| veadk/utils/auth.py | New authentication utilities module providing JWT parsing, delegation chain extraction, and AuthConfig building functions |
| veadk/tools/builtin_tools/agent_authorization.py | Refactored to use new auth utilities and support actor chain validation in permission checks |
| veadk/runner.py | Added credential_service parameter support for integration with VeCredentialService |
| veadk/integrations/ve_identity/token_manager.py | Updated type hints to support CallbackContext in addition to ToolContext and ReadonlyContext |
| veadk/integrations/ve_identity/identity_client.py | Improved error handling for STS operations and added original_callers parameter to check_permission |
| veadk/integrations/ve_identity/auth_processor.py | Refactored to use centralized get_default_identity_client helper |
| veadk/integrations/ve_identity/auth_mixins.py | Updated to use get_default_identity_client for consistent client management |
| veadk/integrations/ve_identity/auth_config.py | Added get_default_identity_client function to retrieve global IdentityClient instance |
| veadk/integrations/ve_identity/init.py | Exported get_default_identity_client utility for public use |
| veadk/configs/auth_configs.py | Added singleton IdentityClient management to VeIdentityConfig with lazy initialization |
| veadk/auth/credential_service.py | New VeCredentialService implementing app/user-scoped credential storage extending BaseCredentialService |
| veadk/a2a/ve_middlewares.py | New A2AAuthMiddleware for extracting auth tokens and exchanging TIP tokens with factory function |
| veadk/a2a/ve_a2a_server.py | Integrated credential_service parameter into VeA2AServer and init_app |
| veadk/a2a/remote_ve_agent.py | Enhanced with _inject_auth_token method for automatic credential injection from InvocationContext |
| tests/test_ve_a2a_middlewares.py | Comprehensive test suite for A2A authentication middleware |
| tests/auth/test_credential_service.py | Unit tests for VeCredentialService covering various credential management scenarios |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
veadk/a2a/ve_a2a_server.py
Outdated
| url: str, | ||
| app_name: str, | ||
| short_term_memory: ShortTermMemory, | ||
| credential_service: BaseCredentialService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import time
import requests
from google.adk.run import RemoteAgent
OAUTH_TOKEN_URL = "https://your-idp.com/oauth/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
class OAuthSession:
def init(self):
self.access_token = None
self.refresh_token = None
self.expires_at = 0
def authenticate(self):
resp = requests.post(OAUTH_TOKEN_URL, data={
"grant_type": "password",
"username": "[email protected]",
"password": "pass",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}).json()
self.access_token = resp["access_token"]
self.refresh_token = resp["refresh_token"]
self.expires_at = time.time() + resp["expires_in"]
def refresh(self):
resp = requests.post(OAUTH_TOKEN_URL, data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}).json()
self.access_token = resp["access_token"]
self.expires_at = time.time() + resp["expires_in"]
def get_token(self):
if time.time() >= self.expires_at:
self.refresh()
return self.access_token
oauth = OAuthSession()
oauth.authenticate()
def build_agent():
return RemoteAgent(
url="https://remote-agent.company.com/mcp",
headers={
"Authorization": f"Bearer {oauth.get_token()}"
}
)
agent = build_agent()
| # Example 2: Using Bearer token in header | ||
| # Example 2: Using static Bearer token in header for initialization | ||
| agent = RemoteVeAgent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't inject the RemoteA2aAgent, with fetch_token() instead.
remote = RemoteA2aAgent(
name="secure-agent",
url="https://my-secure-agent.example.com/a2a",
headers={"Authorization": f"Bearer {fetch_token()}"},
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only support header with Authorization, for querystring using url only
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't inject the RemoteA2aAgent, with fetch_token() instead. remote = RemoteA2aAgent( name="secure-agent", url="https://my-secure-agent.example.com/a2a", headers={"Authorization": f"Bearer {fetch_token()}"}, )
@cuericlee 在 RemoteA2aAgent 初始化时,fetch_token() 只会被调用一次,返回的 token 会被固定在 headers 中。之后即使 token 过期或需要刷新,headers 也不会更新。不能这么搞的
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class VeCredentialService(BaseCredentialService): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not extend from InMemoryCredentialService
https://github.com/google/adk-python/blob/main/src/google/adk/auth/credential_service/in_memory_credential_service.py#L29
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cuericlee since InMemoryCredentialService is an experimental feature, and only the abstract class BaseCredentialService needs to be passed at all locations. to prevent the impact caused by changes to InMemoryCredentialService, VeCredentialService is separately defined based on the abstract definition.
Updated the VeA2AServer constructor and init_app function to allow credential_service to be optional by defaulting it to None. This increases flexibility for cases where credentials are not required.
Replaces incorrect 'VeA2ACredentialService' with 'VeCredentialService' in usage examples within docstrings to ensure accuracy and prevent confusion for users.
Added explicit initialization of the 'token' variable to None in the _extract_token method to ensure it is always defined before use.
Changed the example values for 'Type' in the docstring of the permission check method to use lowercase (e.g., 'user', 'action', 'agent') for consistency and clarity.
Updated the type annotation for _identity_client to use Optional["IdentityClient"] for better type clarity. Also removed unnecessary whitespace in ve_middlewares.py.
cuericlee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
Introduces a new `to_a2a` utility in `veadk.a2a.utils.agent_to_a2a` to wrap Google ADK's A2A conversion with optional VeADK authentication and credential service integration. Adds comprehensive tests for the auth switch, refactors `RemoteVeAgent` to ensure pre-run initialization/auth logic always executes, and renames `credential_service.py` to `ve_credential_service.py` for clarity.
Deleted test_to_a2a_auth_switch.py as it is no longer needed. Fixed import formatting in several files for consistency by removing extra spaces. Also removed an unused import in identity_client.py.
Added detailed instructions for enabling and configuring authentication in VeADK A2A Server, including server and client usage, supported authentication methods, and new parameters for the to_a2a function.
cuericlee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
cuericlee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
Add A2A authentication middleware with TIP token propagation support
This PR introduces comprehensive A2A (Agent-to-Agent) authentication middleware with TIP (Trust Identity Propagation) token support for secure communication across the Volcengine Agent ecosystem.
Key Features
Usage Example
Basic A2A Server Setup with Authentication
Client-side Remote Agent with Auto-authentication
Technical Details
Authentication Flow
Credential Injection
Security Features
Changes
A2AAuthMiddlewareclass inveadk/a2a/ve_middlewares.pybuild_a2a_auth_middleware()factory functionRemoteVeAgentwith_inject_auth_token()methodThis enables secure A2A communication with automatic credential propagation across the Volcengine Agent runtimes.