From 243462e23e6b66dae8e19bb92b3157ce2ceea781 Mon Sep 17 00:00:00 2001 From: Rohan Mehta Date: Tue, 22 Jul 2025 13:12:02 -0400 Subject: [PATCH 1/2] Update codex action to use repo syntax (#1218) Action isn't published yet, so gotta do this --- .github/workflows/codex.yml | 2 +- src/agents/agent.py | 39 +++- src/agents/items.py | 2 +- src/agents/run.py | 1 + src/agents/run_context.py | 5 +- src/agents/tool.py | 3 + src/agents/tool_context.py | 10 +- tests/test_as_streaming_tool.py | 335 ++++++++++++++++++++++++++++++++ 8 files changed, 383 insertions(+), 14 deletions(-) create mode 100644 tests/test_as_streaming_tool.py diff --git a/.github/workflows/codex.yml b/.github/workflows/codex.yml index 3baa1564d..556e8d4d8 100644 --- a/.github/workflows/codex.yml +++ b/.github/workflows/codex.yml @@ -53,7 +53,7 @@ jobs: # Note it is possible that the `verify` step internal to Run Codex will # fail, in which case the work to setup the repo was worthless :( - name: Run Codex - uses: openai/codex-action@latest + uses: openai/codex/.github/actions/codex@main with: openai_api_key: ${{ secrets.PROD_OPENAI_API_KEY }} github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/src/agents/agent.py b/src/agents/agent.py index b67a12c0d..f0161c4d8 100644 --- a/src/agents/agent.py +++ b/src/agents/agent.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: from .lifecycle import AgentHooks from .mcp import MCPServer - from .result import RunResult + from .result import RunResult, RunResultStreaming @dataclass @@ -233,7 +233,9 @@ def as_tool( self, tool_name: str | None, tool_description: str | None, + *, custom_output_extractor: Callable[[RunResult], Awaitable[str]] | None = None, + stream_inner_events: bool = False, ) -> Tool: """Transform this agent into a tool, callable by other agents. @@ -258,17 +260,36 @@ def as_tool( async def run_agent(context: RunContextWrapper, input: str) -> str: from .run import Runner - output = await Runner.run( - starting_agent=self, - input=input, - context=context.context, - ) + output_run: RunResult | RunResultStreaming + if stream_inner_events: + from .stream_events import RunItemStreamEvent + + sub_run = Runner.run_streamed( + self, + input=input, + context=context.context, + ) + parent_queue = getattr(context, "_event_queue", None) + async for ev in sub_run.stream_events(): + if parent_queue is not None and isinstance(ev, RunItemStreamEvent): + if ev.name in ("tool_called", "tool_output"): + parent_queue.put_nowait(ev) + output_run = sub_run + else: + output_run = await Runner.run( + starting_agent=self, + input=input, + context=context.context, + ) + if custom_output_extractor: - return await custom_output_extractor(output) + return await custom_output_extractor(cast(Any, output_run)) - return ItemHelpers.text_message_outputs(output.new_items) + return ItemHelpers.text_message_outputs(output_run.new_items) - return run_agent + tool = run_agent + tool.stream_inner_events = stream_inner_events + return tool async def get_system_prompt(self, run_context: RunContextWrapper[TContext]) -> str | None: """Get the system prompt for the agent.""" diff --git a/src/agents/items.py b/src/agents/items.py index fd13031e2..fc016b3a7 100644 --- a/src/agents/items.py +++ b/src/agents/items.py @@ -299,7 +299,7 @@ def tool_call_output_item( ) -> FunctionCallOutput: """Creates a tool call output item from a tool call and its output.""" return { - "call_id": tool_call.call_id, + "call_id": str(tool_call.call_id), "output": output, "type": "function_call_output", } diff --git a/src/agents/run.py b/src/agents/run.py index 2dd9524bb..900f385fb 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -576,6 +576,7 @@ def run_streamed( trace=new_trace, context_wrapper=context_wrapper, ) + context_wrapper._event_queue = streamed_result._event_queue # Kick off the actual agent loop in the background and return the streamed result object. streamed_result._run_impl_task = asyncio.create_task( diff --git a/src/agents/run_context.py b/src/agents/run_context.py index 579a215f2..a25ccf882 100644 --- a/src/agents/run_context.py +++ b/src/agents/run_context.py @@ -1,5 +1,6 @@ +import asyncio from dataclasses import dataclass, field -from typing import Any, Generic +from typing import Any, Generic, Optional from typing_extensions import TypeVar @@ -24,3 +25,5 @@ class RunContextWrapper(Generic[TContext]): """The usage of the agent run so far. For streamed responses, the usage will be stale until the last chunk of the stream is processed. """ + + _event_queue: Optional[asyncio.Queue[Any]] = field(default=None, init=False, repr=False) diff --git a/src/agents/tool.py b/src/agents/tool.py index 16e149904..affab3f32 100644 --- a/src/agents/tool.py +++ b/src/agents/tool.py @@ -97,6 +97,9 @@ def __post_init__(self): if self.strict_json_schema: self.params_json_schema = ensure_strict_json_schema(self.params_json_schema) + stream_inner_events: bool = False + """Whether to stream inner events when used as an agent tool.""" + @dataclass class FileSearchTool: diff --git a/src/agents/tool_context.py b/src/agents/tool_context.py index 16845badd..9127d30c9 100644 --- a/src/agents/tool_context.py +++ b/src/agents/tool_context.py @@ -1,3 +1,4 @@ +import asyncio from dataclasses import dataclass, field, fields from typing import Any, Optional @@ -24,11 +25,13 @@ class ToolContext(RunContextWrapper[TContext]): tool_call_id: str = field(default_factory=_assert_must_pass_tool_call_id) """The ID of the tool call.""" + _event_queue: asyncio.Queue[Any] | None = field(default=None, init=False, repr=False) + @classmethod def from_agent_context( cls, context: RunContextWrapper[TContext], - tool_call_id: str, + tool_call_id: str | int, tool_call: Optional[ResponseFunctionToolCall] = None, ) -> "ToolContext": """ @@ -39,4 +42,7 @@ def from_agent_context( f.name: getattr(context, f.name) for f in fields(RunContextWrapper) if f.init } tool_name = tool_call.name if tool_call is not None else _assert_must_pass_tool_name() - return cls(tool_name=tool_name, tool_call_id=tool_call_id, **base_values) + obj = cls(tool_name=tool_name, tool_call_id=str(tool_call_id), **base_values) + if hasattr(context, "event_queue"): + obj.event_queue = context.event_queue + return obj diff --git a/tests/test_as_streaming_tool.py b/tests/test_as_streaming_tool.py new file mode 100644 index 000000000..2fce89d1d --- /dev/null +++ b/tests/test_as_streaming_tool.py @@ -0,0 +1,335 @@ +import pytest + +from agents import Agent, ModelSettings, RunConfig, Runner, function_tool +from agents.models.openai_chatcompletions import OpenAIChatCompletionsModel + + +@function_tool +async def grab(x: int) -> int: + return x * 2 + + +async def collect_tool_events(run): + events = [] + async for ev in run.stream_events(): + if hasattr(ev, "name"): + item = getattr(ev, "item", None) + events.append((ev.name, getattr(item, "name", None))) + else: + events.append((ev.type, None)) + return events + + +@pytest.mark.asyncio +async def test_stream_inner_events_single_agent(monkeypatch): + """Verify we stream inner tool events for a single agent.""" + + async def fake_stream(self): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab_tool"})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab_tool"})}) + + monkeypatch.setattr( + Runner, + "run_streamed", + lambda *args, **kwargs: type("R", (), {"stream_events": fake_stream})(), + ) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tool = sub.as_tool("grab_tool", "test", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="5") + names = await collect_tool_events(run) + assert names == [ + ("tool_called", "grab_tool"), + ("tool_called", "grab"), + ("tool_output", "grab"), + ("tool_output", "grab_tool"), + ] + + +@pytest.mark.asyncio +async def test_parallel_stream_inner_events(monkeypatch): + """Verify we stream inner tool events for parallel tools.""" + + async def fake_stream(self): + for tool_name in ("A", "B"): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": tool_name})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": tool_name})}) + + monkeypatch.setattr( + Runner, + "run_streamed", + lambda *args, **kwargs: type("R", (), {"stream_events": fake_stream})(), + ) + + sub = Agent(name="sub", instructions="", tools=[grab]) + a = sub.as_tool("A", "A", stream_inner_events=True) + b = sub.as_tool("B", "B", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[a, b]) + run = Runner.run_streamed( + main, + input="", + run_config=RunConfig(model_settings=ModelSettings(parallel_tool_calls=True)), + ) + names = await collect_tool_events(run) + assert names.count(("tool_called", "grab")) == 2 + assert names.count(("tool_output", "grab")) == 2 + assert ("tool_called", "A") in names and ("tool_called", "B") in names + + +@pytest.mark.asyncio +async def test_as_tool_streams_nested_tool_calls(monkeypatch): + """Ensure nested tool events surface when streaming a sub-agent.""" + + async def fake_stream(self): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "wrapper"})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "wrapper"})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tool = sub.as_tool("wrapper", "desc", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="") + + events = await collect_tool_events(run) + assert events == [ + ("tool_called", "wrapper"), + ("tool_called", "grab"), + ("tool_output", "grab"), + ("tool_output", "wrapper"), + ] + + +@pytest.mark.asyncio +async def test_as_tool_parallel_streams(monkeypatch): + """Nested tool events appear for each tool when parallelized.""" + + async def fake_stream(self): + for name in ("A", "B"): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": name})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": name})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + t1 = sub.as_tool("A", "A", stream_inner_events=True) + t2 = sub.as_tool("B", "B", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[t1, t2]) + run = Runner.run_streamed( + main, + input="", + run_config=RunConfig(model_settings=ModelSettings(parallel_tool_calls=True)), + ) + + events = await collect_tool_events(run) + assert events.count(("tool_called", "grab")) == 2 + assert events.count(("tool_output", "grab")) == 2 + assert ("tool_called", "A") in events and ("tool_called", "B") in events + + +@pytest.mark.asyncio +async def test_as_tool_error_propagation(monkeypatch): + """Errors inside a sub-agent surface as outer tool_error events.""" + + async def fake_stream(self): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_error", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_error", "item": type("I", (), {"name": "outer"})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tool = sub.as_tool("outer", "desc", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="") + + events = await collect_tool_events(run) + assert ("tool_error", "outer") in events + + +@pytest.mark.asyncio +async def test_as_tool_empty_inner_run(monkeypatch): + """An inner agent that does nothing still emits wrapper events.""" + + async def fake_stream(self): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tool = sub.as_tool("outer", "desc", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="") + + events = await collect_tool_events(run) + assert events == [ + ("tool_called", "outer"), + ("tool_output", "outer"), + ] + + +@pytest.mark.asyncio +async def test_as_tool_mixed_reasoning_and_tools(monkeypatch): + """Wrapper forwards reasoning and tool events in order.""" + + async def fake_stream(self): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})}) + yield type( + "E", (), {"name": "reasoning_item_created", "item": type("I", (), {"name": "r"})} + ) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type( + "E", (), {"name": "reasoning_item_created", "item": type("I", (), {"name": "r2"})} + ) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tool = sub.as_tool("outer", "desc", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="") + + events = await collect_tool_events(run) + assert events == [ + ("tool_called", "outer"), + ("reasoning_item_created", "r"), + ("tool_called", "grab"), + ("tool_output", "grab"), + ("reasoning_item_created", "r2"), + ("tool_output", "outer"), + ] + + +@pytest.mark.asyncio +async def test_as_tool_multiple_inner_tools(monkeypatch): + """Two inner tools are streamed sequentially.""" + + async def fake_stream(self): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab2"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab2"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + @function_tool + async def grab2(x: int) -> int: + return x + 1 + + sub = Agent(name="sub", instructions="", tools=[grab, grab2]) + tool = sub.as_tool("outer", "desc", stream_inner_events=True) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="") + + events = await collect_tool_events(run) + assert events == [ + ("tool_called", "outer"), + ("tool_called", "grab"), + ("tool_output", "grab"), + ("tool_called", "grab2"), + ("tool_output", "grab2"), + ("tool_output", "outer"), + ] + + +@pytest.mark.asyncio +async def test_as_tool_heavy_concurrency_ordering(monkeypatch): + """Nested events from many tools appear once and in order.""" + + async def fake_stream(self): + for name in ("A", "B", "C", "D"): + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": name})}) + yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}) + yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": name})}) + + monkeypatch.setattr( + Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})() + ) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tools = [sub.as_tool(n, n, stream_inner_events=True) for n in ("A", "B", "C", "D")] + main = Agent(name="main", instructions="", tools=tools) + run = Runner.run_streamed( + main, + input="", + run_config=RunConfig(model_settings=ModelSettings(parallel_tool_calls=True)), + ) + + events = await collect_tool_events(run) + assert events.count(("tool_called", "grab")) == 4 + assert events.count(("tool_output", "grab")) == 4 + for name in ("A", "B", "C", "D"): + assert ("tool_called", name) in events + + +@pytest.mark.asyncio +async def test_as_tool_backward_compatibility(monkeypatch): + """When stream_inner_events is False, inner events are hidden.""" + + def fake_run_streamed(agent, *args, **kwargs): + async def fake_stream(self): + if agent.name == "sub": + yield type( + "E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})} + ) + yield type( + "E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})} + ) + else: + yield type( + "E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})} + ) + yield type( + "E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})} + ) + + return type("R", (), {"stream_events": fake_stream})() + + monkeypatch.setattr(Runner, "run_streamed", fake_run_streamed) + monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None) + + sub = Agent(name="sub", instructions="", tools=[grab]) + tool = sub.as_tool("outer", "desc", stream_inner_events=False) + main = Agent(name="main", instructions="", tools=[tool]) + run = Runner.run_streamed(main, input="") + + events = await collect_tool_events(run) + assert events == [ + ("tool_called", "outer"), + ("tool_output", "outer"), + ] From 1642e6174cddd659ea9dadfdfb510ec1e6246945 Mon Sep 17 00:00:00 2001 From: vrtnis <123119434+vrtnis@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:20:13 -0700 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20restore=20event=5Fqueue=20property?= =?UTF-8?q?=20and=203.9=E2=80=91compatible=20typing=20in=20ToolContext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agents/tool_context.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/agents/tool_context.py b/src/agents/tool_context.py index 9127d30c9..7a70375dd 100644 --- a/src/agents/tool_context.py +++ b/src/agents/tool_context.py @@ -1,6 +1,6 @@ import asyncio from dataclasses import dataclass, field, fields -from typing import Any, Optional +from typing import Any, Optional, Union from openai.types.responses import ResponseFunctionToolCall @@ -25,13 +25,21 @@ class ToolContext(RunContextWrapper[TContext]): tool_call_id: str = field(default_factory=_assert_must_pass_tool_call_id) """The ID of the tool call.""" - _event_queue: asyncio.Queue[Any] | None = field(default=None, init=False, repr=False) + _event_queue: Optional[asyncio.Queue[Any]] = field(default=None, init=False, repr=False) + + @property + def event_queue(self) -> Optional[asyncio.Queue[Any]]: + return self._event_queue + + @event_queue.setter + def event_queue(self, queue: Optional[asyncio.Queue[Any]]) -> None: + self._event_queue = queue @classmethod def from_agent_context( cls, context: RunContextWrapper[TContext], - tool_call_id: str | int, + tool_call_id: Union[str, int], tool_call: Optional[ResponseFunctionToolCall] = None, ) -> "ToolContext": """