From 023e956fc8db32ddb5e860779933ef8a2603fed0 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 16 Jul 2025 20:44:22 +0800 Subject: [PATCH 1/2] time: delay the `Arc::clone` until registering timer There are two usage of this handle for timer: 1. Ensure the time driver is enabled. 2. Registering or clear the entry from the global wheel. For (1), we just need the `&Handle`, no need to make a clone. For (2), we can delay the `.clone()` until we are about to register the entry. Delaying the `Arc::clone` improves the performance on multi-core machine. Signed-off-by: ADD-SP --- tokio/src/runtime/driver.rs | 2 +- tokio/src/runtime/scheduler/mod.rs | 29 ++++++++++++ tokio/src/runtime/time/entry.rs | 65 ++++++++++++++------------- tokio/src/runtime/time/tests/mod.rs | 68 +++++++++++++++++------------ tokio/src/time/sleep.rs | 23 ++++++---- tokio/src/util/error.rs | 3 ++ 6 files changed, 122 insertions(+), 68 deletions(-) diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index f06b70427ce..99ef747bbfb 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -110,7 +110,7 @@ impl Handle { pub(crate) fn time(&self) -> &crate::runtime::time::Handle { self.time .as_ref() - .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + .expect(crate::util::error::TIME_DISABLED_ERROR) } pub(crate) fn clock(&self) -> &Clock { diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index ecd56aeee10..cf8b2a20d8a 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -94,6 +94,24 @@ cfg_rt! { } } + /// # Panics + /// + /// Panics if the current [`Context`] is not available + /// in the current thread. + // remove this `allow(dead_code)` when this method + // is used by other other modules except the `time`. + #[cfg_attr(not(feature = "time"), allow(dead_code))] + #[track_caller] + pub(crate) fn with_current(f: F) -> R + where + F: FnOnce(&Handle) -> R, + { + match context::with_current(|hdl| f(hdl)) { + Ok(ret) => ret, + Err(e) => panic!("{e}"), + } + } + pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { match_flavor!(self, Handle(h) => &h.blocking_spawner) } @@ -268,8 +286,19 @@ cfg_not_rt! { ))] impl Handle { #[track_caller] + #[cfg_attr(feature = "time", allow(dead_code))] pub(crate) fn current() -> Handle { panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) } + + cfg_time! { + #[track_caller] + pub(crate) fn with_current(_f: F) -> R + where + F: FnOnce(&Handle) -> R, + { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } + } } } diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 627fcbc5ec3..e53ed823c60 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -59,8 +59,10 @@ use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::Ordering; use crate::runtime::scheduler; +use crate::runtime::time; use crate::sync::AtomicWaker; use crate::time::Instant; +use crate::util::error::{RUNTIME_SHUTTING_DOWN_ERROR, TIME_DISABLED_ERROR}; use crate::util::linked_list; use pin_project_lite::pin_project; @@ -285,9 +287,6 @@ pin_project! { // before polling. #[derive(Debug)] pub(crate) struct TimerEntry { - // Arc reference to the runtime handle. We can only free the driver after - // deregistering everything from their respective timer wheels. - driver: scheduler::Handle, // Shared inner structure; this is part of an intrusive linked list, and // therefore other references can exist to it while mutable references to // Entry exist. @@ -340,6 +339,10 @@ pub(crate) struct TimerShared { /// Only accessed under the entry lock. pointers: linked_list::Pointers, + // Arc reference to the runtime handle. We can only free the driver after + // deregistering everything from their respective timer wheels. + driver: scheduler::Handle, + /// The time when the [`TimerEntry`] was registered into the Wheel, /// [`STATE_DEREGISTERED`] means it is not registered. /// @@ -384,11 +387,19 @@ generate_addr_of_methods! { impl TimerShared { pub(super) fn new() -> Self { - Self { - registered_when: AtomicU64::new(0), - pointers: linked_list::Pointers::new(), - state: StateCell::default(), - _p: PhantomPinned, + // ensure both scheduler handle and time driver are available, + // otherwise panic + let maybe_hdl = + scheduler::Handle::with_current(|hdl| hdl.driver().time.as_ref().map(|_| hdl.clone())); + match maybe_hdl { + Some(hdl) => Self { + driver: hdl, + registered_when: AtomicU64::new(0), + pointers: linked_list::Pointers::new(), + state: StateCell::default(), + _p: PhantomPinned, + }, + None => panic!("{TIME_DISABLED_ERROR}"), } } @@ -453,6 +464,10 @@ impl TimerShared { pub(super) fn might_be_registered(&self) -> bool { self.state.might_be_registered() } + + fn driver(&self) -> &time::Handle { + self.driver.driver().time() + } } unsafe impl linked_list::Link for TimerShared { @@ -479,12 +494,8 @@ unsafe impl linked_list::Link for TimerShared { impl TimerEntry { #[track_caller] - pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { - // Panic if the time driver is not enabled - let _ = handle.driver().time(); - + pub(crate) fn new(deadline: Instant) -> Self { Self { - driver: handle, inner: None, deadline, registered: false, @@ -565,7 +576,7 @@ impl TimerEntry { // driver did so far and happens-before everything the driver does in // the future. While we have the lock held, we also go ahead and // deregister the entry if necessary. - unsafe { self.driver().clear_entry(NonNull::from(inner)) }; + unsafe { inner.driver().clear_entry(NonNull::from(inner)) }; } pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { @@ -573,7 +584,6 @@ impl TimerEntry { *this.deadline = new_time; *this.registered = reregister; - let tick = self.driver().time_source().deadline_to_tick(new_time); let inner = match self.inner() { Some(inner) => inner, None => { @@ -582,6 +592,7 @@ impl TimerEntry { .expect("inner should already be initialized by `this.init_inner()`") } }; + let tick = inner.driver().time_source().deadline_to_tick(new_time); if inner.extend_expiration(tick).is_ok() { return; @@ -589,8 +600,9 @@ impl TimerEntry { if reregister { unsafe { - self.driver() - .reregister(&self.driver.driver().io, tick, inner.into()); + inner + .driver() + .reregister(&inner.driver.driver().io, tick, inner.into()); } } } @@ -599,12 +611,6 @@ impl TimerEntry { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - assert!( - !self.driver().is_shutdown(), - "{}", - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ); - if !self.registered { let deadline = self.deadline; self.as_mut().reset(deadline, true); @@ -613,16 +619,13 @@ impl TimerEntry { let inner = self .inner() .expect("inner should already be initialized by `self.reset()`"); - inner.state.poll(cx.waker()) - } - pub(crate) fn driver(&self) -> &super::Handle { - self.driver.driver().time() - } + assert!( + !inner.driver().is_shutdown(), + "{RUNTIME_SHUTTING_DOWN_ERROR}" + ); - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) fn clock(&self) -> &super::Clock { - self.driver.driver().clock() + inner.state.poll(cx.waker()) } } diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 33c4a5366d1..15125c41db2 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -1,5 +1,6 @@ #![cfg(not(target_os = "wasi"))] +use std::future::poll_fn; use std::{task::Context, time::Duration}; #[cfg(not(loom))] @@ -45,16 +46,19 @@ fn single_timer() { model(|| { let rt = rt(false); let handle = rt.handle(); + let handle_clone = handle.clone(); - let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = TimerEntry::new( - handle_.inner.clone(), - handle_.inner.driver().clock().now() + Duration::from_secs(1), - ); + let _guard = handle_clone.enter(); + let clock = handle_clone.inner.driver().clock(); + let entry = TimerEntry::new(clock.now() + Duration::from_secs(1)); pin!(entry); - block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); + block_on(poll_fn(|cx| { + let _guard = handle_clone.enter(); + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); }); thread::yield_now(); @@ -74,13 +78,13 @@ fn drop_timer() { model(|| { let rt = rt(false); let handle = rt.handle(); + let handle_clone = handle.clone(); - let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = TimerEntry::new( - handle_.inner.clone(), - handle_.inner.driver().clock().now() + Duration::from_secs(1), - ); + let _guard = handle_clone.enter(); + + let clock = handle_clone.inner.driver().clock(); + let entry = TimerEntry::new(clock.now() + Duration::from_secs(1)); pin!(entry); let _ = entry @@ -108,20 +112,24 @@ fn change_waker() { model(|| { let rt = rt(false); let handle = rt.handle(); + let handle_clone = handle.clone(); - let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = TimerEntry::new( - handle_.inner.clone(), - handle_.inner.driver().clock().now() + Duration::from_secs(1), - ); + let _guard = handle_clone.enter(); + + let clock = handle_clone.inner.driver().clock(); + let entry = TimerEntry::new(clock.now() + Duration::from_secs(1)); pin!(entry); let _ = entry .as_mut() .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); + block_on(poll_fn(|cx| { + let _guard = handle_clone.enter(); + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); }); thread::yield_now(); @@ -143,13 +151,15 @@ fn reset_future() { let rt = rt(false); let handle = rt.handle(); + let handle_clone = handle.clone(); - let handle_ = handle.clone(); let finished_early_ = finished_early.clone(); let start = handle.inner.driver().clock().now(); let jh = thread::spawn(move || { - let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1)); + let _guard = handle_clone.enter(); + + let entry = TimerEntry::new(start + Duration::from_secs(1)); pin!(entry); let _ = entry @@ -159,7 +169,11 @@ fn reset_future() { entry.as_mut().reset(start + Duration::from_secs(2), true); // shouldn't complete before 2s - block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); + block_on(poll_fn(|cx| { + let _guard = handle_clone.enter(); + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); finished_early_.store(true, Ordering::Relaxed); }); @@ -202,14 +216,13 @@ fn normal_or_miri(normal: T, miri: T) -> T { fn poll_process_levels() { let rt = rt(true); let handle = rt.handle(); + let clock = handle.inner.driver().clock(); + let _guard = handle.enter(); let mut entries = vec![]; for i in 0..normal_or_miri(1024, 64) { - let mut entry = Box::pin(TimerEntry::new( - handle.inner.clone(), - handle.inner.driver().clock().now() + Duration::from_millis(i), - )); + let mut entry = Box::pin(TimerEntry::new(clock.now() + Duration::from_millis(i))); let _ = entry .as_mut() @@ -239,11 +252,10 @@ fn poll_process_levels_targeted() { let rt = rt(true); let handle = rt.handle(); + let clock = handle.inner.driver().clock(); + let _guard = handle.enter(); - let e1 = TimerEntry::new( - handle.inner.clone(), - handle.inner.driver().clock().now() + Duration::from_millis(193), - ); + let e1 = TimerEntry::new(clock.now() + Duration::from_millis(193)); pin!(e1); let handle = handle.inner.driver().time(); diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 1e3fe80d127..eb81bb28ae6 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -1,7 +1,9 @@ use crate::runtime::time::TimerEntry; use crate::time::{error::Error, Duration, Instant}; +use crate::util::error::TIME_DISABLED_ERROR; use crate::util::trace; +use crate::runtime::scheduler; use pin_project_lite::pin_project; use std::future::Future; use std::panic::Location; @@ -251,9 +253,11 @@ impl Sleep { deadline: Instant, location: Option<&'static Location<'static>>, ) -> Sleep { - use crate::runtime::scheduler; - let handle = scheduler::Handle::current(); - let entry = TimerEntry::new(handle, deadline); + // ensure both scheduler handle and time driver are available, + // otherwise panic + let is_time_enabled = scheduler::Handle::with_current(|hdl| hdl.driver().time.is_some()); + assert!(is_time_enabled, "{TIME_DISABLED_ERROR}"); + let entry = TimerEntry::new(deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { let handle = scheduler::Handle::current(); @@ -380,11 +384,14 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let clock = me.entry.clock(); - let time_source = me.entry.driver().time_source(); - let now = time_source.now(clock); - let deadline_tick = time_source.deadline_to_tick(deadline); - deadline_tick.saturating_sub(now) + scheduler::Handle::with_current(|hdl| { + let driver = hdl.driver(); + let clock = driver.clock(); + let time_source = driver.time().time_source(); + let now = time_source.now(clock); + let deadline_tick = time_source.deadline_to_tick(deadline); + deadline_tick.saturating_sub(now) + }) }; tracing::trace!( diff --git a/tokio/src/util/error.rs b/tokio/src/util/error.rs index ebb27f6385f..6536ad4a69c 100644 --- a/tokio/src/util/error.rs +++ b/tokio/src/util/error.rs @@ -14,3 +14,6 @@ pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = /// destructors of other thread-locals. pub(crate) const THREAD_LOCAL_DESTROYED_ERROR: &str = "The Tokio context thread-local variable has been destroyed."; + +pub(crate) const TIME_DISABLED_ERROR: &str = + "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."; From bc04f002614b2418465bd2279d59211f26184b4d Mon Sep 17 00:00:00 2001 From: Qi Date: Sun, 27 Jul 2025 15:14:31 +0800 Subject: [PATCH 2/2] ci: trigger ci using an empty commit Signed-off-by: ADD-SP