Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ http = "0.2.9"
hyper = { version = "1.0.0-rc.4", features = ["client","server","http1"] }
clap = { version = "4", features = ["derive"] }
tokio-socks = "0.5"
tokio = { version = "1.28", features = ["macros", "rt-multi-thread", "signal", "process"] }
tokio = { version = "1.28", features = ["macros", "rt-multi-thread", "signal", "process", "io-util"] }
bytes = "1.4.0"
http-body-util = "0.1.0-rc.2"
tracing = "0.1.37"
Expand Down
51 changes: 33 additions & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,34 +391,49 @@ async fn proxy(
.handshake(io)
.await?;

// Spawn a task to drive the HTTP connection
let conn_task = tokio::spawn(async move {
if let Err(e) = conn.await {
debug!("HTTP #{} connection ended: {}", conn_id, e);
// Drive the HTTP connection with a timeout and proper shutdown
tokio::task::spawn(async move {
let mut conn = Some(conn);
let result = tokio::time::timeout(tokio::time::Duration::from_secs(30), async {
conn.as_mut().unwrap().await
})
.await;

let remaining = ACTIVE_SOCKS5_CONNECTIONS.fetch_sub(1, Ordering::Relaxed) - 1;

match result {
Ok(Ok(_)) => debug!(
"HTTP #{} connection completed, {} active",
conn_id, remaining
),
Ok(Err(e)) => debug!(
"HTTP #{} connection ended: {}, {} active",
conn_id, e, remaining
),
Err(_) => {
debug!(
"HTTP #{} connection timed out, {} active",
conn_id, remaining
);
if let Some(conn) = conn.take() {
let parts = conn.into_parts();
let mut io = parts.io.into_inner();
if let Err(e) = io.shutdown().await {
debug!("HTTP #{} shutdown error after timeout: {}", conn_id, e);
}
}
}
}
});

// Channel to signal when the request is finished
let (closed_tx, closed) = tokio::sync::oneshot::channel();

// Send the request with Connection: close header
let mut resp = sender.send_request(req).await?;
resp.headers_mut()
.insert("Connection", HeaderValue::from_static("close"));

// Drop the sender to allow the closed future to resolve
// Drop the sender to terminate the connection once done
drop(sender);

// Ensure the SOCKS5 stream is closed when the client disconnects
tokio::spawn(async move {
let _ = closed.await;
conn_task.abort();
ACTIVE_SOCKS5_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
});

// Signal that the HTTP exchange has completed
let _ = closed_tx.send(());

// Critical fix: Ensure response body will be properly handled
// The response body must be consumed by the client to prevent CLOSE_WAIT
// We return the response as-is, but the HTTP framework will handle consumption
Expand Down