33
44#![ cfg( feature = "e2e" ) ]
55
6- use std:: io:: { Read , Write } ;
7- use std:: net :: TcpListener ;
6+ use std:: io:: Write ;
7+ use std:: process :: Command ;
88use std:: process:: Stdio ;
99use std:: sync:: Mutex ;
10- use std:: thread;
1110use std:: time:: Duration ;
1211
1312use openshell_e2e:: harness:: binary:: openshell_cmd;
13+ use openshell_e2e:: harness:: port:: find_free_port;
1414use openshell_e2e:: harness:: sandbox:: SandboxGuard ;
1515use tempfile:: NamedTempFile ;
16+ use tokio:: time:: { interval, timeout} ;
1617
1718const INFERENCE_PROVIDER_NAME : & str = "e2e-host-inference" ;
19+ const TEST_SERVER_IMAGE : & str = "python:3.13-alpine" ;
1820static INFERENCE_ROUTE_LOCK : Mutex < ( ) > = Mutex :: new ( ( ) ) ;
1921
2022async fn run_cli ( args : & [ & str ] ) -> Result < String , String > {
@@ -41,68 +43,118 @@ async fn run_cli(args: &[&str]) -> Result<String, String> {
4143 Ok ( combined)
4244}
4345
44- fn spawn_server (
45- response_body : fn ( & str ) -> String ,
46- ) -> Result < ( u16 , thread:: JoinHandle < ( ) > ) , String > {
47- let listener = TcpListener :: bind ( "0.0.0.0:0" )
48- . map_err ( |e| format ! ( "bind echo server on 0.0.0.0:0: {e}" ) ) ?;
49- listener
50- . set_nonblocking ( false )
51- . map_err ( |e| format ! ( "configure echo server blocking mode: {e}" ) ) ?;
52- let port = listener
53- . local_addr ( )
54- . map_err ( |e| format ! ( "read echo server address: {e}" ) ) ?
55- . port ( ) ;
56-
57- let handle = thread:: spawn ( move || {
58- let ( mut stream, _) = listener. accept ( ) . expect ( "accept echo request" ) ;
59- stream
60- . set_read_timeout ( Some ( Duration :: from_secs ( 30 ) ) )
61- . expect ( "set read timeout" ) ;
62- stream
63- . set_write_timeout ( Some ( Duration :: from_secs ( 30 ) ) )
64- . expect ( "set write timeout" ) ;
65-
66- let mut request = Vec :: new ( ) ;
67- let mut buf = [ 0_u8 ; 1024 ] ;
68- loop {
69- let read = stream. read ( & mut buf) . expect ( "read echo request" ) ;
70- if read == 0 {
71- break ;
72- }
73- request. extend_from_slice ( & buf[ ..read] ) ;
74- if request. windows ( 4 ) . any ( |window| window == b"\r \n \r \n " ) {
75- break ;
76- }
46+ struct DockerServer {
47+ port : u16 ,
48+ container_id : String ,
49+ }
50+
51+ impl DockerServer {
52+ async fn start ( response_body : & str ) -> Result < Self , String > {
53+ let port = find_free_port ( ) ;
54+ let script = r#"from http.server import BaseHTTPRequestHandler, HTTPServer
55+ import os
56+
57+ BODY = os.environ["RESPONSE_BODY"].encode()
58+
59+ class Handler(BaseHTTPRequestHandler):
60+ def do_GET(self):
61+ self.send_response(200)
62+ self.send_header("Content-Type", "application/json")
63+ self.send_header("Content-Length", str(len(BODY)))
64+ self.end_headers()
65+ self.wfile.write(BODY)
66+
67+ def do_POST(self):
68+ length = int(self.headers.get("Content-Length", "0"))
69+ if length:
70+ self.rfile.read(length)
71+ self.send_response(200)
72+ self.send_header("Content-Type", "application/json")
73+ self.send_header("Content-Length", str(len(BODY)))
74+ self.end_headers()
75+ self.wfile.write(BODY)
76+
77+ def log_message(self, format, *args):
78+ pass
79+
80+ HTTPServer(("0.0.0.0", 8000), Handler).serve_forever()
81+ "# ;
82+
83+ let output = Command :: new ( "docker" )
84+ . args ( [
85+ "run" ,
86+ "--detach" ,
87+ "--rm" ,
88+ "-e" ,
89+ & format ! ( "RESPONSE_BODY={response_body}" ) ,
90+ "-p" ,
91+ & format ! ( "{port}:8000" ) ,
92+ TEST_SERVER_IMAGE ,
93+ "python3" ,
94+ "-c" ,
95+ script,
96+ ] )
97+ . output ( )
98+ . map_err ( |e| format ! ( "start docker test server: {e}" ) ) ?;
99+
100+ let stdout = String :: from_utf8_lossy ( & output. stdout ) . trim ( ) . to_string ( ) ;
101+ let stderr = String :: from_utf8_lossy ( & output. stderr ) . to_string ( ) ;
102+
103+ if !output. status . success ( ) {
104+ return Err ( format ! (
105+ "docker run failed (exit {:?}):\n {stderr}" ,
106+ output. status. code( )
107+ ) ) ;
77108 }
78109
79- let request_text = String :: from_utf8_lossy ( & request) ;
80- let body = response_body ( & request_text) ;
81- let response = format ! (
82- "HTTP/1.1 200 OK\r \n content-type: application/json\r \n content-length: {}\r \n connection: close\r \n \r \n {}" ,
83- body. len( ) ,
84- body
85- ) ;
86- stream
87- . write_all ( response. as_bytes ( ) )
88- . expect ( "write echo response" ) ;
89- stream. flush ( ) . expect ( "flush echo response" ) ;
90- } ) ;
91-
92- Ok ( ( port, handle) )
93- }
110+ let server = Self {
111+ port,
112+ container_id : stdout,
113+ } ;
114+ server. wait_until_ready ( ) . await ?;
115+ Ok ( server)
116+ }
94117
95- fn spawn_echo_server ( ) -> Result < ( u16 , thread:: JoinHandle < ( ) > ) , String > {
96- spawn_server ( |request_text| {
97- let request_line = request_text. lines ( ) . next ( ) . unwrap_or_default ( ) ;
98- format ! ( r#"{{"message":"hello-from-host","request_line":"{request_line}"}}"# )
99- } )
118+ async fn wait_until_ready ( & self ) -> Result < ( ) , String > {
119+ let container_id = self . container_id . clone ( ) ;
120+ timeout ( Duration :: from_secs ( 30 ) , async move {
121+ let mut tick = interval ( Duration :: from_millis ( 500 ) ) ;
122+ loop {
123+ tick. tick ( ) . await ;
124+ let output = Command :: new ( "docker" )
125+ . args ( [
126+ "exec" ,
127+ & container_id,
128+ "python3" ,
129+ "-c" ,
130+ "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000', timeout=1).read()" ,
131+ ] )
132+ . output ( ) ;
133+
134+ match output {
135+ Ok ( result) if result. status . success ( ) => return Ok ( ( ) ) ,
136+ Ok ( _) | Err ( _) => continue ,
137+ }
138+ }
139+ } )
140+ . await
141+ . map_err ( |_| {
142+ format ! (
143+ "docker test server {} did not become ready within 30s" ,
144+ self . container_id
145+ )
146+ } ) ?
147+ }
100148}
101149
102- fn spawn_inference_server ( ) -> Result < ( u16 , thread:: JoinHandle < ( ) > ) , String > {
103- spawn_server ( |_| {
104- r#"{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"host-echo","choices":[{"index":0,"message":{"role":"assistant","content":"hello-from-host"},"finish_reason":"stop"}]}"# . to_string ( )
105- } )
150+ impl Drop for DockerServer {
151+ fn drop ( & mut self ) {
152+ let _ = Command :: new ( "docker" )
153+ . args ( [ "rm" , "-f" , & self . container_id ] )
154+ . stdout ( Stdio :: null ( ) )
155+ . stderr ( Stdio :: null ( ) )
156+ . status ( ) ;
157+ }
106158}
107159
108160async fn provider_exists ( name : & str ) -> bool {
@@ -173,8 +225,10 @@ network_policies:
173225
174226#[ tokio:: test]
175227async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias ( ) {
176- let ( port, server) = spawn_echo_server ( ) . expect ( "start host echo server" ) ;
177- let policy = write_policy ( port) . expect ( "write custom policy" ) ;
228+ let server = DockerServer :: start ( r#"{"message":"hello-from-host"}"# )
229+ . await
230+ . expect ( "start host echo server" ) ;
231+ let policy = write_policy ( server. port ) . expect ( "write custom policy" ) ;
178232 let policy_path = policy
179233 . path ( )
180234 . to_str ( )
@@ -188,29 +242,20 @@ async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias() {
188242 "curl" ,
189243 "--silent" ,
190244 "--show-error" ,
191- & format ! ( "http://host.openshell.internal:{port}/" ) ,
245+ "--max-time" ,
246+ "15" ,
247+ & format ! ( "http://host.openshell.internal:{}/" , server. port) ,
192248 ] )
193249 . await
194250 . expect ( "sandbox create with host.openshell.internal echo request" ) ;
195251
196- server
197- . join ( )
198- . expect ( "echo server thread should exit cleanly" ) ;
199-
200252 assert ! (
201253 guard
202254 . create_output
203255 . contains( "\" message\" :\" hello-from-host\" " ) ,
204256 "expected sandbox to receive host echo response:\n {}" ,
205257 guard. create_output
206258 ) ;
207- assert ! (
208- guard
209- . create_output
210- . contains( "\" request_line\" :\" GET / HTTP/1.1\" " ) ,
211- "expected host echo server to receive sandbox HTTP request:\n {}" ,
212- guard. create_output
213- ) ;
214259}
215260
216261#[ tokio:: test]
@@ -227,7 +272,11 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() {
227272 return ;
228273 }
229274
230- let ( port, server) = spawn_inference_server ( ) . expect ( "start host inference echo server" ) ;
275+ let server = DockerServer :: start (
276+ r#"{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"host-echo","choices":[{"index":0,"message":{"role":"assistant","content":"hello-from-host"},"finish_reason":"stop"}]}"# ,
277+ )
278+ . await
279+ . expect ( "start host inference echo server" ) ;
231280
232281 if provider_exists ( INFERENCE_PROVIDER_NAME ) . await {
233282 delete_provider ( INFERENCE_PROVIDER_NAME ) . await ;
@@ -243,7 +292,10 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() {
243292 "--credential" ,
244293 "OPENAI_API_KEY=dummy" ,
245294 "--config" ,
246- & format ! ( "OPENAI_BASE_URL=http://host.openshell.internal:{port}/v1" ) ,
295+ & format ! (
296+ "OPENAI_BASE_URL=http://host.openshell.internal:{}/v1" ,
297+ server. port
298+ ) ,
247299 ] )
248300 . await
249301 . expect ( "create host-backed OpenAI provider" ) ;
@@ -265,17 +317,15 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() {
265317 "curl" ,
266318 "--silent" ,
267319 "--show-error" ,
320+ "--max-time" ,
321+ "15" ,
268322 "https://inference.local/v1/chat/completions" ,
269323 "--json" ,
270324 r#"{"messages":[{"role":"user","content":"hello"}]}"# ,
271325 ] )
272326 . await
273327 . expect ( "sandbox create with inference.local request" ) ;
274328
275- server
276- . join ( )
277- . expect ( "inference echo server thread should exit cleanly" ) ;
278-
279329 assert ! (
280330 guard
281331 . create_output
0 commit comments