Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smol+async global executor 1.80 dev #3791

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Sqlx: integrate async-global-executor feature
martin-kolarik committed Mar 2, 2025
commit 6c324b1c409a52fd42dc061f59b58b2f449abbb5
10 changes: 5 additions & 5 deletions .github/workflows/sqlx.yml
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
runtime: [async-std, tokio]
runtime: [async-global-executor, async-std, tokio]
tls: [native-tls, rustls, none]
steps:
- uses: actions/checkout@v4
@@ -116,7 +116,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
runtime: [async-std, tokio]
runtime: [async-global-executor, async-std, tokio]
linking: [sqlite, sqlite-unbundled]
needs: check
steps:
@@ -183,7 +183,7 @@ jobs:
strategy:
matrix:
postgres: [17, 13]
runtime: [async-std, tokio]
runtime: [async-global-executor, async-std, tokio]
tls: [native-tls, rustls-aws-lc-rs, rustls-ring, none]
needs: check
steps:
@@ -283,7 +283,7 @@ jobs:
strategy:
matrix:
mysql: [8]
runtime: [async-std, tokio]
runtime: [async-global-executor, async-std, tokio]
tls: [native-tls, rustls-aws-lc-rs, rustls-ring, none]
needs: check
steps:
@@ -371,7 +371,7 @@ jobs:
strategy:
matrix:
mariadb: [verylatest, 11_4, 10_11, 10_4]
runtime: [async-std, tokio]
runtime: [async-global-executor, async-std, tokio]
tls: [native-tls, rustls-aws-lc-rs, rustls-ring, none]
needs: check
steps:
41 changes: 35 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -76,6 +76,7 @@ _unstable-all-types = [
]

# Base runtime features without TLS
runtime-async-global-executor = ["_rt-async-global-executor", "sqlx-core/_rt-async-global-executor", "sqlx-macros?/_rt-async-global-executor"]
runtime-async-std = ["_rt-async-std", "sqlx-core/_rt-async-std", "sqlx-macros?/_rt-async-std"]
runtime-tokio = ["_rt-tokio", "sqlx-core/_rt-tokio", "sqlx-macros?/_rt-tokio"]

@@ -92,13 +93,17 @@ tls-none = []

# Legacy Runtime + TLS features

runtime-async-global-executor-native-tls = ["runtime-async-global-executor", "tls-native-tls"]
runtime-async-global-executor-rustls = ["runtime-async-global-executor", "tls-rustls-ring"]

runtime-async-std-native-tls = ["runtime-async-std", "tls-native-tls"]
runtime-async-std-rustls = ["runtime-async-std", "tls-rustls-ring"]

runtime-tokio-native-tls = ["runtime-tokio", "tls-native-tls"]
runtime-tokio-rustls = ["runtime-tokio", "tls-rustls-ring"]

# for conditional compilation
_rt-async-global-executor = []
_rt-async-std = []
_rt-tokio = []
_sqlite = []
@@ -154,6 +159,11 @@ uuid = "1.1.2"
dotenvy = { version = "0.15.0", default-features = false }

# Runtimes
[workspace.dependencies.async-global-executor]
version = "3.1"
default-features = false
features = ["async-io"]

[workspace.dependencies.async-std]
version = "1.12"

8 changes: 8 additions & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ any = []
json = ["serde", "serde_json"]

# for conditional compilation
_rt-async-global-executor = [
"async-global-executor",
"async-io-global-executor",
"async-net",
]
_rt-async-std = ["async-std", "async-io-std"]
_rt-tokio = ["tokio", "tokio-stream"]
_tls-native-tls = ["native-tls"]
@@ -33,6 +38,7 @@ offline = ["serde", "either/serde"]

[dependencies]
# Runtimes
async-global-executor = { workspace = true, optional = true }
async-std = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }

@@ -52,7 +58,9 @@ ipnetwork = { workspace = true, optional = true }
mac_address = { workspace = true, optional = true }
uuid = { workspace = true, optional = true }

