Skip to content

Commit 5af6738

Browse files
committed
Convert disconnect_pool to an async function
1 parent 3b4d142 commit 5af6738

File tree

1 file changed

+19
-54
lines changed

1 file changed

+19
-54
lines changed

src/conn/pool/futures/disconnect_pool.rs

Lines changed: 19 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,73 +7,38 @@
77
// modified, or distributed except according to those terms.
88

99
use std::{
10-
future::Future,
11-
pin::Pin,
10+
future::poll_fn,
11+
sync::atomic,
1212
task::{Context, Poll},
1313
};
1414

15-
use futures_core::ready;
16-
use tokio::sync::mpsc::UnboundedSender;
17-
1815
use crate::{
19-
conn::pool::{Inner, Pool, QUEUE_END_ID},
16+
conn::pool::{Pool, QUEUE_END_ID},
2017
error::Error,
21-
Conn,
2218
};
2319

24-
use std::sync::{atomic, Arc};
25-
26-
/// Future that disconnects this pool from a server and resolves to `()`.
20+
/// Disconnect this pool from a server and resolves to `()`.
2721
///
28-
/// **Note:** This Future won't resolve until all active connections, taken from it,
22+
/// **Note:** This won't resolve until all active connections, taken from the poll,
2923
/// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
30-
#[derive(Debug)]
31-
#[must_use = "futures do nothing unless you `.await` or poll them"]
32-
struct DisconnectPool {
33-
pool_inner: Arc<Inner>,
34-
drop: Option<UnboundedSender<Option<Conn>>>,
35-
}
36-
37-
impl DisconnectPool {
38-
fn new(pool: Pool) -> Self {
39-
Self {
40-
pool_inner: pool.inner,
41-
drop: Some(pool.drop),
42-
}
43-
}
44-
}
24+
pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> {
25+
let inner = pool.inner;
26+
let drop = pool.drop;
4527

46-
impl Future for DisconnectPool {
47-
type Output = Result<(), Error>;
28+
inner.close.store(true, atomic::Ordering::Release);
4829

49-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50-
self.pool_inner.close.store(true, atomic::Ordering::Release);
51-
let mut exchange = self.pool_inner.exchange.lock().unwrap();
52-
exchange.spawn_futures_if_needed(&self.pool_inner);
30+
let f = |cx: &mut Context| {
31+
let mut exchange = inner.exchange.lock().unwrap();
32+
exchange.spawn_futures_if_needed(&inner);
5333
exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID);
54-
drop(exchange);
34+
Poll::Ready(())
35+
};
36+
poll_fn(f).await;
5537

56-
if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
57-
Poll::Ready(Ok(()))
58-
} else {
59-
match self.drop.take() {
60-
Some(drop) => match drop.send(None) {
61-
Ok(_) => {
62-
// Recycler is alive. Waiting for it to finish.
63-
ready!(Box::pin(drop.closed()).as_mut().poll(cx));
64-
Poll::Ready(Ok(()))
65-
}
66-
Err(_) => {
67-
// Recycler seem dead. No one will wake us.
68-
Poll::Ready(Ok(()))
69-
}
70-
},
71-
None => Poll::Pending,
72-
}
73-
}
38+
if !inner.closed.load(atomic::Ordering::Acquire) && drop.send(None).is_ok() {
39+
// Recycler is alive. Wait for it to finish.
40+
drop.closed().await;
7441
}
75-
}
7642

77-
pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> {
78-
DisconnectPool::new(pool).await
43+
Ok(())
7944
}

0 commit comments

Comments
 (0)