Skip to content
2 changes: 1 addition & 1 deletion tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl Handle {
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
self.time
.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
.expect(crate::util::error::TIME_DISABLED_ERROR)
}

pub(crate) fn clock(&self) -> &Clock {
Expand Down
29 changes: 29 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ cfg_rt! {
}
}

/// # Panics
///
/// Panics if the current [`Context`] is not available
/// in the current thread.
// remove this `allow(dead_code)` when this method
// is used by other other modules except the `time`.
#[cfg_attr(not(feature = "time"), allow(dead_code))]
#[track_caller]
pub(crate) fn with_current<F, R>(f: F) -> R
where
F: FnOnce(&Handle) -> R,
{
match context::with_current(|hdl| f(hdl)) {
Ok(ret) => ret,
Err(e) => panic!("{e}"),
}
}

pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
match_flavor!(self, Handle(h) => &h.blocking_spawner)
}
Expand Down Expand Up @@ -268,8 +286,19 @@ cfg_not_rt! {
))]
impl Handle {
#[track_caller]
#[cfg_attr(feature = "time", allow(dead_code))]
pub(crate) fn current() -> Handle {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}

cfg_time! {
#[track_caller]
pub(crate) fn with_current<F, R>(_f: F) -> R
where
F: FnOnce(&Handle) -> R,
{
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
}
65 changes: 34 additions & 31 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering;

use crate::runtime::scheduler;
use crate::runtime::time;
use crate::sync::AtomicWaker;
use crate::time::Instant;
use crate::util::error::{RUNTIME_SHUTTING_DOWN_ERROR, TIME_DISABLED_ERROR};
use crate::util::linked_list;

use pin_project_lite::pin_project;
Expand Down Expand Up @@ -285,9 +287,6 @@ pin_project! {
// before polling.
#[derive(Debug)]
pub(crate) struct TimerEntry {
// Arc reference to the runtime handle. We can only free the driver after
// deregistering everything from their respective timer wheels.
driver: scheduler::Handle,
// Shared inner structure; this is part of an intrusive linked list, and
// therefore other references can exist to it while mutable references to
// Entry exist.
Expand Down Expand Up @@ -340,6 +339,10 @@ pub(crate) struct TimerShared {
/// Only accessed under the entry lock.
pointers: linked_list::Pointers<TimerShared>,

// Arc reference to the runtime handle. We can only free the driver after
// deregistering everything from their respective timer wheels.
driver: scheduler::Handle,

/// The time when the [`TimerEntry`] was registered into the Wheel,
/// [`STATE_DEREGISTERED`] means it is not registered.
///
Expand Down Expand Up @@ -384,11 +387,19 @@ generate_addr_of_methods! {

impl TimerShared {
pub(super) fn new() -> Self {
Self {
registered_when: AtomicU64::new(0),
pointers: linked_list::Pointers::new(),
state: StateCell::default(),
_p: PhantomPinned,
// ensure both scheduler handle and time driver are available,
// otherwise panic
let maybe_hdl =
scheduler::Handle::with_current(|hdl| hdl.driver().time.as_ref().map(|_| hdl.clone()));
match maybe_hdl {
Some(hdl) => Self {
driver: hdl,
registered_when: AtomicU64::new(0),
pointers: linked_list::Pointers::new(),
state: StateCell::default(),
_p: PhantomPinned,
},
None => panic!("{TIME_DISABLED_ERROR}"),
}
}

Expand Down Expand Up @@ -453,6 +464,10 @@ impl TimerShared {
pub(super) fn might_be_registered(&self) -> bool {
self.state.might_be_registered()
}

fn driver(&self) -> &time::Handle {
self.driver.driver().time()
}
}

unsafe impl linked_list::Link for TimerShared {
Expand All @@ -479,12 +494,8 @@ unsafe impl linked_list::Link for TimerShared {

impl TimerEntry {
#[track_caller]
pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
// Panic if the time driver is not enabled
let _ = handle.driver().time();

pub(crate) fn new(deadline: Instant) -> Self {
Self {
driver: handle,
inner: None,
deadline,
registered: false,
Expand Down Expand Up @@ -565,15 +576,14 @@ impl TimerEntry {
// driver did so far and happens-before everything the driver does in
// the future. While we have the lock held, we also go ahead and
// deregister the entry if necessary.
unsafe { self.driver().clear_entry(NonNull::from(inner)) };
unsafe { inner.driver().clear_entry(NonNull::from(inner)) };
}

pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
let this = self.as_mut().project();
*this.deadline = new_time;
*this.registered = reregister;

let tick = self.driver().time_source().deadline_to_tick(new_time);
let inner = match self.inner() {
Some(inner) => inner,
None => {
Expand All @@ -582,15 +592,17 @@ impl TimerEntry {
.expect("inner should already be initialized by `this.init_inner()`")
}
};
let tick = inner.driver().time_source().deadline_to_tick(new_time);

if inner.extend_expiration(tick).is_ok() {
return;
}

if reregister {
unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, inner.into());
inner
.driver()
.reregister(&inner.driver.driver().io, tick, inner.into());
}
}
}
Expand All @@ -599,12 +611,6 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
assert!(
!self.driver().is_shutdown(),
"{}",
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
);

if !self.registered {
let deadline = self.deadline;
self.as_mut().reset(deadline, true);
Expand All @@ -613,16 +619,13 @@ impl TimerEntry {
let inner = self
.inner()
.expect("inner should already be initialized by `self.reset()`");
inner.state.poll(cx.waker())
}

pub(crate) fn driver(&self) -> &super::Handle {
self.driver.driver().time()
}
assert!(
!inner.driver().is_shutdown(),
"{RUNTIME_SHUTTING_DOWN_ERROR}"
);

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn clock(&self) -> &super::Clock {
self.driver.driver().clock()
inner.state.poll(cx.waker())
}
}

Expand Down
68 changes: 40 additions & 28 deletions tokio/src/runtime/time/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg(not(target_os = "wasi"))]

use std::future::poll_fn;
use std::{task::Context, time::Duration};

#[cfg(not(loom))]
Expand Down Expand Up @@ -45,16 +46,19 @@ fn single_timer() {
model(|| {
let rt = rt(false);
let handle = rt.handle();
let handle_clone = handle.clone();

let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
let _guard = handle_clone.enter();
let clock = handle_clone.inner.driver().clock();
let entry = TimerEntry::new(clock.now() + Duration::from_secs(1));
pin!(entry);

block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
block_on(poll_fn(|cx| {
let _guard = handle_clone.enter();
entry.as_mut().poll_elapsed(cx)
}))
.unwrap();
});

thread::yield_now();
Expand All @@ -74,13 +78,13 @@ fn drop_timer() {
model(|| {
let rt = rt(false);
let handle = rt.handle();
let handle_clone = handle.clone();

let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
let _guard = handle_clone.enter();

let clock = handle_clone.inner.driver().clock();
let entry = TimerEntry::new(clock.now() + Duration::from_secs(1));
pin!(entry);

let _ = entry
Expand Down Expand Up @@ -108,20 +112,24 @@ fn change_waker() {
model(|| {
let rt = rt(false);
let handle = rt.handle();
let handle_clone = handle.clone();

let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
let _guard = handle_clone.enter();

let clock = handle_clone.inner.driver().clock();
let entry = TimerEntry::new(clock.now() + Duration::from_secs(1));
pin!(entry);

let _ = entry
.as_mut()
.poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));

block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
block_on(poll_fn(|cx| {
let _guard = handle_clone.enter();
entry.as_mut().poll_elapsed(cx)
}))
.unwrap();
});

thread::yield_now();
Expand All @@ -143,13 +151,15 @@ fn reset_future() {

let rt = rt(false);
let handle = rt.handle();
let handle_clone = handle.clone();

let handle_ = handle.clone();
let finished_early_ = finished_early.clone();
let start = handle.inner.driver().clock().now();

let jh = thread::spawn(move || {
let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1));
let _guard = handle_clone.enter();

let entry = TimerEntry::new(start + Duration::from_secs(1));
pin!(entry);

let _ = entry
Expand All @@ -159,7 +169,11 @@ fn reset_future() {
entry.as_mut().reset(start + Duration::from_secs(2), true);

// shouldn't complete before 2s
block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap();
block_on(poll_fn(|cx| {
let _guard = handle_clone.enter();
entry.as_mut().poll_elapsed(cx)
}))
.unwrap();

finished_early_.store(true, Ordering::Relaxed);
});
Expand Down Expand Up @@ -202,14 +216,13 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
fn poll_process_levels() {
let rt = rt(true);
let handle = rt.handle();
let clock = handle.inner.driver().clock();
let _guard = handle.enter();

let mut entries = vec![];

for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
handle.inner.clone(),
handle.inner.driver().clock().now() + Duration::from_millis(i),
));
let mut entry = Box::pin(TimerEntry::new(clock.now() + Duration::from_millis(i)));

let _ = entry
.as_mut()
Expand Down Expand Up @@ -239,11 +252,10 @@ fn poll_process_levels_targeted() {

let rt = rt(true);
let handle = rt.handle();
let clock = handle.inner.driver().clock();
let _guard = handle.enter();

let e1 = TimerEntry::new(
handle.inner.clone(),
handle.inner.driver().clock().now() + Duration::from_millis(193),
);
let e1 = TimerEntry::new(clock.now() + Duration::from_millis(193));
pin!(e1);

let handle = handle.inner.driver().time();
Expand Down
23 changes: 15 additions & 8 deletions tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::runtime::time::TimerEntry;
use crate::time::{error::Error, Duration, Instant};
use crate::util::error::TIME_DISABLED_ERROR;
use crate::util::trace;

use crate::runtime::scheduler;
use pin_project_lite::pin_project;
use std::future::Future;
use std::panic::Location;
Expand Down Expand Up @@ -251,9 +253,11 @@ impl Sleep {
deadline: Instant,
location: Option<&'static Location<'static>>,
) -> Sleep {
use crate::runtime::scheduler;
let handle = scheduler::Handle::current();
let entry = TimerEntry::new(handle, deadline);
// ensure both scheduler handle and time driver are available,
// otherwise panic
let is_time_enabled = scheduler::Handle::with_current(|hdl| hdl.driver().time.is_some());
assert!(is_time_enabled, "{TIME_DISABLED_ERROR}");
let entry = TimerEntry::new(deadline);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = scheduler::Handle::current();
Expand Down Expand Up @@ -380,11 +384,14 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let clock = me.entry.clock();
let time_source = me.entry.driver().time_source();
let now = time_source.now(clock);
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
scheduler::Handle::with_current(|hdl| {
let driver = hdl.driver();
let clock = driver.clock();
let time_source = driver.time().time_source();
let now = time_source.now(clock);
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
})
};

tracing::trace!(
Expand Down
Loading
Loading