async-io-global-executor = { package = "async-io", version = "2.2", optional = true }
async-io-std = { package = "async-io", version = "1.9.0", optional = true }
async-net = { package = "async-net", version = "2.0.0", optional = true }
base64 = { version = "0.22.0", default-features = false, features = ["std"] }
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false, features = ["clock"], optional = true }
2 changes: 1 addition & 1 deletion sqlx-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
#![allow(clippy::needless_doctest_main, clippy::type_complexity)]
// The only unsafe code in SQLx is that necessary to interact with native APIs like with SQLite,
// and that can live in its own separate driver crate.
#![forbid(unsafe_code)]
// #![forbid(unsafe_code)]
// Allows an API be documented as only available in some specific platforms.
// <https://doc.rust-lang.org/unstable-book/language-features/doc-cfg.html>
#![cfg_attr(docsrs, feature(doc_cfg))]
53 changes: 49 additions & 4 deletions sqlx-core/src/net/socket/mod.rs
Original file line number Diff line number Diff line change
@@ -202,6 +202,40 @@ pub async fn connect_tcp<Ws: WithSocket>(
return Ok(with_socket.with_socket(stream).await);
}

#[cfg(feature = "_rt-async-global-executor")]
{
use async_io_global_executor::Async;
use async_net::resolve;
use std::net::TcpStream;

let mut last_err = None;

// Loop through all the Socket Addresses that the hostname resolves to
for socket_addr in resolve((host, port)).await? {
let stream = Async::<TcpStream>::connect(socket_addr)
.await
.and_then(|s| {
s.get_ref().set_nodelay(true)?;
Ok(s)
});
match stream {
Ok(stream) => return Ok(with_socket.with_socket(stream).await),
Err(e) => last_err = Some(e),
}
}

// If we reach this point, it means we failed to connect to any of the addresses.
// Return the last error we encountered, or a custom error if the hostname didn't resolve to any address.
return match last_err {
Some(err) => Err(err.into()),
None => Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Hostname did not resolve to any addresses",
)
.into()),
};
}

#[cfg(feature = "_rt-async-std")]
{
use async_io_std::Async;
@@ -226,17 +260,18 @@ pub async fn connect_tcp<Ws: WithSocket>(

// If we reach this point, it means we failed to connect to any of the addresses.
// Return the last error we encountered, or a custom error if the hostname didn't resolve to any address.
match last_err {
return match last_err {
Some(err) => Err(err.into()),
None => Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Hostname did not resolve to any addresses",
)
.into()),
}
};
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std")))]
#[allow(unreachable_code)]
{
crate::rt::missing_rt((host, port, with_socket))
}
@@ -260,6 +295,16 @@ pub async fn connect_uds<P: AsRef<Path>, Ws: WithSocket>(
return Ok(with_socket.with_socket(stream).await);
}

#[cfg(feature = "_rt-async-global-executor")]
{
use async_io_global_executor::Async;
use std::os::unix::net::UnixStream;

let stream = Async::<UnixStream>::connect(path).await?;

Ok(with_socket.with_socket(stream).await)
}

#[cfg(feature = "_rt-async-std")]
{
use async_io_std::Async;
@@ -270,7 +315,7 @@ pub async fn connect_uds<P: AsRef<Path>, Ws: WithSocket>(
Ok(with_socket.with_socket(stream).await)
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std")))]
{
crate::rt::missing_rt((path, with_socket))
}
75 changes: 60 additions & 15 deletions sqlx-core/src/rt/mod.rs
Original file line number Diff line number Diff line change
@@ -4,6 +4,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

#[cfg(feature = "_rt-async-global-executor")]
pub mod rt_async_global_executor;

#[cfg(feature = "_rt-async-std")]
pub mod rt_async_std;

@@ -15,6 +18,8 @@ pub mod rt_tokio;
pub struct TimeoutError;

pub enum JoinHandle<T> {
#[cfg(feature = "_rt-async-global-executor")]
AsyncGlobalExecutor(rt_async_global_executor::JoinHandle<T>),
#[cfg(feature = "_rt-async-std")]
AsyncStd(async_std::task::JoinHandle<T>),
#[cfg(feature = "_rt-tokio")]
@@ -31,14 +36,20 @@ pub async fn timeout<F: Future>(duration: Duration, f: F) -> Result<F::Output, T
.map_err(|_| TimeoutError);
}

#[cfg(feature = "_rt-async-global-executor")]
{
return rt_async_global_executor::timeout(duration, f).await;
}

#[cfg(feature = "_rt-async-std")]
{
async_std::future::timeout(duration, f)
return async_std::future::timeout(duration, f)
.await
.map_err(|_| TimeoutError);
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std",)))]
#[allow(unreachable_code)]
missing_rt((duration, f))
}

@@ -48,12 +59,18 @@ pub async fn sleep(duration: Duration) {
return tokio::time::sleep(duration).await;
}

#[cfg(feature = "_rt-async-global-executor")]
{
return rt_async_global_executor::sleep(duration).await;
}

#[cfg(feature = "_rt-async-std")]
{
async_std::task::sleep(duration).await
return async_std::task::sleep(duration).await;
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std",)))]
#[allow(unreachable_code)]
missing_rt(duration)
}

@@ -68,12 +85,20 @@ where
return JoinHandle::Tokio(handle.spawn(fut));
}

#[cfg(feature = "_rt-async-global-executor")]
{
return JoinHandle::AsyncGlobalExecutor(rt_async_global_executor::JoinHandle {
task: Some(async_global_executor::spawn(fut)),
});
}

#[cfg(feature = "_rt-async-std")]
{
JoinHandle::AsyncStd(async_std::task::spawn(fut))
return JoinHandle::AsyncStd(async_std::task::spawn(fut));
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std",)))]
#[allow(unreachable_code)]
missing_rt(fut)
}

@@ -88,12 +113,20 @@ where
return JoinHandle::Tokio(handle.spawn_blocking(f));
}

