Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions runtime/acp/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ def close(self) -> None:
self._process.stdin.close()
except OSError:
pass
try:
if self._process.stdout:
self._process.stdout.close()
except (OSError, ValueError):
pass
try:
if self._process.stderr:
self._process.stderr.close()
except (OSError, ValueError):
pass
try:
self._process.terminate()
self._process.wait(timeout=5)
Expand Down
68 changes: 68 additions & 0 deletions tests/test_acp_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,74 @@ def test_drain_notifications(self) -> None:
items = self.transport.drain_notifications()
self.assertGreaterEqual(len(items), 1)

def test_close_no_resource_warning(self) -> None:
"""close() must close stdout/stderr pipes — no ResourceWarning on GC."""
import gc
import warnings

transport = JsonRpcTransport()
transport.start(
command=[sys.executable, "-c", "import time; time.sleep(5)"],
cwd=self.tmp,
)
# Grab pid so we can confirm process existed
self.assertIsNotNone(transport.pid)
transport.close()

# Force GC and check for ResourceWarning about unclosed file/pipe
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", ResourceWarning)
del transport
gc.collect()

resource_warnings = [w for w in caught if issubclass(w.category, ResourceWarning)]
self.assertEqual(
resource_warnings, [],
f"Expected no ResourceWarning, got: {[str(w.message) for w in resource_warnings]}",
)

def test_close_no_resource_warning_with_stderr_pipe(self) -> None:
"""close() must close stderr pipe when stderr=subprocess.PIPE."""
import gc
import warnings

transport = JsonRpcTransport()
transport.start(
command=[sys.executable, "-c", "import time; time.sleep(5)"],
cwd=self.tmp,
# Force stderr to be a PIPE (not DEVNULL)
)
self.assertIsNotNone(transport.pid)

# Manually set stderr to a PIPE for testing
# (start() uses DEVNULL by default, so we need stderr_path to get a pipe)
transport.close()

# Now test with explicit stderr pipe
transport2 = JsonRpcTransport()
proc = subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(5)"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
transport2._process = proc
transport2._running = True
transport2.close()

with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", ResourceWarning)
del transport2
del proc
gc.collect()

resource_warnings = [w for w in caught if issubclass(w.category, ResourceWarning)]
self.assertEqual(
resource_warnings, [],
f"Expected no ResourceWarning with stderr pipe, got: {[str(w.message) for w in resource_warnings]}",
)


class TestTransportEdgeCases(unittest.TestCase):
def test_start_twice_raises(self) -> None:
Expand Down