Skip to content

Commit 558aa86

Browse files
authored
Merge pull request ikatson#443 from ikatson/librqbit-utp-vectored-read
[perf] librqbit-utp vectored read + vectored write
2 parents 8c5b013 + 19b61ff commit 558aa86

File tree

5 files changed

+27
-40
lines changed

5 files changed

+27
-40
lines changed

Cargo.lock

Lines changed: 12 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/librqbit/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ walkdir = "2.5.0"
123123
arc-swap = "1.7.1"
124124
intervaltree = "0.2.7"
125125
async-compression = { version = "0.4.18", features = ["tokio", "gzip"] }
126-
librqbit-utp = { version = "0.5.5", features = ["export-metrics"] }
126+
librqbit-utp = { version = "0.5.7", features = ["export-metrics"] }
127127
axum-extra = { version = "0.10.1", features = ["query"] }
128128
librqbit-dualstack-sockets = { version = "0.6.5", features = ["axum"] }
129129
socket2 = "0.5.10"

crates/librqbit/src/listen.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ use tokio::io::AsyncWrite;
1010
use tokio_util::sync::CancellationToken;
1111
use tracing::info;
1212

13-
use crate::{
14-
stream_connect::ConnectionKind,
15-
vectored_traits::{AsyncReadVectored, AsyncReadVectoredIntoCompat},
16-
};
13+
use crate::{stream_connect::ConnectionKind, vectored_traits::AsyncReadVectored};
1714

1815
pub(crate) struct ListenResult {
1916
pub tcp_socket: Option<TcpListener>,
@@ -140,7 +137,7 @@ pub(crate) trait Accept {
140137
SocketAddr,
141138
(
142139
impl AsyncReadVectored + Send + 'static,
143-
(impl AsyncWrite + Unpin + Send + 'static),
140+
impl AsyncWrite + Unpin + Send + 'static,
144141
),
145142
)>;
146143
}
@@ -176,6 +173,6 @@ impl Accept for Arc<UtpSocketUdp> {
176173
let stream = self.accept().await.context("error accepting uTP")?;
177174
let addr = stream.remote_addr();
178175
let (read, write) = stream.split();
179-
Ok((addr, (read.into_vectored_compat(), write)))
176+
Ok((addr, (read, write)))
180177
}
181178
}

crates/librqbit/src/stream_connect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl StreamConnector {
255255
match utp_res {
256256
Ok(Some(stream)) => {
257257
let (r, w) = stream.split();
258-
return Ok((ConnectionKind::Utp, Box::new(r.into_vectored_compat()), Box::new(w)));
258+
return Ok((ConnectionKind::Utp, Box::new(r), Box::new(w)));
259259
},
260260
Ok(None) => {
261261
utp_err = Some(None);

crates/librqbit/src/vectored_traits.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ impl AsyncReadVectored for tokio::net::tcp::OwnedReadHalf {
5959
}
6060
}
6161

62+
impl AsyncReadVectored for librqbit_utp::UtpStreamReadHalf {
63+
fn poll_read_vectored(
64+
self: Pin<&mut Self>,
65+
cx: &mut std::task::Context<'_>,
66+
vec: &mut [IoSliceMut<'_>],
67+
) -> Poll<std::io::Result<usize>> {
68+
librqbit_utp::UtpStreamReadHalf::poll_read_vectored(self, cx, vec)
69+
}
70+
}
71+
6272
pub struct AsyncReadToVectoredCompat<T>(T);
6373

6474
impl<T: AsyncRead + Unpin> AsyncRead for AsyncReadToVectoredCompat<T> {
@@ -72,14 +82,6 @@ impl<T: AsyncRead + Unpin> AsyncRead for AsyncReadToVectoredCompat<T> {
7282
}
7383

7484
impl<T: AsyncRead + Unpin> AsyncReadVectored for AsyncReadToVectoredCompat<T> {
75-
// async fn read_vectored(&mut self, vec: &mut [IoSliceMut<'_>]) -> std::io::Result<usize> {
76-
// let first_non_empty = match vec.iter_mut().find(|s| !s.is_empty()) {
77-
// Some(s) => &mut **s,
78-
// None => return Ok(0),
79-
// };
80-
// self.read(first_non_empty).await
81-
// }
82-
8385
fn poll_read_vectored(
8486
self: Pin<&mut Self>,
8587
cx: &mut std::task::Context<'_>,

0 commit comments

Comments
 (0)