#[cfg(feature = "_rt-async-global-executor")]
{
return JoinHandle::AsyncGlobalExecutor(rt_async_global_executor::JoinHandle {
task: Some(async_global_executor::spawn_blocking(f)),
});
}

#[cfg(feature = "_rt-async-std")]
{
JoinHandle::AsyncStd(async_std::task::spawn_blocking(f))
return JoinHandle::AsyncStd(async_std::task::spawn_blocking(f));
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std",)))]
#[allow(unreachable_code)]
missing_rt(f)
}

@@ -103,12 +136,18 @@ pub async fn yield_now() {
return tokio::task::yield_now().await;
}

#[cfg(feature = "_rt-async-global-executor")]
{
return rt_async_global_executor::yield_now().await;
}

#[cfg(feature = "_rt-async-std")]
{
async_std::task::yield_now().await;
return async_std::task::yield_now().await;
}

#[cfg(not(feature = "_rt-async-std"))]
#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std",)))]
#[allow(unreachable_code)]
missing_rt(())
}

@@ -123,15 +162,19 @@ pub fn test_block_on<F: Future>(f: F) -> F::Output {
.block_on(f);
}

#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
#[cfg(feature = "_rt-async-global-executor")]
{
async_std::task::block_on(f)
return async_io_global_executor::block_on(f);
}

#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
#[cfg(feature = "_rt-async-std")]
{
missing_rt(f)
return async_std::task::block_on(f);
}

#[cfg(not(all(feature = "_rt-async-global-executor", feature = "_rt-async-std",)))]
#[allow(unreachable_code)]
missing_rt(f)
}

#[track_caller]
@@ -140,7 +183,7 @@ pub fn missing_rt<T>(_unused: T) -> ! {
panic!("this functionality requires a Tokio context")
}

panic!("either the `runtime-async-std` or `runtime-tokio` feature must be enabled")
panic!("one of the `runtime-async-global-executor`, `runtime-async-std`, or `runtime-tokio` feature must be enabled")
}

impl<T: Send + 'static> Future for JoinHandle<T> {
@@ -149,6 +192,8 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
#[track_caller]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
#[cfg(feature = "_rt-async-global-executor")]
Self::AsyncGlobalExecutor(handle) => Pin::new(handle).poll(cx),
#[cfg(feature = "_rt-async-std")]
Self::AsyncStd(handle) => Pin::new(handle).poll(cx),
#[cfg(feature = "_rt-tokio")]
2 changes: 2 additions & 0 deletions sqlx-macros-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ repository.workspace = true
default = []

# for conditional compilation
_rt-async-global-executor = ["async-global-executor", "sqlx-core/_rt-async-global-executor"]
_rt-async-std = ["async-std", "sqlx-core/_rt-async-std"]
_rt-tokio = ["tokio", "sqlx-core/_rt-tokio"]

@@ -50,6 +51,7 @@ sqlx-mysql = { workspace = true, features = ["offline", "migrate"], optional = t
sqlx-postgres = { workspace = true, features = ["offline", "migrate"], optional = true }
sqlx-sqlite = { workspace = true, features = ["offline", "migrate"], optional = true }

async-global-executor = { workspace = true, optional = true }
async-std = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }

1 change: 1 addition & 0 deletions sqlx-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ proc-macro = true
default = []

# for conditional compilation
_rt-async-global-executor = ["sqlx-macros-core/_rt-async-global-executor"]
_rt-async-std = ["sqlx-macros-core/_rt-async-std"]
_rt-tokio = ["sqlx-macros-core/_rt-tokio"]