@@ -161,17 +161,18 @@ mod tests;
161161use crate :: any:: Any ;
162162use crate :: cell:: UnsafeCell ;
163163use crate :: ffi:: CStr ;
164+ use crate :: future:: Future ;
164165use crate :: marker:: PhantomData ;
165166use crate :: mem:: { self , ManuallyDrop , forget} ;
166167use crate :: num:: NonZero ;
167168use crate :: pin:: Pin ;
168- use crate :: sync:: Arc ;
169169use crate :: sync:: atomic:: { AtomicUsize , Ordering } ;
170+ use crate :: sync:: { Arc , Mutex , PoisonError } ;
170171use crate :: sys:: sync:: Parker ;
171172use crate :: sys:: thread as imp;
172173use crate :: sys_common:: { AsInner , IntoInner } ;
173174use crate :: time:: { Duration , Instant } ;
174- use crate :: { env, fmt, io, panic, panicking, str} ;
175+ use crate :: { env, fmt, io, panic, panicking, str, task } ;
175176
176177#[ stable( feature = "scoped_threads" , since = "1.63.0" ) ]
177178mod scoped;
@@ -487,6 +488,7 @@ impl Builder {
487488 let my_packet: Arc < Packet < ' scope , T > > = Arc :: new ( Packet {
488489 scope : scope_data,
489490 result : UnsafeCell :: new ( None ) ,
491+ waker : Mutex :: new ( task:: Waker :: noop ( ) . clone ( ) ) ,
490492 _marker : PhantomData ,
491493 } ) ;
492494 let their_packet = my_packet. clone ( ) ;
@@ -537,15 +539,35 @@ impl Builder {
537539 let try_result = panic:: catch_unwind ( panic:: AssertUnwindSafe ( || {
538540 crate :: sys:: backtrace:: __rust_begin_short_backtrace ( f)
539541 } ) ) ;
542+
543+ // Store the `Result` of the thread that the `JoinHandle` can retrieve.
544+ //
540545 // SAFETY: `their_packet` as been built just above and moved by the
541546 // closure (it is an Arc<...>) and `my_packet` will be stored in the
542547 // same `JoinInner` as this closure meaning the mutation will be
543548 // safe (not modify it and affect a value far away).
544549 unsafe { * their_packet. result . get ( ) = Some ( try_result) } ;
545- // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that
546- // will call `decrement_num_running_threads` and therefore signal that this thread is
547- // done.
550+
551+ // Fetch the `Waker` from the packet; this is needed to support `.into_join_future()`.
552+ // If unused, this just returns `Waker::noop()` which will do nothing.
553+ let waker: task:: Waker = {
554+ let placeholder = task:: Waker :: noop ( ) . clone ( ) ;
555+ let mut guard = their_packet. waker . lock ( ) . unwrap_or_else ( PoisonError :: into_inner) ;
556+ mem:: replace ( & mut * guard, placeholder)
557+ } ;
558+
559+ // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet
560+ // (which happens if the `JoinHandle` has been dropped) that will call
561+ // `decrement_num_running_threads` and therefore signal to the scope (if there is one)
562+ // that this thread is done.
548563 drop ( their_packet) ;
564+
565+ // Now that we have become visibly “finished” by dropping the packet
566+ // (`JoinInner::is_finished` will return true), we can use the `Waker` to signal
567+ // any waiting `JoinFuture`. If instead we are being waited for by
568+ // `JoinHandle::join()`, the actual platform thread termination will be the wakeup.
569+ waker. wake ( ) ;
570+
549571 // Here, the lifetime `'scope` can end. `main` keeps running for a bit
550572 // after that before returning itself.
551573 } ;
@@ -1189,8 +1211,6 @@ impl ThreadId {
11891211 }
11901212 }
11911213 } else {
1192- use crate :: sync:: { Mutex , PoisonError } ;
1193-
11941214 static COUNTER : Mutex <u64 > = Mutex :: new( 0 ) ;
11951215
11961216 let mut counter = COUNTER . lock( ) . unwrap_or_else( PoisonError :: into_inner) ;
@@ -1571,16 +1591,30 @@ impl fmt::Debug for Thread {
15711591#[ stable( feature = "rust1" , since = "1.0.0" ) ]
15721592pub type Result < T > = crate :: result:: Result < T , Box < dyn Any + Send + ' static > > ;
15731593
1574- // This packet is used to communicate the return value between the spawned
1575- // thread and the rest of the program. It is shared through an `Arc` and
1576- // there's no need for a mutex here because synchronization happens with `join()`
1577- // (the caller will never read this packet until the thread has exited).
1578- //
1579- // An Arc to the packet is stored into a `JoinInner` which in turns is placed
1580- // in `JoinHandle`.
1594+ /// This packet is used to communicate the return value between the spawned
1595+ /// thread and the rest of the program. It is shared through an [`Arc`].
1596+ ///
1597+ /// An Arc to the packet is stored into a [`JoinInner`] which in turn is placed
1598+ /// in [`JoinHandle`] or [`ScopedJoinHandle`].
15811599struct Packet < ' scope , T > {
1600+ /// Communication with the enclosing thread scope if there is one.
15821601 scope : Option < Arc < scoped:: ScopeData > > ,
1602+
1603+ /// Holds the return value.
1604+ ///
1605+ /// Synchronization happens via reference counting: as long as the `Arc<Packet>`
1606+ /// has two or more references, this field is never read, and will only be written
1607+ /// once as the thread terminates. After that happens, either the packet is dropped,
1608+ /// or [`JoinInner::join()`] will `take()` the result value from here.
15831609 result : UnsafeCell < Option < Result < T > > > ,
1610+
1611+ /// If a [`JoinFuture`] for this thread exists and has been polled,
1612+ /// this is the waker from that poll. If it does not exist or has not
1613+ /// been polled yet, this is [`task::Waker::noop()`].
1614+ // FIXME: This should be an `AtomicWaker` instead of a `Mutex`,
1615+ // to be cheaper and impossible to deadlock.
1616+ waker : Mutex < task:: Waker > ,
1617+
15841618 _marker : PhantomData < Option < & ' scope scoped:: ScopeData > > ,
15851619}
15861620
@@ -1634,6 +1668,10 @@ impl<'scope, T> JoinInner<'scope, T> {
16341668 self . native . join ( ) ;
16351669 Arc :: get_mut ( & mut self . packet ) . unwrap ( ) . result . get_mut ( ) . take ( ) . unwrap ( )
16361670 }
1671+
1672+ fn is_finished ( & self ) -> bool {
1673+ Arc :: strong_count ( & self . packet ) == 1
1674+ }
16371675}
16381676
16391677/// An owned permission to join on a thread (block on its termination).
@@ -1780,6 +1818,45 @@ impl<T> JoinHandle<T> {
17801818 self . 0 . join ( )
17811819 }
17821820
1821+ /// Returns a [`Future`] that resolves when the thread has finished.
1822+ ///
1823+ /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`];
1824+ /// this is the `async` equivalent of that blocking function.
1825+ ///
1826+ /// If the returned future is dropped (cancelled), the thread will become *detached*;
1827+ /// there will be no way to observe or wait for the thread’s termination.
1828+ /// This is identical to the behavior of `JoinHandle` itself.
1829+ ///
1830+ /// # Example
1831+ ///
1832+ // FIXME: ideally we would actually run this example, with the help of a trivial async executor
1833+ /// ```no_run
1834+ /// #![feature(thread_join_future)]
1835+ /// use std::thread;
1836+ ///
1837+ /// async fn do_some_heavy_tasks_in_parallel() -> thread::Result<()> {
1838+ /// let future_1 = thread::spawn(|| {
1839+ /// // ... do something ...
1840+ /// }).into_join_future();
1841+ /// let future_2 = thread::spawn(|| {
1842+ /// // ... do something else ...
1843+ /// }).into_join_future();
1844+ ///
1845+ /// // Both threads have been started; now await the completion of both.
1846+ /// future_1.await?;
1847+ /// future_2.await?;
1848+ /// Ok(())
1849+ /// }
1850+ /// ```
1851+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1852+ pub fn into_join_future ( self ) -> JoinFuture < ' static , T > {
1853+ // The method is not named `into_future()` to avoid overlapping with the stable
1854+ // `IntoFuture::into_future()`. We're not implementing `IntoFuture` in order to
1855+ // keep this unstable and preserve the *option* of compatibly making this obey structured
1856+ // concurrency via an async-Drop that waits for the thread to end.
1857+ JoinFuture :: new ( self . 0 )
1858+ }
1859+
17831860 /// Checks if the associated thread has finished running its main function.
17841861 ///
17851862 /// `is_finished` supports implementing a non-blocking join operation, by checking
@@ -1792,7 +1869,7 @@ impl<T> JoinHandle<T> {
17921869 /// to return quickly, without blocking for any significant amount of time.
17931870 #[ stable( feature = "thread_is_running" , since = "1.61.0" ) ]
17941871 pub fn is_finished ( & self ) -> bool {
1795- Arc :: strong_count ( & self . 0 . packet ) == 1
1872+ self . 0 . is_finished ( )
17961873 }
17971874}
17981875
@@ -1818,9 +1895,88 @@ impl<T> fmt::Debug for JoinHandle<T> {
18181895fn _assert_sync_and_send ( ) {
18191896 fn _assert_both < T : Send + Sync > ( ) { }
18201897 _assert_both :: < JoinHandle < ( ) > > ( ) ;
1898+ _assert_both :: < JoinFuture < ' static , ( ) > > ( ) ;
18211899 _assert_both :: < Thread > ( ) ;
18221900}
18231901
1902+ /// A [`Future`] that resolves when a thread has finished.
1903+ ///
1904+ /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`];
1905+ /// this is the `async` equivalent of that blocking function.
1906+ /// Obtain it by calling [`JoinHandle::into_join_future()`] or
1907+ /// [`ScopedJoinHandle::into_join_future()`].
1908+ ///
1909+ /// If a `JoinFuture` is dropped (cancelled), and the thread does not belong to a [scope],
1910+ /// the associated thread will become *detached*;
1911+ /// there will be no way to observe or wait for the thread’s termination.
1912+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1913+ pub struct JoinFuture < ' scope , T > ( Option < JoinInner < ' scope , T > > ) ;
1914+
1915+ impl < ' scope , T > JoinFuture < ' scope , T > {
1916+ fn new ( inner : JoinInner < ' scope , T > ) -> Self {
1917+ Self ( Some ( inner) )
1918+ }
1919+
1920+ /// Implements the “getting a result” part of joining/polling, without blocking or changing
1921+ /// the `Waker`. Part of the implementation of `poll()`.
1922+ ///
1923+ /// If this returns `Some`, then `self.0` is now `None` and the future will panic
1924+ /// if polled again.
1925+ fn take_result ( & mut self ) -> Option < Result < T > > {
1926+ self . 0 . take_if ( |i| i. is_finished ( ) ) . map ( JoinInner :: join)
1927+ }
1928+ }
1929+
1930+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1931+ impl < T > Future for JoinFuture < ' _ , T > {
1932+ type Output = Result < T > ;
1933+ fn poll ( mut self : Pin < & mut Self > , cx : & mut task:: Context < ' _ > ) -> task:: Poll < Self :: Output > {
1934+ if let Some ( result) = self . take_result ( ) {
1935+ return task:: Poll :: Ready ( result) ;
1936+ }
1937+
1938+ // Update the `Waker` the thread should wake when it completes.
1939+ {
1940+ let Some ( inner) = & mut self . 0 else {
1941+ panic ! ( "polled after complete" ) ;
1942+ } ;
1943+
1944+ let new_waker = cx. waker ( ) ;
1945+
1946+ // Lock the mutex, and ignore the poison state because there are no meaningful ways
1947+ // the existing contents can be corrupted; they will be overwritten completely and the
1948+ // overwrite is atomic-in-the-database-sense.
1949+ let mut current_waker_guard =
1950+ inner. packet . waker . lock ( ) . unwrap_or_else ( PoisonError :: into_inner) ;
1951+
1952+ // Overwrite the waker. Note that we are executing the new waker’s clone and the old
1953+ // waker’s destructor; these could panic (which will merely poison the lock) or hang,
1954+ // which will hold the lock, but the most that can do is prevent the thread from
1955+ // exiting because it's trying to acquire `packet.waker`, which it won't do while
1956+ // holding any *other* locks (...unless the thread’s data includes a lock guard that
1957+ // the waker also wants).
1958+ if !new_waker. will_wake ( & * current_waker_guard) {
1959+ * current_waker_guard = new_waker. clone ( ) ;
1960+ }
1961+ }
1962+
1963+ // Check for completion again in case the thread finished while we were busy
1964+ // setting the waker, to prevent a lost wakeup in that case.
1965+ if let Some ( result) = self . take_result ( ) {
1966+ task:: Poll :: Ready ( result)
1967+ } else {
1968+ task:: Poll :: Pending
1969+ }
1970+ }
1971+ }
1972+
1973+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1974+ impl < T > fmt:: Debug for JoinFuture < ' _ , T > {
1975+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
1976+ f. debug_struct ( "JoinHandle" ) . finish_non_exhaustive ( )
1977+ }
1978+ }
1979+
18241980/// Returns an estimate of the default amount of parallelism a program should use.
18251981///
18261982/// Parallelism is a resource. A given machine provides a certain capacity for
0 commit comments