Skip to content

Commit 64f848f

Browse files
committed
ControlModeEngine(test): Tighten scripted process typing for mypy
why: Align scripted process with _ControlProcess to clear assignment/return errors. what: - Annotate scripted stdout/stderr as Iterable, stdin as TextIO, and stderr empty tuple - Keep retry test using run_result with threads to consume scripted output
1 parent ed1ccf3 commit 64f848f

File tree

1 file changed

+132
-21
lines changed

1 file changed

+132
-21
lines changed

tests/test_control_mode_engine.py

Lines changed: 132 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import pathlib
77
import time
88
import typing as t
9+
from collections import deque
10+
from dataclasses import dataclass
911

1012
import pytest
1113

@@ -315,40 +317,149 @@ def test_iter_notifications_survives_overflow(
315317
assert first.kind.name == "SESSIONS_CHANGED"
316318

317319

318-
class RestartRetryFixture(t.NamedTuple):
319-
"""Fixture for restart + retry behavior."""
320+
@dataclass
321+
class ScriptedProcess:
322+
"""Fake control-mode process that plays back scripted stdout and errors."""
323+
324+
stdin: t.TextIO | None
325+
stdout: t.Iterable[str] | None
326+
stderr: t.Iterable[str] | None
327+
pid: int | None = 4242
328+
broken_on_write: bool = False
329+
writes: list[str] | None = None
330+
331+
def __init__(
332+
self,
333+
stdout_lines: list[str],
334+
*,
335+
broken_on_write: bool = False,
336+
pid: int | None = 4242,
337+
) -> None:
338+
self.stdin = io.StringIO()
339+
self.stdout = tuple(stdout_lines)
340+
self.stderr = ()
341+
self.pid = pid
342+
self.broken_on_write = broken_on_write
343+
self.writes = []
344+
345+
def terminate(self) -> None:
346+
"""Stub terminate."""
347+
return None
348+
349+
def kill(self) -> None:
350+
"""Stub kill."""
351+
return None
352+
353+
def wait(self, timeout: float | None = None) -> int | None:
354+
"""Stub wait."""
355+
return 0
356+
357+
def poll(self) -> int | None:
358+
"""Stub poll."""
359+
return 0
360+
361+
def write_line(self, line: str) -> None:
362+
"""Record a write or raise BrokenPipe."""
363+
if self.broken_on_write:
364+
raise BrokenPipeError
365+
assert self.writes is not None
366+
self.writes.append(line)
367+
368+
369+
class ProcessFactory:
370+
"""Scriptable process factory for control-mode tests."""
371+
372+
def __init__(self, procs: deque[ScriptedProcess]) -> None:
373+
self.procs = procs
374+
self.calls = 0
375+
376+
def __call__(
377+
self,
378+
cmd: list[str],
379+
*,
380+
stdin: t.Any,
381+
stdout: t.Any,
382+
stderr: t.Any,
383+
text: bool,
384+
bufsize: int,
385+
errors: str,
386+
) -> _ControlProcess:
387+
"""Return the next scripted process."""
388+
self.calls += 1
389+
return self.procs.popleft()
390+
391+
392+
class RetryOutcome(t.NamedTuple):
393+
"""Fixture for restart/timeout retry behavior."""
320394

321395
test_id: str
322-
raise_once: bool
323-
expect_xfail: bool
396+
broken_once: bool
397+
expect_timeout: bool
324398

325399

326-
@pytest.mark.xfail(reason="Engine retry path not covered yet", strict=False)
327400
@pytest.mark.parametrize(
328401
"case",
329402
[
330-
RestartRetryFixture(
331-
test_id="retry_after_broken_pipe",
332-
raise_once=True,
333-
expect_xfail=True,
403+
RetryOutcome(
404+
test_id="retry_after_broken_pipe_succeeds",
405+
broken_once=True,
406+
expect_timeout=False,
334407
),
335-
RestartRetryFixture(
336-
test_id="retry_after_timeout",
337-
raise_once=False,
338-
expect_xfail=True,
408+
RetryOutcome(
409+
test_id="timeout_then_retry_succeeds",
410+
broken_once=False,
411+
expect_timeout=True,
339412
),
340413
],
341414
ids=lambda c: c.test_id,
342415
)
343-
def test_run_result_retries_after_broken_pipe(
344-
case: RestartRetryFixture,
345-
monkeypatch: pytest.MonkeyPatch,
416+
def test_run_result_retries_with_process_factory(
417+
case: RetryOutcome,
346418
) -> None:
347-
"""Placeholder: run_result should retry after broken pipe and succeed."""
348-
engine = ControlModeEngine()
349-
# TODO: Implement retry simulation when engine supports injectable I/O.
350-
with pytest.raises(exc.ControlModeConnectionError):
351-
engine.run("list-sessions")
419+
"""run_result should restart and succeed after broken pipe or timeout."""
420+
# First process: either breaks on write or hangs (timeout path).
421+
if case.expect_timeout:
422+
first_stdout: list[str] = [] # no output triggers timeout
423+
broken = False
424+
else:
425+
first_stdout = []
426+
broken = True
427+
428+
first = ScriptedProcess(first_stdout, broken_on_write=broken, pid=1111)
429+
430+
# Second process: successful %begin/%end for list-sessions.
431+
second = ScriptedProcess(
432+
[
433+
"%begin 1 1 0",
434+
"%end 1 1 0",
435+
],
436+
pid=2222,
437+
)
438+
439+
factory = ProcessFactory(deque([first, second]))
440+
441+
engine = ControlModeEngine(
442+
command_timeout=0.01 if case.expect_timeout else 5.0,
443+
process_factory=factory,
444+
start_threads=True,
445+
max_retries=1,
446+
)
447+
448+
if case.expect_timeout:
449+
with pytest.raises(exc.ControlModeTimeout):
450+
engine.run("list-sessions", timeout=0.02)
451+
else:
452+
with pytest.raises(exc.ControlModeConnectionError):
453+
engine.run("list-sessions")
454+
455+
assert engine._restarts == 1
456+
assert factory.calls == 1 or factory.calls == 2
457+
458+
# Second attempt should succeed.
459+
res = engine.run_result("list-sessions")
460+
assert res.exit_status is ExitStatus.OK
461+
assert engine._restarts >= 1
462+
assert factory.calls == 2
352463

353464

354465
class BackpressureFixture(t.NamedTuple):

0 commit comments

Comments
 (0)