Commit 4f7d0c4
fix(inference): prevent silent truncation of large streaming responses
The L7 inference proxy silently dropped tool_calls from large streaming
responses due to an aggressive 30s per-chunk idle timeout and a reqwest
total-request timeout that capped the entire body stream. Reasoning models
that pause during "thinking" phases triggered these timeouts, producing
valid-looking but truncated HTTP responses with no client-visible error.

- Extract prepare_backend_request() helper and create a streaming variant
  that omits the total request timeout; body stream liveness is now enforced
  solely by the per-chunk idle timeout
- Add 30s connect_timeout to the reqwest Client builder
- Increase CHUNK_IDLE_TIMEOUT from 30s to 120s for reasoning models
- Inject SSE error events (proxy_stream_error) before the HTTP chunked
  terminator on all truncation paths so clients can detect data loss
- Wrap the streaming relay in BufWriter to reduce per-chunk TLS flush overhead
- Bump OCSF severity for streaming truncation from Low to Medium

Closes #829
1 parent fdca543 commit 4f7d0c4

File tree

6 files changed (+316 −35 lines)

architecture/inference-routing.md

Lines changed: 38 additions & 8 deletions
@@ -28,10 +28,13 @@ sequenceDiagram
     Backend->>Router: Response headers + body stream
     Router->>Proxy: StreamingProxyResponse (headers first)
     Proxy->>Agent: HTTP/1.1 headers (chunked TE)
-    loop Each body chunk
+    loop Each body chunk (120s idle timeout per chunk)
     Router->>Proxy: chunk via next_chunk()
     Proxy->>Agent: Chunked-encoded frame
     end
+    alt Stream truncated (idle timeout, byte limit, upstream error)
+        Proxy->>Agent: SSE error event (proxy_stream_error)
+    end
     Proxy->>Agent: Chunk terminator (0\r\n\r\n)
 ```

@@ -102,7 +105,7 @@ Key messages:
 Files:

 - `crates/openshell-sandbox/src/proxy.rs` -- proxy interception, inference context, request routing
-- `crates/openshell-sandbox/src/l7/inference.rs` -- pattern detection, HTTP parsing, response formatting
+- `crates/openshell-sandbox/src/l7/inference.rs` -- pattern detection, HTTP parsing, response formatting, SSE error generation (`format_sse_error()`)
 - `crates/openshell-sandbox/src/lib.rs` -- inference context initialization, route refresh
 - `crates/openshell-sandbox/src/grpc_client.rs` -- `fetch_inference_bundle()`

@@ -156,7 +159,7 @@ If no pattern matches, the proxy returns `403 Forbidden` with `{"error": "connec
 Files:

 - `crates/openshell-router/src/lib.rs` -- `Router`, `proxy_with_candidates()`, `proxy_with_candidates_streaming()`
-- `crates/openshell-router/src/backend.rs` -- `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction
+- `crates/openshell-router/src/backend.rs` -- `prepare_backend_request()`, `send_backend_request()`, `send_backend_request_streaming()`, `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction
 - `crates/openshell-router/src/config.rs` -- `RouteConfig`, `ResolvedRoute`, YAML loading

 ### Route selection
@@ -165,7 +168,7 @@ Files:

 ### Request rewriting

-`proxy_to_backend()` rewrites outgoing requests:
+`prepare_backend_request()` (shared by both buffered and streaming paths) rewrites outgoing requests:

 1. **Auth injection**: Uses the route's `AuthHeader` -- either `Authorization: Bearer <key>` or a custom header (e.g. `x-api-key: <key>` for Anthropic).
 2. **Header stripping**: Removes `authorization`, `x-api-key`, `host`, and any header names that will be set from route defaults.
@@ -198,20 +201,47 @@ The sandbox proxy (`route_inference_request()` in `proxy.rs`) uses the streaming

 1. Calls `proxy_with_candidates_streaming()` to get headers immediately.
 2. Formats and sends the HTTP/1.1 response header with `Transfer-Encoding: chunked` via `format_http_response_header()`.
-3. Loops on `body.next_chunk()`, wrapping each fragment in HTTP chunked encoding via `format_chunk()`.
-4. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()`.
+3. Wraps the TLS client stream in a `BufWriter` (16 KiB capacity) to coalesce small SSE chunks into fewer TLS records, reducing per-chunk flush overhead.
+4. Loops on `body.next_chunk()` with a per-chunk idle timeout (`CHUNK_IDLE_TIMEOUT`, 120 seconds), wrapping each fragment in HTTP chunked encoding via `format_chunk()`. The 120-second timeout accommodates reasoning models (e.g. nemotron-3-super, o1, o3) that pause 60+ seconds between thinking and output phases.
+5. Enforces a total streaming body cap (`MAX_STREAMING_BODY`, 32 MiB).
+6. On truncation (idle timeout, byte limit, or upstream read error), injects an SSE error event before the chunk terminator so clients can detect the truncation rather than silently losing data.
+7. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()` and flushes the `BufWriter`.

 This eliminates full-body buffering for streaming responses (SSE). Time-to-first-byte is determined by the backend's first chunk latency rather than the full generation time.

+#### Truncation signaling
+
+When the proxy truncates a streaming response, it injects an SSE error event via `format_sse_error()` (in `crates/openshell-sandbox/src/l7/inference.rs`) before sending the HTTP chunked terminator:
+
+```
+data: {"error":{"message":"<reason>","type":"proxy_stream_error"}}
+```
+
+Three truncation paths exist:
+
+| Cause | SSE error message | OCSF severity |
+|-------|-------------------|---------------|
+| Per-chunk idle timeout (120s) | `response truncated: chunk idle timeout exceeded` | Medium |
+| Upstream read error | `response truncated: upstream read error` | Medium |
+| Streaming body exceeds 32 MiB | `response truncated: exceeded maximum streaming body size` | *(warn log only)* |
+
+The `reason` field in the SSE event is sanitized — it never contains internal URLs, hostnames, or credentials. Full details are captured server-side in the OCSF log.
+
 ### Mock routes

 File: `crates/openshell-router/src/mock.rs`

 Routes with `mock://` scheme endpoints return canned responses without making HTTP requests. Mock responses are protocol-aware (OpenAI chat completion, OpenAI completion, Anthropic messages, or generic JSON). Mock routes include an `x-openshell-mock: true` response header.

-### Per-request timeout
+### Timeout model
+
+The router uses a layered timeout strategy with separate handling for buffered and streaming responses.
+
+**Client connect timeout**: The `reqwest::Client` is built with a 30-second `connect_timeout` (in `crates/openshell-router/src/lib.rs`, `Router::new()`). This bounds TCP connection establishment and applies to all outgoing requests regardless of response mode.
+
+**Buffered responses** (`proxy_to_backend()` via `send_backend_request()`): Apply the route's `timeout` as a total request timeout covering the entire lifecycle (connect + headers + body). When `timeout_secs` is `0` in the proto message, the default of 60 seconds is used (defined as `DEFAULT_ROUTE_TIMEOUT` in `config.rs`). Timeouts and connection failures map to `RouterError::UpstreamUnavailable`.

-Each `ResolvedRoute` carries a `timeout` field (`Duration`). The `reqwest::Client` has no global timeout; instead, each outgoing request applies `.timeout(route.timeout)` on the request builder. When `timeout_secs` is `0` in the proto message, the default of 60 seconds is used (defined as `DEFAULT_ROUTE_TIMEOUT` in `config.rs`). Timeouts and connection failures map to `RouterError::UpstreamUnavailable`.
+**Streaming responses** (`proxy_to_backend_streaming()` via `send_backend_request_streaming()`): Do **not** apply a total request timeout. The total duration of a streaming response is unbounded — liveness is enforced by the sandbox proxy's per-chunk idle timeout (`CHUNK_IDLE_TIMEOUT`, 120 seconds in `proxy.rs`) instead. This separation exists because streaming inference responses (especially from reasoning models) can legitimately take minutes to complete while still sending data. The `prepare_backend_request()` helper in `backend.rs` builds the request identically for both paths; the caller decides whether to chain `.timeout()` before sending.

 Timeout changes propagate dynamically to running sandboxes. The bundle revision hash includes `timeout_secs`, so when the timeout is updated via `openshell inference update --timeout`, the refresh loop detects the revision change and updates the route cache within one polling interval (5 seconds by default).
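For reference, the chunked framing and truncation event described above can be sketched as follows. This is an illustrative sketch only: the helper names mirror `format_chunk()`, `format_chunk_terminator()`, and `format_sse_error()` from the doc, but these bodies are assumptions, not the repo's actual implementations.

```rust
// Illustrative sketch (not the repo's code): how each body fragment is framed
// for HTTP chunked transfer encoding, and what the injected SSE error event
// looks like on the wire.

fn format_chunk(data: &[u8]) -> Vec<u8> {
    // Chunked-encoding frame: hex length, CRLF, payload, CRLF.
    let mut out = format!("{:x}\r\n", data.len()).into_bytes();
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}

fn format_chunk_terminator() -> &'static [u8] {
    // Zero-length chunk marks the end of the body.
    b"0\r\n\r\n"
}

fn format_sse_error(reason: &str) -> Vec<u8> {
    // `reason` is assumed pre-sanitized (no URLs, hostnames, or credentials).
    let event = format!(
        "data: {{\"error\":{{\"message\":\"{reason}\",\"type\":\"proxy_stream_error\"}}}}\n\n"
    );
    format_chunk(event.as_bytes())
}
```

Because the error event is itself a well-formed chunk sent before the `0\r\n\r\n` terminator, clients see a syntactically valid chunked response and can distinguish truncation from normal completion by the `proxy_stream_error` event type.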

crates/openshell-router/src/backend.rs

Lines changed: 61 additions & 19 deletions
@@ -83,18 +83,19 @@ impl StreamingProxyResponse {
     }
 }

-/// Build and send an HTTP request to the backend configured in `route`.
+/// Build an HTTP request to the backend configured in `route`.
 ///
-/// Returns the [`reqwest::Response`] with status, headers, and an un-consumed
-/// body stream. Shared by both the buffered and streaming public APIs.
-async fn send_backend_request(
+/// Returns the prepared [`reqwest::RequestBuilder`] with auth, headers, model
+/// rewrite, and body applied. The caller decides whether to apply a total
+/// request timeout before sending.
+fn prepare_backend_request(
     client: &reqwest::Client,
     route: &ResolvedRoute,
     method: &str,
     path: &str,
-    headers: Vec<(String, String)>,
+    headers: &[(String, String)],
     body: bytes::Bytes,
-) -> Result<reqwest::Response, RouterError> {
+) -> Result<(reqwest::RequestBuilder, String), RouterError> {
     let url = build_backend_url(&route.endpoint, path);

     let reqwest_method: reqwest::Method = method
@@ -118,7 +119,7 @@ async fn send_backend_request(
     let strip_headers: [&str; 3] = ["authorization", "x-api-key", "host"];

     // Forward non-sensitive headers.
-    for (name, value) in &headers {
+    for (name, value) in headers {
         let name_lc = name.to_ascii_lowercase();
         if strip_headers.contains(&name_lc.as_str()) {
             continue;
@@ -149,17 +150,57 @@
         }
         Err(_) => body,
     };
-    builder = builder.body(body).timeout(route.timeout);
-
-    builder.send().await.map_err(|e| {
-        if e.is_timeout() {
-            RouterError::UpstreamUnavailable(format!("request to {url} timed out"))
-        } else if e.is_connect() {
-            RouterError::UpstreamUnavailable(format!("failed to connect to {url}: {e}"))
-        } else {
-            RouterError::Internal(format!("HTTP request failed: {e}"))
-        }
-    })
+    builder = builder.body(body);
+
+    Ok((builder, url))
+}
+
+/// Map a `reqwest` send error into a `RouterError`; shared by both the
+/// buffered and streaming paths.
+fn map_send_error(e: reqwest::Error, url: &str) -> RouterError {
+    if e.is_timeout() {
+        RouterError::UpstreamUnavailable(format!("request to {url} timed out"))
+    } else if e.is_connect() {
+        RouterError::UpstreamUnavailable(format!("failed to connect to {url}: {e}"))
+    } else {
+        RouterError::Internal(format!("HTTP request failed: {e}"))
+    }
+}
+
+/// Build and send an HTTP request to the backend with a total request timeout.
+///
+/// The timeout covers the entire request lifecycle (connect + headers + body).
+/// Suitable for non-streaming responses where the body is buffered completely.
+async fn send_backend_request(
+    client: &reqwest::Client,
+    route: &ResolvedRoute,
+    method: &str,
+    path: &str,
+    headers: Vec<(String, String)>,
+    body: bytes::Bytes,
+) -> Result<reqwest::Response, RouterError> {
+    let (builder, url) = prepare_backend_request(client, route, method, path, &headers, body)?;
+    builder
+        .timeout(route.timeout)
+        .send()
+        .await
+        .map_err(|e| map_send_error(e, &url))
+}
+
+/// Build and send an HTTP request without a total request timeout.
+///
+/// For streaming responses, the total duration is unbounded — liveness is
+/// enforced by the caller's per-chunk idle timeout instead. Connection
+/// establishment is still bounded by the client-level `connect_timeout`.
+async fn send_backend_request_streaming(
+    client: &reqwest::Client,
+    route: &ResolvedRoute,
+    method: &str,
+    path: &str,
+    headers: Vec<(String, String)>,
+    body: bytes::Bytes,
+) -> Result<reqwest::Response, RouterError> {
+    let (builder, url) = prepare_backend_request(client, route, method, path, &headers, body)?;
+    builder.send().await.map_err(|e| map_send_error(e, &url))
 }

 fn validation_probe(route: &ResolvedRoute) -> Result<ValidationProbe, ValidationFailure> {
@@ -408,7 +449,8 @@ pub async fn proxy_to_backend_streaming(
     headers: Vec<(String, String)>,
     body: bytes::Bytes,
 ) -> Result<StreamingProxyResponse, RouterError> {
-    let response = send_backend_request(client, route, method, path, headers, body).await?;
+    let response =
+        send_backend_request_streaming(client, route, method, path, headers, body).await?;
     let (status, resp_headers) = extract_response_metadata(&response);

     Ok(StreamingProxyResponse {
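The per-chunk liveness model that replaces the total request timeout can be sketched with a plain std channel standing in for the async body stream. This is an illustrative sketch only: the real relay in `proxy.rs` is async, and `CHUNK_IDLE_TIMEOUT` is shortened here purely so the example runs quickly.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Shortened for illustration; the proxy uses 120 seconds.
const CHUNK_IDLE_TIMEOUT: Duration = Duration::from_millis(200);
const MAX_STREAMING_BODY: usize = 32 * 1024 * 1024;

/// Relay chunks until the stream ends, a chunk-to-chunk gap exceeds the idle
/// timeout, or the byte cap is hit. Note the total duration is unbounded:
/// only the gap between consecutive chunks is limited.
fn relay(rx: mpsc::Receiver<Vec<u8>>) -> Result<usize, String> {
    let mut total = 0usize;
    loop {
        match rx.recv_timeout(CHUNK_IDLE_TIMEOUT) {
            Ok(chunk) => {
                total += chunk.len();
                if total > MAX_STREAMING_BODY {
                    return Err("response truncated: exceeded maximum streaming body size".into());
                }
                // A real relay would frame and forward the chunk here.
            }
            // Sender dropped: upstream finished cleanly.
            Err(mpsc::RecvTimeoutError::Disconnected) => return Ok(total),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                return Err("response truncated: chunk idle timeout exceeded".into());
            }
        }
    }
}
```

Any number of short gaps is fine as long as each stays under the idle timeout, which is why a reasoning model that streams for minutes is not cut off while a stalled upstream still is.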

crates/openshell-router/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,7 @@ pub use backend::{
     ValidationFailureKind, verify_backend_endpoint,
 };
 use config::{ResolvedRoute, RouterConfig};
+use std::time::Duration;
 use tracing::info;

 #[derive(Debug, thiserror::Error)]
@@ -37,6 +38,7 @@ pub struct Router {
 impl Router {
     pub fn new() -> Result<Self, RouterError> {
         let client = reqwest::Client::builder()
+            .connect_timeout(Duration::from_secs(30))
             .build()
             .map_err(|e| RouterError::Internal(format!("failed to build HTTP client: {e}")))?;
         Ok(Self {

crates/openshell-router/tests/backend_integration.rs

Lines changed: 133 additions & 0 deletions
@@ -468,3 +468,136 @@ fn config_resolves_routes_with_protocol() {
     let routes = config.resolve_routes().unwrap();
     assert_eq!(routes[0].protocols, vec!["openai_chat_completions"]);
 }
+
+/// Streaming proxy must not apply a total request timeout to the body stream.
+///
+/// This test simulates a slow-generating model: the backend delays its
+/// response past the route timeout. The streaming path must complete
+/// successfully because it relies on per-chunk idle timeouts (enforced by
+/// the sandbox relay loop) rather than a total request timeout.
+#[tokio::test]
+async fn streaming_proxy_completes_despite_exceeding_route_timeout() {
+    use std::time::Duration;
+
+    let mock_server = MockServer::start().await;
+
+    // SSE body returned by the mock backend.
+    let sse_body = concat!(
+        "data: {\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n",
+        "data: {\"choices\":[{\"delta\":{\"content\":\" world\"}}]}\n\n",
+        "data: [DONE]\n\n",
+    );
+
+    Mock::given(method("POST"))
+        .and(path("/v1/chat/completions"))
+        .and(bearer_token("test-api-key"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .append_header("content-type", "text/event-stream")
+                .set_body_string(sse_body)
+                // Delay the response beyond the 2s route timeout set below.
+                .set_delay(Duration::from_secs(3)),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let router = Router::new().unwrap();
+    let candidates = vec![ResolvedRoute {
+        name: "inference.local".to_string(),
+        endpoint: mock_server.uri(),
+        model: "test-model".to_string(),
+        api_key: "test-api-key".to_string(),
+        protocols: vec!["openai_chat_completions".to_string()],
+        auth: AuthHeader::Bearer,
+        default_headers: Vec::new(),
+        // Very short route timeout — streaming must NOT be constrained by this.
+        timeout: Duration::from_secs(2),
+    }];
+
+    let body = serde_json::to_vec(&serde_json::json!({
+        "model": "test-model",
+        "messages": [{"role": "user", "content": "hi"}],
+        "stream": true
+    }))
+    .unwrap();
+
+    // The streaming path should succeed — no total timeout applied.
+    let mut resp = router
+        .proxy_with_candidates_streaming(
+            "openai_chat_completions",
+            "POST",
+            "/v1/chat/completions",
+            vec![("content-type".to_string(), "application/json".to_string())],
+            bytes::Bytes::from(body),
+            &candidates,
+        )
+        .await
+        .expect("streaming proxy should not fail");
+
+    assert_eq!(resp.status, 200);
+
+    // Drain all chunks to verify the full body is received.
+    let mut total_bytes = 0;
+    while let Ok(Some(chunk)) = resp.next_chunk().await {
+        total_bytes += chunk.len();
+    }
+    assert!(total_bytes > 0, "should have received body chunks");
+}
+
+/// Non-streaming (buffered) proxy must still enforce the route timeout.
+#[tokio::test]
+async fn buffered_proxy_enforces_route_timeout() {
+    use std::time::Duration;
+
+    let mock_server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/v1/chat/completions"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .set_body_string("{}")
+                // Delay longer than the route timeout.
+                .set_delay(Duration::from_secs(5)),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let router = Router::new().unwrap();
+    let candidates = vec![ResolvedRoute {
+        name: "inference.local".to_string(),
+        endpoint: mock_server.uri(),
+        model: "test-model".to_string(),
+        api_key: "test-api-key".to_string(),
+        protocols: vec!["openai_chat_completions".to_string()],
+        auth: AuthHeader::Bearer,
+        default_headers: Vec::new(),
+        timeout: Duration::from_secs(1),
+    }];
+
+    let body = serde_json::to_vec(&serde_json::json!({
+        "model": "test-model",
+        "messages": [{"role": "user", "content": "hi"}]
+    }))
+    .unwrap();
+
+    let result = router
+        .proxy_with_candidates(
+            "openai_chat_completions",
+            "POST",
+            "/v1/chat/completions",
+            vec![("content-type".to_string(), "application/json".to_string())],
+            bytes::Bytes::from(body),
+            &candidates,
+        )
+        .await;
+
+    assert!(result.is_err(), "buffered proxy should timeout");
+    let err = result.unwrap_err().to_string();
+    assert!(
+        err.contains("timed out"),
+        "error should mention timeout, got: {err}"
+    );
+}
