Skip to content

Thin Wrapper for Owned Traits #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,771 changes: 0 additions & 1,771 deletions Cargo.lock

This file was deleted.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ axum-core = { version = "0.4.3", optional = true }
http = { version = "1", optional = true }
async-trait = { version = "0.1", optional = true }


# Futrues integration
futures-lite = {version = "2.2.0", optional = true}

[features]
default = ["simd"]
default = ["simd", "upgrade", "unstable-split"]
simd = ["simdutf8/aarch64_neon"]
upgrade = ["hyper", "pin-project", "base64", "sha1", "hyper-util", "http-body-util"]
futures = ["futures-lite"]
unstable-split = []
# Axum integration
with_axum = ["axum-core", "http", "async-trait"]
Expand All @@ -65,9 +70,13 @@ assert2 = "0.3.4"
trybuild = "1.0.80"
criterion = "0.4.0"
anyhow = "1.0.71"
webpki-roots = "0.23.0"
webpki-roots = "0.25.2"
bytes = "1.4.0"
axum = "0.7.4"
async-std = {version ="1.12.0", features=["attributes", "unstable"]}
futures-rustls = "0.24.0"
futures-lite = "2.2.0"


[[bench]]
name = "unmask"
Expand Down
27 changes: 24 additions & 3 deletions examples/autobahn_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ use hyper::header::CONNECTION;
use hyper::header::UPGRADE;
use hyper::upgrade::Upgraded;
use hyper::Request;
use hyper_util::rt::TokioIo;

#[cfg(not(feature = "futures"))]
use hyper_util::rt::TokioIo as IoWrapper;
#[cfg(not(feature = "futures"))]
use tokio::net::TcpStream;

#[cfg(feature = "futures")]
use async_std::net::TcpStream;
#[cfg(feature = "futures")]
use fastwebsockets::FuturesIo as IoWrapper;

struct SpawnExecutor;

impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
Expand All @@ -39,7 +47,7 @@ where
}
}

async fn connect(path: &str) -> Result<FragmentCollector<TokioIo<Upgraded>>> {
async fn connect(path: &str) -> Result<FragmentCollector<IoWrapper<Upgraded>>> {
let stream = TcpStream::connect("localhost:9001").await?;

let req = Request::builder()
Expand Down Expand Up @@ -67,7 +75,19 @@ async fn get_case_count() -> Result<u32> {
Ok(std::str::from_utf8(&msg.payload)?.parse()?)
}

#[tokio::main(flavor = "current_thread")]
macro_rules! runtime_main {
($($body:tt)*) => {
#[cfg(feature = "futures")]
#[async_std::main]
$($body)*

#[cfg(not(feature = "futures"))]
#[tokio::main]
$($body)*
};
}

runtime_main! {
async fn main() -> Result<()> {
let count = get_case_count().await?;

Expand Down Expand Up @@ -103,3 +123,4 @@ async fn main() -> Result<()> {

Ok(())
}
}
76 changes: 55 additions & 21 deletions examples/echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "futures")]
use async_std::net::TcpListener;
use fastwebsockets::upgrade;
use fastwebsockets::OpCode;
use fastwebsockets::WebSocketError;
Expand All @@ -22,6 +24,7 @@ use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::Request;
use hyper::Response;
#[cfg(not(feature = "futures"))]
use tokio::net::TcpListener;

async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
Expand All @@ -45,36 +48,67 @@ async fn server_upgrade(
) -> Result<Response<Empty<Bytes>>, WebSocketError> {
let (response, fut) = upgrade::upgrade(&mut req)?;

#[cfg(not(feature = "futures"))]
tokio::task::spawn(async move {
if let Err(e) = tokio::task::unconstrained(handle_client(fut)).await {
eprintln!("Error in websocket connection: {}", e);
}
});

#[cfg(feature = "futures")]
async_std::task::spawn(async move {
if let Err(e) = handle_client(fut).await {
eprintln!("Error in websocket connection: {}", e);
}
});
Ok(response)
}

fn main() -> Result<(), WebSocketError> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
#[cfg(feature = "futures")]
{
async_std::task::block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("client connected");
async_std::task::spawn(async move {
let io = fastwebsockets::FuturesIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occured {:?}", e);
}
});
}
})
}

rt.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("Client connected");
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occurred: {:?}", e);
}
});
}
})
#[cfg(not(feature = "futures"))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();

rt.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("Client connected");
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occurred: {:?}", e);
}
});
}
})
}
}
82 changes: 61 additions & 21 deletions examples/echo_server_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "futures")]
use async_std::net::TcpListener;
use fastwebsockets::upgrade;
use fastwebsockets::FragmentCollectorRead;
use fastwebsockets::OpCode;
Expand All @@ -23,11 +25,16 @@ use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::Request;
use hyper::Response;
#[cfg(not(feature = "futures"))]
use tokio::net::TcpListener;

async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
let ws = fut.await?;

#[cfg(not(feature = "futures"))]
let (rx, mut tx) = ws.split(tokio::io::split);
#[cfg(feature = "futures")]
let (rx, mut tx) = ws.split(futures_lite::io::split);
let mut rx = FragmentCollectorRead::new(rx);
loop {
// Empty send_fn is fine because the benchmark does not create obligated writes.
Expand All @@ -47,41 +54,74 @@ async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {

Ok(())
}

async fn server_upgrade(
mut req: Request<Incoming>,
) -> Result<Response<Empty<Bytes>>, WebSocketError> {
let (response, fut) = upgrade::upgrade(&mut req)?;

#[cfg(not(feature = "futures"))]
tokio::task::spawn(async move {
if let Err(e) = tokio::task::unconstrained(handle_client(fut)).await {
eprintln!("Error in websocket connection: {}", e);
}
});

#[cfg(feature = "futures")]
async_std::task::spawn(async move {
if let Err(e) = handle_client(fut).await {
eprintln!("Error in websocket connection: {}", e);
}
});

Ok(response)
}

fn main() -> Result<(), WebSocketError> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
#[cfg(feature = "futures")]
{
async_std::task::block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("client connected");
async_std::task::spawn(async move {
let io = fastwebsockets::FuturesIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occured {:?}", e);
}
});
}
})
}

rt.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("Client connected");
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occurred: {:?}", e);
}
});
}
})
#[cfg(not(feature = "futures"))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();

rt.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("Client connected");
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occurred: {:?}", e);
}
});
}
})
}
}
Loading