-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Description
An agent can handle a single authenticated tool call, for example getting files from ones Google Drive. But having the agent do two such calls results in a "invalid grant" being thrown. For example. "List the files on my Google Drive and send the information to ". This requires the agent to first do an authenticated tool call to the Google Drive API, and then again to the Gmail API, resulting in a "invalid grant" in the later tool call.
To Reproduce
Example code to reproduce. Also requires the helpers.py according to the documentation here:
https://google.github.io/adk-docs/tools/authentication/#authentication-logic-within-the-tool-function
import os
import asyncio
import urllib.parse
from dotenv import load_dotenv
load_dotenv()
import warnings
# Ignore all warnings
warnings.filterwarnings("ignore")
import logging
logging.basicConfig(level=logging.ERROR)
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from helpers import is_pending_auth_event, get_function_call_id, get_function_call_auth_config, get_user_input
from google.adk.agents import LlmAgent
from google.adk.auth import AuthConfig, AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
from fastapi.openapi.models import OAuth2, OAuthFlowAuthorizationCode, OAuthFlows
required_scopes: dict[str, str] = {
"https://www.googleapis.com/auth/drive": "drive scope",
"https://mail.google.com/": "gmail scope"
}
auth_scheme = OAuth2(
flows=OAuthFlows(
authorizationCode=OAuthFlowAuthorizationCode(
authorizationUrl="https://accounts.google.com/o/oauth2/auth",
tokenUrl="https://oauth2.googleapis.com/token",
scopes=required_scopes,
)
)
)
auth_credential = AuthCredential(
auth_type=AuthCredentialTypes.OAUTH2,
oauth2=OAuth2Auth(
client_id=os.getenv("GOOGLE_CLIENT_ID"),
client_secret=os.getenv("GOOGLE_CLIENT_SECRET"),
redirect_uri=os.getenv("REDIRECT_URI"),
),
)
gmail_spec_path = <GMAIL_SPEC_PATH>
gdrive_spec_path = <GDRIVE_SPEC_PATH>
with open(gmail_spec_path, "r") as f:
gmail_spec_str = f.read()
gmail_api_toolset = OpenAPIToolset(spec_str=gmail_spec_str, spec_str_type='yaml', auth_scheme=auth_scheme, auth_credential=auth_credential).get_tools()
with open(gdrive_spec_path, "r") as f:
gdrive_spec_str = f.read()
gdrive_api_toolset = OpenAPIToolset(spec_str=gdrive_spec_str, spec_str_type='yaml', auth_scheme=auth_scheme, auth_credential=auth_credential).get_tools()
gmail_agent = LlmAgent(
name="gmail_agent",
description="A google mail agent with access to tools for interacting with Google Mail",
model="gemini-2.0-flash",
instruction="You are a helpful agent for interacting with Google Mail. Help the user with their queries.",
tools=[*gmail_api_toolset],
)
gdrive_agent = LlmAgent(
name="gdrive_agent",
description="A google drive agent with access to tools for interacting with Google Drive",
model="gemini-2.0-flash",
instruction="You are a helpful agent for interacting with Google Drive. Help the user with their queries.",
tools=[*gdrive_api_toolset],
)
root_agent = LlmAgent(
name="Root",
description="Root agent",
model="gemini-2.0-flash",
instruction="""You are a helpful assistant that can help with tasks and questions.
You have access to a suite of helper assistants that can help you complete the task given to you by the user.
Your main job is to coordinate the helper assistants to complete the task. Before doing so, always think deeply about the user's request and decide which assistants to use and in what order.
Always keep the user updated on your progress.""",
sub_agents=[gdrive_agent, gmail_agent],
)
async def async_main():
"""
Main asynchronous function orchestrating the agent interaction and authentication flow.
"""
# --- Step 1: Service Initialization ---
# Use in-memory services for session and artifact storage (suitable for demos/testing).
session_service = InMemorySessionService()
artifacts_service = InMemoryArtifactService()
# Create a new user session to maintain conversation state.
session = session_service.create_session(
state={}, # Optional state dictionary for session-specific data
app_name='my_app', # Application identifier
user_id='user' # User identifier
)
# --- Step 2: Initial User Query ---
# Define the user's initial request.
query = 'List all files on my drive and draft an email with the information to [email protected]. Use placeholders where you need. Send from [email protected]'
print(f"user: {query}")
# Format the query into the Content structure expected by the ADK Runner.
content = types.Content(role='user', parts=[types.Part(text=query)])
# Initialize the ADK Runner
runner = Runner(
app_name='my_app',
agent=root_agent,
artifact_service=artifacts_service,
session_service=session_service,
)
# --- Step 3: Send Query and Handle Potential Auth Request ---
print("\nRunning agent with initial query...")
events_async = runner.run_async(
session_id=session.id, user_id='user', new_message=content
)
# Variables to store details if an authentication request occurs.
auth_request_event_id, auth_config = None, None
# Iterate through the events generated by the first run.
async for event in events_async:
if event.content and event.content.parts:
if text := ''.join(part.text or '' for part in event.content.parts):
print(f'[{event.author}]: {text}')
if is_pending_auth_event(event):
print("--> Authentication required by agent.")
auth_request_event_id = get_function_call_id(event)
auth_config = get_function_call_auth_config(event)
break # Exit the initial event loop for auth
if auth_request_event_id and auth_config:
base_auth_uri = auth_config.exchanged_auth_credential.oauth2.auth_uri
parsed_uri = urllib.parse.urlparse(base_auth_uri)
query_params = urllib.parse.parse_qs(parsed_uri.query)
query_params['redirect_uri'] = [os.getenv("REDIRECT_URI")]
query_params['access_type'] = ['offline']
query_params['prompt'] = ['consent']
if 'scope' not in query_params and auth_config.exchanged_auth_credential.oauth2.scopes:
query_params['scope'] = [' '.join(auth_config.exchanged_auth_credential.oauth2.scopes)]
updated_query = urllib.parse.urlencode(query_params, doseq=True)
auth_request_uri = urllib.parse.urlunparse(
(parsed_uri.scheme, parsed_uri.netloc, parsed_uri.path,
parsed_uri.params, updated_query, parsed_uri.fragment)
)
print("--- User Action Required ---")
auth_response_uri = await get_user_input(
f'1. Please open this URL in your browser to log in:\n {auth_request_uri}\n\n'
f'2. After successful login and authorization, your browser will be redirected.\n'
f' Copy the *entire* URL from the browser\'s address bar.\n\n'
f'3. Paste the copied URL here and press Enter:\n\n> '
)
auth_config.exchanged_auth_credential.oauth2.auth_response_uri = auth_response_uri.strip()
auth_config.exchanged_auth_credential.oauth2.redirect_uri = os.getenv("REDIRECT_URI")
auth_content = types.Content(
role='user',
parts=[
types.Part(
function_response=types.FunctionResponse(
id=auth_request_event_id,
name='adk_request_credential',
response=auth_config.model_dump(),
)
)
],
)
print("Submitting authentication response...")
events_async = runner.run_async(
session_id=session.id,
user_id='user',
new_message=auth_content,
)
print("\n--- Agent Response after Authentication ---")
async for event in events_async:
if event.content and event.content.parts:
if text := ''.join(part.text or '' for part in event.content.parts):
print(f'[{event.author}]: {text}')
if __name__ == '__main__':
asyncio.run(async_main())
Expected behavior
Expecting behavior to go through the auth flow ones to grant access to the APIs, then the agent fulfilling the task without "invalid grant" being thrown.
Desktop (please complete the following information):
- OS: [e.g. iOS] macOS Sequoia 15.4.1
- Python version(python -V): 3.13.2
- ADK version(pip show google-adk): 0.3.0