Skip to content

Commit a9f3bb1

Browse files
authored
Tests to use fast experimental communication strategy (SWE-agent#230)
1 parent 264ff06 commit a9f3bb1

File tree

4 files changed

+98
-1
lines changed

4 files changed

+98
-1
lines changed

sweagent/environment/swe_env.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
from simple_parsing.helpers.serialization.serializable import FrozenSerializable
2020
import yaml
2121
from sweagent.environment.utils import (
22+
PROCESS_DONE_MARKER_END,
23+
PROCESS_DONE_MARKER_START,
2224
copy_anything_to_container,
2325
copy_file_to_container,
2426
format_trajectory_markdown,
@@ -28,6 +30,7 @@
2830
parse_gh_issue_url,
2931
read_with_timeout,
3032
LOGGER_NAME,
33+
read_with_timeout_experimental,
3134
)
3235
from swebench import (
3336
get_environment_yml,
@@ -445,11 +448,40 @@ def _init_scripts(self):
445448
error_msg="Failed to add commands directory to PATH",
446449
)
447450

451+
def _communicate_experimental(
452+
self,
453+
input: str,
454+
timeout_duration=25,
455+
) -> str:
456+
"""Experimental version of `_communicate`"""
457+
458+
command_suffix = f"echo {PROCESS_DONE_MARKER_START}$?{PROCESS_DONE_MARKER_END}\n"
459+
try:
460+
self.returncode = None
461+
cmd = input if input.endswith("\n") else input + "\n"
462+
cmd += command_suffix
463+
print(cmd)
464+
os.write(self.container.stdin.fileno(), cmd.encode())
465+
time.sleep(0.03)
466+
self.container.stdin.flush()
467+
except BrokenPipeError:
468+
traceback.print_exc()
469+
self.logger.error(
470+
"Failed to communicate with container. Check docker logs for more information."
471+
)
472+
raise RuntimeError("Failed to communicate with container")
473+
474+
buffer, exit_code = read_with_timeout_experimental(self.container, timeout_duration)
475+
self.returncode = int(exit_code)
476+
return buffer
477+
448478
def _communicate(
449479
self,
450480
input: str,
451481
timeout_duration=25,
452482
) -> str:
483+
if "SWE_AGENT_EXPERIMENTAL_COMMUNICATE" in os.environ:
484+
return self._communicate_experimental(input, timeout_duration)
453485
try:
454486
self.returncode = None
455487
cmd = input if input.endswith("\n") else input + "\n"

sweagent/environment/utils.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,57 @@ def read_with_timeout(container, pid_func, timeout_duration):
161161
return buffer.decode()
162162

163163

164+
PROCESS_DONE_MARKER_START = "///PROCESS-DONE:"
165+
PROCESS_DONE_MARKER_END= ":PROCESS-DONE///"
166+
PROCESS_DONE_REGEX = re.compile(rf"{PROCESS_DONE_MARKER_START}(.+?){PROCESS_DONE_MARKER_END}")
167+
168+
169+
def read_with_timeout_experimental(container, timeout_duration):
170+
"""
171+
Read data from a subprocess with a timeout.
172+
This function uses a file descriptor to read data from the subprocess in a non-blocking way.
173+
174+
NOTE: This is an experimental implementation that is faster than `read_with_timeout`, but
175+
has not been thoroughly tested.
176+
177+
Args:
178+
container (subprocess.Popen): The subprocess container.
179+
timeout_duration (int): The timeout duration in seconds.
180+
181+
Returns:
182+
str: The data read from the subprocess, stripped of trailing newline characters.
183+
184+
Raises:
185+
TimeoutError: If the timeout duration is reached while reading from the subprocess.
186+
"""
187+
buffer = b""
188+
fd = container.stdout.fileno()
189+
end_time = time.time() + timeout_duration
190+
191+
while time.time() < end_time:
192+
ready_to_read, _, _ = select.select([fd], [], [], 0.01)
193+
if ready_to_read:
194+
data = os.read(fd, 4096)
195+
if data:
196+
buffer += data
197+
if PROCESS_DONE_MARKER_START in buffer.decode():
198+
break
199+
time.sleep(0.01) # Prevents CPU hogging
200+
201+
if container.poll() is not None:
202+
raise RuntimeError("Subprocess exited unexpectedly.\nCurrent buffer: {}".format(buffer.decode()))
203+
if time.time() >= end_time:
204+
raise TimeoutError("Timeout reached while reading from subprocess.\nCurrent buffer: {}".format(buffer.decode()))
205+
decoded = buffer.decode()
206+
body = "\n".join(line for line in decoded.splitlines() if not line.startswith(PROCESS_DONE_MARKER_START))
207+
last_line = decoded.splitlines()[-1]
208+
_results = PROCESS_DONE_REGEX.search(last_line)
209+
if _results is None:
210+
raise ValueError(f"Could not find process done marker in last line: {last_line=}, {body=}")
211+
exit_code = _results.group(1)
212+
return body, exit_code
213+
214+
164215
class timeout:
165216
def __init__(self, seconds=TIMEOUT_DURATION, error_message="Timeout"):
166217
self.seconds = seconds

tests/conftest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import json
23
import sys
34
from pathlib import Path
@@ -10,7 +11,7 @@
1011
package_dir = root_dir / "sweagent"
1112
sys.path.insert(0, str(root_dir))
1213
sys.path.insert(1, str(package_dir))
13-
print("Adjusted path: ", sys.path)
14+
os.environ["SWE_AGENT_EXPERIMENTAL_COMMUNICATE"] = "1"
1415

1516

1617
@fixture

tests/test_env.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import dataclasses
2+
import os
23
from pathlib import Path
34
import pytest
45
import yaml
@@ -80,3 +81,15 @@ def test_interrupt_close(test_env_args):
8081
env = SWEEnv(test_env_args)
8182
env.interrupt()
8283
env.close()
84+
85+
86+
@pytest.mark.slow
87+
def test_communicate_old(test_env_args):
88+
del os.environ["SWE_AGENT_EXPERIMENTAL_COMMUNICATE"]
89+
try:
90+
env = SWEEnv(test_env_args)
91+
env.reset()
92+
except:
93+
raise
94+
finally:
95+
os.environ["SWE_AGENT_EXPERIMENTAL_COMMUNICATE"] = "1"

0 commit comments

Comments
 (0)