57 changes: 57 additions & 0 deletions examples/scaffolding/contrib/a2a/README.md
@@ -0,0 +1,57 @@
# Scaffolding A2A (Agent2Agent) Example

This example shows how a Scaffolding controller can delegate work to remote
agents that speak the [A2A (Agent2Agent) protocol](https://a2a-protocol.org/),
the agent-to-agent counterpart to the MCP tool-calling contrib. The generation
model decides which remote agent to call; the `A2AWorker` forwards the message
over A2A and feeds the reply back to the model for a final answer.

This is a **client-side** integration: Scaffolding acts as an A2A client that
consumes other agents. (Exposing a Scaffolding pipeline *as* an A2A server is a
possible follow-up.)

## Install

```bash
pip install a2a-sdk httpx uvicorn
```

`a2a-sdk` is only needed when actually talking to a remote agent; the contrib
imports it lazily, and the unit tests do not require it.

## Step 1: Start a remote A2A agent

A minimal reference agent server is included:

```bash
python weather_agent_server.py --port 9999
```

This exposes a `weather_agent` whose agent card is discoverable at
`http://0.0.0.0:9999/.well-known/agent-card.json`. You can also point the
example at any other A2A-compatible agent server.
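
To check that an agent is reachable before running the orchestrator, you can fetch its card directly. The snippet below is a rough sketch and not part of the example scripts: it uses only the Python standard library, assumes the card is served at the well-known path mentioned above, and the helper names `agent_card_url` and `fetch_agent_card` are made up for illustration.

```python
import json
from urllib.request import urlopen

# Well-known path taken from the README text above.
WELL_KNOWN_PATH = "/.well-known/agent-card.json"


def agent_card_url(base_url: str) -> str:
    """Build the agent card URL from an agent's base URL (hypothetical helper)."""
    return base_url.rstrip("/") + WELL_KNOWN_PATH


def fetch_agent_card(base_url: str, timeout: float = 5.0) -> dict:
    """Download and parse the agent card JSON; raises if the agent is unreachable."""
    with urlopen(agent_card_url(base_url), timeout=timeout) as response:
        return json.load(response)
```

With the reference server running locally, `fetch_agent_card("http://127.0.0.1:9999")` should return the card as a dictionary; the exact fields depend on the installed `a2a-sdk` version.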

## Step 2: Run the orchestrator

```bash
python a2a_run.py \
    --API_KEY YOUR_API_KEY \
    --base_url https://your-openai-compatible-endpoint/v1 \
    --model your-model \
    --agent_urls http://0.0.0.0:9999 \
    --prompt "What is the weather in LA?"
```

The generation model receives the remote agents as callable tools, delegates to
`weather_agent`, and summarizes its reply.

## Files

| File | Role |
|------|------|
| `a2a_run.py` | Scaffolding A2A orchestrator runner (client) |
| `weather_agent_server.py` | Minimal reference A2A agent server for local testing |

> `a2a-sdk` server/client APIs evolve across versions. The scripts target the
> SDK's published "helloworld" pattern; adjust imports if your installed version
> differs.
84 changes: 84 additions & 0 deletions examples/scaffolding/contrib/a2a/a2a_run.py
@@ -0,0 +1,84 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run a Scaffolding A2A orchestrator against one or more remote A2A agents.

The generation side uses an OpenAI-compatible endpoint (any vendor, or a local
``trtllm-serve``); the orchestration side talks the Agent2Agent protocol via
``A2AWorker``. See README.md for how to start a sample remote agent server.
"""

import argparse
import asyncio

from openai import AsyncOpenAI

from tensorrt_llm.scaffolding import OpenaiWorker, ScaffoldingLlm
from tensorrt_llm.scaffolding.contrib.a2a import A2AController, A2AWorker
from tensorrt_llm.scaffolding.contrib.mcp.chat_handler import chat_handler
from tensorrt_llm.scaffolding.contrib.mcp.chat_task import ChatTask


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--base_url",
        type=str,
        default="https://dashscope.aliyuncs.com/compatible-mode/v1",
        help="OpenAI-compatible base URL for the generation model.",
    )
    parser.add_argument("--model", type=str, default="qwen-plus-latest")
    parser.add_argument("--API_KEY", type=str)
    parser.add_argument(
        "--agent_urls",
        type=str,
        nargs="+",
        default=["http://0.0.0.0:9999"],
        help="Base URLs of the remote A2A agents to orchestrate.",
    )
    parser.add_argument("--prompt", type=str, default="What is the weather like today in LA?")
    return parser.parse_args()


async def main():
    args = parse_arguments()

    client = AsyncOpenAI(api_key=args.API_KEY, base_url=args.base_url)
    generation_worker = OpenaiWorker(client, args.model)
    generation_worker.register_task_handler(ChatTask, chat_handler)

    a2a_worker = await A2AWorker.init_with_urls(args.agent_urls)

    controller = A2AController()
    llm = ScaffoldingLlm(
        controller,
        {
            A2AController.WorkerTag.GENERATION: generation_worker,
            A2AController.WorkerTag.A2A: a2a_worker,
        },
    )

    future = llm.generate_async(args.prompt)
    result = await future.aresult()
    print(f"\nresult is {result.outputs[0].text}\n")

    print("shutting down...")
    llm.shutdown()
    generation_worker.shutdown()
    await a2a_worker.async_shutdown()
    print("shut down done")
Comment on lines +54 to +80


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Ensure worker shutdown runs even when generation fails.

Cleanup currently runs only on the success path: if generation raises, none of the shutdown calls execute. Wrap the orchestration in try/finally so llm, generation_worker, and a2a_worker are always closed.

Suggested patch
 async def main():
     args = parse_arguments()

     client = AsyncOpenAI(api_key=args.API_KEY, base_url=args.base_url)
     generation_worker = OpenaiWorker(client, args.model)
     generation_worker.register_task_handler(ChatTask, chat_handler)

     a2a_worker = await A2AWorker.init_with_urls(args.agent_urls)

     controller = A2AController()
     llm = ScaffoldingLlm(
         controller,
         {
             A2AController.WorkerTag.GENERATION: generation_worker,
             A2AController.WorkerTag.A2A: a2a_worker,
         },
     )
-
-    future = llm.generate_async(args.prompt)
-    result = await future.aresult()
-    print(f"\nresult is {result.outputs[0].text}\n")
-
-    print("shutting down...")
-    llm.shutdown()
-    generation_worker.shutdown()
-    await a2a_worker.async_shutdown()
-    print("shut down done")
+    try:
+        future = llm.generate_async(args.prompt)
+        result = await future.aresult()
+        print(f"\nresult is {result.outputs[0].text}\n")
+    finally:
+        print("shutting down...")
+        llm.shutdown()
+        generation_worker.shutdown()
+        await a2a_worker.async_shutdown()
+        print("shut down done")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/scaffolding/contrib/a2a/a2a_run.py` around lines 54 - 80, The main()
function does not guarantee cleanup when generation fails because the shutdown
calls for llm, generation_worker, and a2a_worker are only executed on the
success path. Wrap the generation logic (the llm.generate_async call and the
await future.aresult() call along with the result printing) in a try block, and
move all shutdown calls (llm.shutdown(), generation_worker.shutdown(), and await
a2a_worker.async_shutdown()) into a finally block to ensure they always execute
regardless of whether generation succeeds or raises an exception.
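
The effect of the try/finally pattern can be checked in isolation. The following is a minimal sketch using stand-in objects, not the real Scaffolding classes: `FakeWorker`, `generate`, and `run_once` are hypothetical names invented for this illustration, and only the control flow mirrors the suggested patch.

```python
import asyncio


class FakeWorker:
    """Hypothetical stand-in for a worker; records whether it was shut down."""

    def __init__(self) -> None:
        self.closed = False

    async def async_shutdown(self) -> None:
        self.closed = True


async def generate(prompt: str, fail: bool) -> str:
    """Hypothetical generation step that can be forced to fail."""
    if fail:
        raise RuntimeError("generation failed")
    return f"answer to: {prompt}"


async def run_once(worker: FakeWorker, prompt: str, fail: bool = False) -> str:
    try:
        return await generate(prompt, fail)
    finally:
        # Runs on success and on failure alike, so the worker is always closed.
        await worker.async_shutdown()


def demo() -> tuple:
    """Run one successful and one failing call; report whether each worker closed."""
    ok_worker, bad_worker = FakeWorker(), FakeWorker()
    asyncio.run(run_once(ok_worker, "hi"))
    try:
        asyncio.run(run_once(bad_worker, "hi", fail=True))
    except RuntimeError:
        pass  # The error still propagates; cleanup has already happened.
    return ok_worker.closed, bad_worker.closed
```

Both workers end up closed, and the exception from the failing call still reaches the caller, which is the behavior the review asks for.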



if __name__ == "__main__":
    asyncio.run(main())
87 changes: 87 additions & 0 deletions examples/scaffolding/contrib/a2a/weather_agent_server.py
@@ -0,0 +1,87 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A minimal reference A2A agent server used to exercise the A2A contrib.

This follows the ``a2a-sdk`` "helloworld" server pattern and exposes a single
``weather_agent`` that returns a canned reply. Run it with::

    pip install a2a-sdk uvicorn
    python weather_agent_server.py --port 9999

then point ``a2a_run.py --agent_urls http://0.0.0.0:9999`` at it.

Note: ``a2a-sdk`` server APIs evolve; this script targets the published
helloworld example. Adjust imports if your installed SDK version differs.
"""

import argparse

import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from a2a.utils import new_agent_text_message


class WeatherAgentExecutor(AgentExecutor):
    """Returns a canned weather reply regardless of the incoming message."""

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        await event_queue.enqueue_event(new_agent_text_message("It is sunny in LA, around 75F."))

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise NotImplementedError("cancel is not supported by this agent")


def build_app(host: str, port: int) -> A2AStarletteApplication:
    skill = AgentSkill(
        id="weather",
        name="weather",
        description="Returns the current weather for a location.",
        tags=["weather"],
        examples=["What is the weather in LA?"],
    )
    agent_card = AgentCard(
        name="weather_agent",
        description="A demo agent that reports the weather.",
        url=f"http://{host}:{port}/",
        version="1.0.0",
Comment on lines +51 to +63


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Advertise a reachable endpoint, not the bind-all address.

AgentCard.url currently mirrors --host; with the default 0.0.0.0, the card can advertise a non-routable/ambiguous URL. Keep bind host and advertised host separate (or normalize 0.0.0.0 to 127.0.0.1 for local demos), and align a2a_run.py/README defaults accordingly.

Suggested patch
-def build_app(host: str, port: int) -> A2AStarletteApplication:
+def build_app(bind_host: str, port: int, public_host: str | None = None) -> A2AStarletteApplication:
+    advertised_host = public_host or ("127.0.0.1" if bind_host == "0.0.0.0" else bind_host)
     skill = AgentSkill(
@@
     agent_card = AgentCard(
@@
-        url=f"http://{host}:{port}/",
+        url=f"http://{advertised_host}:{port}/",
@@
 def main():
@@
     parser.add_argument("--host", default="0.0.0.0")
+    parser.add_argument("--public_host", default=None)
@@
-    app = build_app(args.host, args.port)
+    app = build_app(args.host, args.port, args.public_host)
🧰 Tools
🪛 ast-grep (0.43.0)

[warning] 61-61: Do not make http calls without encryption
Context: f"http://{host}:{port}/"
Note: [CWE-319].

(requests-http)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/scaffolding/contrib/a2a/weather_agent_server.py` around lines 51 -
63, The AgentCard.url is currently using the bind host parameter directly, which
defaults to 0.0.0.0 - a non-routable address that should not be advertised.
Modify the url construction in the build_app function to normalize 0.0.0.0 to
127.0.0.1 (or another reachable address) for the advertised endpoint, keeping
the bind host separate from the advertised host. This ensures the AgentCard
advertises a reachable URL that clients can actually connect to.
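
The normalization described above can be sketched as a small standalone helper. This is an illustration under stated assumptions, not the PR's code: `advertised_url` is a hypothetical name, only the IPv4 wildcard `0.0.0.0` is handled, and `127.0.0.1` is assumed to be an acceptable fallback for a local demo.

```python
from typing import Optional


def advertised_url(bind_host: str, port: int, public_host: Optional[str] = None) -> str:
    """Return the URL to publish in the agent card (hypothetical helper).

    The bind host controls which interfaces the server listens on; the
    advertised host is what clients should connect to, so the two are
    kept separate.
    """
    if public_host:
        host = public_host  # An explicit override always wins.
    elif bind_host == "0.0.0.0":
        host = "127.0.0.1"  # Wildcard bind address is not a usable destination.
    else:
        host = bind_host
    return f"http://{host}:{port}/"
```

For example, `advertised_url("0.0.0.0", 9999)` yields `http://127.0.0.1:9999/`, while passing a `public_host` publishes that name instead. A deployment reachable from other machines would need the override, since the loopback fallback only works for clients on the same host.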

        default_input_modes=["text"],
        default_output_modes=["text"],
        capabilities=AgentCapabilities(streaming=False),
        skills=[skill],
    )
    request_handler = DefaultRequestHandler(
        agent_executor=WeatherAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )
    return A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=9999)
    args = parser.parse_args()

    app = build_app(args.host, args.port)
    uvicorn.run(app.build(), host=args.host, port=args.port)


if __name__ == "__main__":
    main()
27 changes: 27 additions & 0 deletions tensorrt_llm/scaffolding/contrib/a2a/__init__.py
@@ -0,0 +1,27 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .a2a_controller import A2AController
from .a2a_task import A2AListTask, A2ASendTask
from .a2a_utils import A2AAgentConnection, AgentInfo
from .a2a_worker import A2AWorker

__all__ = [
    "A2AController",
    "A2AWorker",
    "A2ASendTask",
    "A2AListTask",
    "A2AAgentConnection",
    "AgentInfo",
]