Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions sdk/python/tests/test_harness_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Tests for shared subprocess helpers used by CLI harness providers."""

from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from agentfield.harness._cli import (
estimate_cli_cost,
extract_final_text,
parse_jsonl,
run_cli,
strip_ansi,
)


def test_strip_ansi_removes_colors():
assert strip_ansi("\x1b[31mError\x1b[0m") == "Error"


@pytest.mark.asyncio
async def test_run_cli_success():
process = MagicMock()
process.communicate = AsyncMock(return_value=(b"OK", b""))
process.returncode = 0

create_process = AsyncMock(return_value=process)

with patch("asyncio.create_subprocess_exec", create_process):
stdout, stderr, returncode = await run_cli(
["agentfield", "status"],
env={"AGENTFIELD_TEST": "1"},
cwd=".",
timeout=1,
)

assert stdout == "OK"
assert stderr == ""
assert returncode == 0
create_process.assert_awaited_once()
_, kwargs = create_process.call_args
assert kwargs["env"]["AGENTFIELD_TEST"] == "1"
assert kwargs["cwd"] == "."
assert kwargs["stdout"] is asyncio.subprocess.PIPE
assert kwargs["stderr"] is asyncio.subprocess.PIPE


@pytest.mark.asyncio
async def test_run_cli_timeout():
class HangingProcess:
returncode = None

def __init__(self) -> None:
self.killed = False
self.wait = AsyncMock(return_value=None)

async def communicate(self):
await asyncio.sleep(1)
return b"", b""

def kill(self):
self.killed = True

process = HangingProcess()

with patch("asyncio.create_subprocess_exec", AsyncMock(return_value=process)):
with pytest.raises(TimeoutError, match="CLI command timed out"):
await run_cli(["agentfield", "hang"], timeout=0.01)

assert process.killed is True
process.wait.assert_awaited_once()


def test_parse_jsonl_skips_invalid():
events = parse_jsonl('{"type":"a"}\nnot-json\n{"type":"b"}')

assert events == [{"type": "a"}, {"type": "b"}]


def test_extract_final_text_codex_style():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟧 [HIGH] extract_final_text only tests 1 of 4 event-type branches

extract_final_text in agentfield/harness/_cli.py:68-98 handles four event types:

  • item.completed (Codex) ← tested here
  • result ← untested
  • turn.completed ← untested
  • message / assistant ← untested

Untested branches in a "final answer extraction" path can silently return None or wrong text in production for non-Codex CLI providers. Suggest adding a case per event type plus an empty-events case.


🤖 Reviewed by AgentField PR Review Harness

events = [
{"type": "item.completed", "item": {"type": "agent_message", "text": "first"}},
{
"type": "item.completed",
"item": {"type": "agent_message", "text": "final answer"},
},
]

assert extract_final_text(events) == "final answer"


def test_estimate_cli_cost_calls_litellm():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟧 [HIGH] estimate_cli_cost except path is untested

Source (_cli.py:113-123) has try: import litellm ... except Exception: return None. Only the success path returning 0.05 is exercised; the "litellm-missing" or "completion_cost raises" branches silently return None. Callers MUST treat None as "unknown" rather than "free" — exactly the kind of defensive code that rots untested and quietly corrupts billing/accounting if broken.

Suggested additions:

  • ImportError when litellm is absent
  • completion_cost returning 0 or None
  • completion_cost raising

🤖 Reviewed by AgentField PR Review Harness

mock_litellm = MagicMock()
mock_litellm.completion_cost.return_value = 0.05

with patch.dict("sys.modules", {"litellm": mock_litellm}):
cost = estimate_cli_cost(
model="openai/gpt-4o",
prompt="Summarize this run",
result_text="Done",
)

assert cost == 0.05
mock_litellm.completion_cost.assert_called_once_with(
model="openai/gpt-4o",
prompt="Summarize this run",
completion="Done",
)
192 changes: 186 additions & 6 deletions sdk/python/tests/test_node_logs.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
"""
Tests for agentfield.node_logs — ProcessLogRing and related helpers.
"""

from __future__ import annotations

import io
import json
import queue
import sys
import threading
import time

import pytest

from agentfield.node_logs import (
LogEntry,
ProcessLogRing,
_TeeTextIO,
get_ring,
install_stdio_tee,
iter_tail_ndjson,
verify_internal_bearer,
get_ring,
)


Expand All @@ -23,20 +31,26 @@

class TestLogEntryNdjson:
def test_stdout_produces_info_level(self):
entry = LogEntry(seq=1, ts="2024-01-01T00:00:00.000Z", stream="stdout", line="hello")
entry = LogEntry(
seq=1, ts="2024-01-01T00:00:00.000Z", stream="stdout", line="hello"
)
data = json.loads(entry.to_ndjson_line().decode())
assert data["level"] == "info"
assert data["line"] == "hello"
assert data["v"] == 1
assert data["source"] == "process"

def test_stderr_produces_error_level(self):
entry = LogEntry(seq=2, ts="2024-01-01T00:00:00.000Z", stream="stderr", line="err")
entry = LogEntry(
seq=2, ts="2024-01-01T00:00:00.000Z", stream="stderr", line="err"
)
data = json.loads(entry.to_ndjson_line().decode())
assert data["level"] == "error"

def test_other_stream_produces_log_level(self):
entry = LogEntry(seq=3, ts="2024-01-01T00:00:00.000Z", stream="custom", line="msg")
entry = LogEntry(
seq=3, ts="2024-01-01T00:00:00.000Z", stream="custom", line="msg"
)
data = json.loads(entry.to_ndjson_line().decode())
assert data["level"] == "log"

Expand All @@ -55,7 +69,9 @@ def test_ndjson_ends_with_newline(self):
assert entry.to_ndjson_line().endswith(b"\n")

def test_seq_and_ts_preserved(self):
entry = LogEntry(seq=42, ts="2024-06-15T10:00:00.000Z", stream="stdout", line="data")
entry = LogEntry(
seq=42, ts="2024-06-15T10:00:00.000Z", stream="stdout", line="data"
)
data = json.loads(entry.to_ndjson_line().decode())
assert data["seq"] == 42
assert data["ts"] == "2024-06-15T10:00:00.000Z"
Expand Down Expand Up @@ -157,7 +173,9 @@ def test_long_line_is_truncated(self):
ring.append("stdout", long_text, max_line_bytes=10)
entries = ring.tail(1)
assert entries[0].truncated is True
assert len(entries[0].line.encode("utf-8")) <= 10 + 3 # allow for replacement chars
assert (
len(entries[0].line.encode("utf-8")) <= 10 + 3
) # allow for replacement chars

def test_short_line_is_not_truncated(self):
ring = ProcessLogRing(max_bytes=1024 * 1024)
Expand Down Expand Up @@ -275,6 +293,168 @@ def test_iter_tail_empty_ring(self, monkeypatch):
assert chunks == []


# ---------------------------------------------------------------------------
# _TeeTextIO and install_stdio_tee
# ---------------------------------------------------------------------------


class TestTeeTextIO:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟥 [HIGH] _TeeTextIO is missing core TextIO methods (fileno, writelines, close, readable, writable, seekable)

typing.TextIO is a typing stub, not a usable base class with default method implementations. The class in agentfield/node_logs.py:122-158 only proxies write, flush, encoding, and isatty. After install_stdio_tee() runs, any caller of sys.stdout.fileno() — pytest's capfd fixture, Click, rich, subprocess.Popen(stdin=sys.stdin) — will hit AttributeError/NotImplementedError at runtime.

This is a production gap exposed by the test omission, not just a coverage hole. Suggested fix:

  1. Inherit from io.TextIOBase to get sane defaults, OR
  2. Explicitly proxy the missing methods to self._original
  3. Add a test that calls each TextIO method through sys.stdout after install_stdio_tee()

🤖 Reviewed by AgentField PR Review Harness

def test_tee_text_io_writes_to_original(self):
original = io.StringIO()
ring = ProcessLogRing(max_bytes=1024 * 1024)
tee = _TeeTextIO("stdout", original, ring, max_line_bytes=1024)

written = tee.write("hello\n")

assert written == len("hello\n")
assert original.getvalue() == "hello\n"

def test_tee_text_io_appends_to_ring(self):
original = io.StringIO()
ring = ProcessLogRing(max_bytes=1024 * 1024)
tee = _TeeTextIO("stdout", original, ring, max_line_bytes=1024)

tee.write("one line\n")

entries = ring.tail(1)
assert len(entries) == 1
assert entries[0].stream == "stdout"
assert entries[0].line == "one line"

def test_tee_text_io_buffers_until_newline(self):
original = io.StringIO()
ring = ProcessLogRing(max_bytes=1024 * 1024)
tee = _TeeTextIO("stderr", original, ring, max_line_bytes=1024)

tee.write("partial")
assert ring.tail(1) == []

tee.write(" line\n")
entries = ring.tail(1)
assert entries[0].stream == "stderr"
assert entries[0].line == "partial line"

def test_install_stdio_tee_replaces_sys_stdout(self, monkeypatch):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟧 [HIGH] install_stdio_tee disabled-env early-return is untested

Source (node_logs.py:196) is if _tee_installed or not logs_enabled(): return. This test sets AGENTFIELD_LOGS_ENABLED=true and verifies wrapping happens. The disabled case — which is the default-user invocation when the env var is unset to false/0/no/off — has no coverage. A regression here silently mis-wires stdout for every default install.

Suggest adding a test that sets AGENTFIELD_LOGS_ENABLED=false and asserts sys.stdout is not wrapped.


🤖 Reviewed by AgentField PR Review Harness

import agentfield.node_logs as nl

previous_stdout = sys.stdout
previous_stderr = sys.stderr
original_stdout = io.StringIO()
original_stderr = io.StringIO()
ring = ProcessLogRing(max_bytes=1024 * 1024)

monkeypatch.setenv("AGENTFIELD_LOGS_ENABLED", "true")
monkeypatch.setattr(sys, "__stdout__", original_stdout)
monkeypatch.setattr(sys, "__stderr__", original_stderr)
monkeypatch.setattr(nl, "_global_ring", ring)
monkeypatch.setattr(nl, "_tee_installed", False)

try:
install_stdio_tee()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟧 [HIGH] install_stdio_tee idempotency (_tee_installed flag) is untested

The function guards with if _tee_installed or not logs_enabled(): return. This test sets _tee_installed=False then calls install_stdio_tee() once — it does NOT call it twice to verify the second call is a no-op. Without this test, a refactor that breaks the idempotency guard would double-wrap sys.stdout, producing duplicated log entries — a silent correctness bug.

Suggest: call install_stdio_tee() twice and assert sys.stdout._original is the original (not another _TeeTextIO).


🤖 Reviewed by AgentField PR Review Harness

assert isinstance(sys.stdout, _TeeTextIO)
assert isinstance(sys.stderr, _TeeTextIO)

sys.stdout.write("captured\n")
assert original_stdout.getvalue() == "captured\n"
assert ring.tail(1)[0].line == "captured"
finally:
sys.stdout = previous_stdout
sys.stderr = previous_stderr
nl._tee_installed = False


class TestIterTailNdjsonFollow:
def test_iter_tail_ndjson_follow_mode(self, monkeypatch):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟨 [MEDIUM] iter_tail_ndjson follow + tail_lines>0 prelude path untested

All three follow-mode tests use tail_lines=0. The combined "emit existing tail entries, then enter follow loop" path (node_logs.py:222-233) is uncovered. Most real CLI invocations of --follow combine an initial tail with follow — a broken prelude would silently drop history without anyone noticing.

Suggest a test that pre-populates the ring with N entries, opens iter_tail_ndjson(tail_lines=2, since_seq=0, follow=True), asserts the 2 tail entries are yielded, then exercises the follow loop with a new append.


🤖 Reviewed by AgentField PR Review Harness

import agentfield.node_logs as nl

ring = ProcessLogRing(max_bytes=1024 * 1024)
monkeypatch.setattr(nl, "_global_ring", ring)
monkeypatch.setattr(nl, "_follow_queues", [])

chunks: list[bytes] = []
errors: list[BaseException] = []
generator = iter_tail_ndjson(tail_lines=0, since_seq=0, follow=True)

def read_next():
try:
chunks.append(next(generator))
except Exception as exc: # pragma: no cover - assertion reports details
errors.append(exc)

thread = threading.Thread(target=read_next)
thread.start()
deadline = time.monotonic() + 2
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟨 [MEDIUM] Follow-mode test is timing-sensitive / flaky

The busy-wait pattern while not nl._follow_queues and thread.is_alive() and time.monotonic() < deadline: time.sleep(0.001) (with a 2 s deadline) plus thread.join(timeout=2) is racy under slow CI hosts or coverage instrumentation. Both deadlines can be exceeded — the test may pass without exercising the new-entry path (zero chunks, no error) or flake outright.

Suggest replacing busy-wait with a deterministic synchronization primitive: a threading.Event set inside the generator after register_follow_queue() (or wrap register_follow_queue to notify).


🤖 Reviewed by AgentField PR Review Harness

while (
not nl._follow_queues and thread.is_alive() and time.monotonic() < deadline
):
time.sleep(0.001)

ring.append("stdout", "new log", max_line_bytes=1024)
thread.join(timeout=2)
generator.close()

assert errors == []
assert len(chunks) == 1
assert json.loads(chunks[0].decode())["line"] == "new log"

def test_iter_tail_ndjson_unregisters_on_close(self, monkeypatch):
import agentfield.node_logs as nl

class ClosingQueue:
def __init__(self, maxsize: int) -> None:
self.maxsize = maxsize

def put_nowait(self, _item):
return None

def get(self, timeout: float):
assert timeout == 0.5
raise GeneratorExit

ring = ProcessLogRing(max_bytes=1024 * 1024)
monkeypatch.setattr(nl, "_global_ring", ring)
monkeypatch.setattr(nl, "_follow_queues", [])
monkeypatch.setattr(nl.queue, "Queue", ClosingQueue)

generator = iter_tail_ndjson(tail_lines=0, since_seq=0, follow=True)
with pytest.raises(GeneratorExit):
next(generator)

assert nl._follow_queues == []

def test_iter_tail_ndjson_queue_timeout(self, monkeypatch):
import agentfield.node_logs as nl

ring = ProcessLogRing(max_bytes=1024 * 1024)

class TimeoutQueue:
def __init__(self, maxsize: int) -> None:
self.maxsize = maxsize
self._appended = False

def put_nowait(self, _item):
return None

def get(self, timeout: float):
assert timeout == 0.5
if not self._appended:
self._appended = True
ring.append("stdout", "after timeout", max_line_bytes=1024)
raise queue.Empty

monkeypatch.setattr(nl, "_global_ring", ring)
monkeypatch.setattr(nl, "_follow_queues", [])
monkeypatch.setattr(nl.queue, "Queue", TimeoutQueue)

generator = iter_tail_ndjson(tail_lines=0, since_seq=0, follow=True)
try:
chunk = next(generator)
finally:
generator.close()

assert json.loads(chunk.decode())["line"] == "after timeout"


# ---------------------------------------------------------------------------
# verify_internal_bearer
# ---------------------------------------------------------------------------
Expand Down
Loading