Skip to content

Commit f027913

Browse files
committed
Refactor the waitlist to another module
1 parent 75c51d9 commit f027913

File tree

4 files changed

+149
-130
lines changed

4 files changed

+149
-130
lines changed

src/conn/pool/futures/disconnect_pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use futures_core::ready;
1616
use tokio::sync::mpsc::UnboundedSender;
1717

1818
use crate::{
19-
conn::pool::{Inner, Pool, QUEUE_END_ID},
19+
conn::pool::{waitlist::QUEUE_END_ID, Inner, Pool},
2020
error::Error,
2121
Conn,
2222
};

src/conn/pool/futures/get_conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use {
2222

2323
use crate::{
2424
conn::{
25-
pool::{Pool, QueueId},
25+
pool::{waitlist::QueueId, Pool},
2626
Conn,
2727
},
2828
error::*,

src/conn/pool/mod.rs

Lines changed: 8 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,13 @@
77
// modified, or distributed except according to those terms.
88

99
use futures_util::FutureExt;
10-
use keyed_priority_queue::KeyedPriorityQueue;
1110
use tokio::sync::mpsc;
1211

1312
use std::{
14-
borrow::Borrow,
15-
cmp::Reverse,
1613
collections::VecDeque,
17-
hash::{Hash, Hasher},
1814
str::FromStr,
1915
sync::{atomic, Arc, Mutex},
20-
task::{Context, Poll, Waker},
16+
task::{Context, Poll},
2117
time::{Duration, Instant},
2218
};
2319

@@ -29,12 +25,14 @@ use crate::{
2925
};
3026

3127
pub use metrics::Metrics;
28+
use waitlist::{QueueId, Waitlist};
3229

3330
mod recycler;
3431
// this is a really unfortunate name for a module
3532
pub mod futures;
3633
mod metrics;
3734
mod ttl_check_inerval;
35+
mod waitlist;
3836

3937
/// Connection that is idling in the pool.
4038
#[derive(Debug)]
@@ -104,86 +102,6 @@ impl Exchange {
104102
}
105103
}
106104

107-
#[derive(Default, Debug)]
108-
struct Waitlist {
109-
queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
110-
}
111-
112-
impl Waitlist {
113-
/// Returns `true` if pushed.
114-
fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
115-
// The documentation of Future::poll says:
116-
// Note that on multiple calls to poll, only the Waker from
117-
// the Context passed to the most recent call should be
118-
// scheduled to receive a wakeup.
119-
//
120-
// But the the documentation of KeyedPriorityQueue::push says:
121-
// Adds new element to queue if missing key or replace its
122-
// priority if key exists. In second case doesn’t replace key.
123-
//
124-
// This means we have to remove first to have the most recent
125-
// waker in the queue.
126-
let occupied = self.remove(queue_id);
127-
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
128-
!occupied
129-
}
130-
131-
fn pop(&mut self) -> Option<Waker> {
132-
match self.queue.pop() {
133-
Some((qw, _)) => Some(qw.waker),
134-
None => None,
135-
}
136-
}
137-
138-
/// Returns `true` if removed.
139-
fn remove(&mut self, id: QueueId) -> bool {
140-
self.queue.remove(&id).is_some()
141-
}
142-
143-
fn peek_id(&mut self) -> Option<QueueId> {
144-
self.queue.peek().map(|(qw, _)| qw.queue_id)
145-
}
146-
}
147-
148-
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
149-
150-
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
151-
pub(crate) struct QueueId(Reverse<u64>);
152-
153-
impl QueueId {
154-
fn next() -> Self {
155-
static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
156-
let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
157-
QueueId(Reverse(id))
158-
}
159-
}
160-
161-
#[derive(Debug)]
162-
struct QueuedWaker {
163-
queue_id: QueueId,
164-
waker: Waker,
165-
}
166-
167-
impl Eq for QueuedWaker {}
168-
169-
impl Borrow<QueueId> for QueuedWaker {
170-
fn borrow(&self) -> &QueueId {
171-
&self.queue_id
172-
}
173-
}
174-
175-
impl PartialEq for QueuedWaker {
176-
fn eq(&self, other: &Self) -> bool {
177-
self.queue_id == other.queue_id
178-
}
179-
}
180-
181-
impl Hash for QueuedWaker {
182-
fn hash<H: Hasher>(&self, state: &mut H) {
183-
self.queue_id.hash(state)
184-
}
185-
}
186-
187105
/// Connection pool data.
188106
#[derive(Debug)]
189107
pub struct Inner {
@@ -474,20 +392,16 @@ mod test {
474392
use waker_fn::waker_fn;
475393

476394
use std::{
477-
cmp::Reverse,
478395
future::Future,
479396
pin::pin,
480397
sync::{Arc, OnceLock},
481-
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
398+
task::{Context, Poll},
482399
time::Duration,
483400
};
484401

485402
use crate::{
486-
conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
487-
opts::PoolOpts,
488-
prelude::*,
489-
test_misc::get_opts,
490-
PoolConstraints, Row, TxOpts, Value,
403+
conn::pool::Pool, opts::PoolOpts, prelude::*, test_misc::get_opts, PoolConstraints, Row,
404+
TxOpts, Value,
491405
};
492406

493407
macro_rules! conn_ex_field {
@@ -1016,7 +930,7 @@ mod test {
1016930
}
1017931
drop(only_conn);
1018932

1019-
assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
933+
assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.len());
1020934
// metrics should catch up with waiting queue (see #335)
1021935
assert_eq!(
1022936
0,
@@ -1070,40 +984,6 @@ mod test {
1070984
Ok(())
1071985
}
1072986

1073-
#[test]
1074-
fn waitlist_integrity() {
1075-
const DATA: *const () = &();
1076-
const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
1077-
const NOOP_FN: unsafe fn(*const ()) = |_| {};
1078-
static RW_VTABLE: RawWakerVTable =
1079-
RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
1080-
let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
1081-
1082-
let mut waitlist = Waitlist::default();
1083-
assert_eq!(0, waitlist.queue.len());
1084-
1085-
waitlist.push(w.clone(), QueueId(Reverse(4)));
1086-
waitlist.push(w.clone(), QueueId(Reverse(2)));
1087-
waitlist.push(w.clone(), QueueId(Reverse(8)));
1088-
waitlist.push(w.clone(), QUEUE_END_ID);
1089-
waitlist.push(w.clone(), QueueId(Reverse(10)));
1090-
1091-
waitlist.remove(QueueId(Reverse(8)));
1092-
1093-
assert_eq!(4, waitlist.queue.len());
1094-
1095-
let (_, id) = waitlist.queue.pop().unwrap();
1096-
assert_eq!(2, id.0 .0);
1097-
let (_, id) = waitlist.queue.pop().unwrap();
1098-
assert_eq!(4, id.0 .0);
1099-
let (_, id) = waitlist.queue.pop().unwrap();
1100-
assert_eq!(10, id.0 .0);
1101-
let (_, id) = waitlist.queue.pop().unwrap();
1102-
assert_eq!(QUEUE_END_ID, id);
1103-
1104-
assert_eq!(0, waitlist.queue.len());
1105-
}
1106-
1107987
#[tokio::test]
1108988
async fn check_absolute_connection_ttl() -> super::Result<()> {
1109989
let constraints = PoolConstraints::new(1, 3).unwrap();
@@ -1185,7 +1065,7 @@ mod test {
11851065

11861066
let queue_len = || {
11871067
let exchange = pool.inner.exchange.lock().unwrap();
1188-
exchange.waiting.queue.len()
1068+
exchange.waiting.len()
11891069
};
11901070

11911071
// Get a connection, so we know the next futures will be

src/conn/pool/waitlist.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
use keyed_priority_queue::KeyedPriorityQueue;
2+
3+
use std::{
4+
borrow::Borrow,
5+
cmp::Reverse,
6+
hash::{Hash, Hasher},
7+
sync::atomic,
8+
task::Waker,
9+
};
10+
11+
pub(crate) const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
12+
13+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
14+
pub(crate) struct QueueId(Reverse<u64>);
15+
16+
impl QueueId {
17+
pub(crate) fn next() -> Self {
18+
static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
19+
let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
20+
QueueId(Reverse(id))
21+
}
22+
}
23+
24+
#[derive(Debug)]
25+
struct QueuedWaker {
26+
queue_id: QueueId,
27+
waker: Waker,
28+
}
29+
30+
impl Eq for QueuedWaker {}
31+
32+
impl Borrow<QueueId> for QueuedWaker {
33+
fn borrow(&self) -> &QueueId {
34+
&self.queue_id
35+
}
36+
}
37+
38+
impl PartialEq for QueuedWaker {
39+
fn eq(&self, other: &Self) -> bool {
40+
self.queue_id == other.queue_id
41+
}
42+
}
43+
44+
impl Hash for QueuedWaker {
45+
fn hash<H: Hasher>(&self, state: &mut H) {
46+
self.queue_id.hash(state)
47+
}
48+
}
49+
50+
#[derive(Default, Debug)]
51+
pub(crate) struct Waitlist {
52+
queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
53+
}
54+
55+
impl Waitlist {
56+
/// Returns `true` if pushed.
57+
pub(crate) fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
58+
// The documentation of Future::poll says:
59+
// Note that on multiple calls to poll, only the Waker from
60+
// the Context passed to the most recent call should be
61+
// scheduled to receive a wakeup.
62+
//
63+
// But the the documentation of KeyedPriorityQueue::push says:
64+
// Adds new element to queue if missing key or replace its
65+
// priority if key exists. In second case doesn’t replace key.
66+
//
67+
// This means we have to remove first to have the most recent
68+
// waker in the queue.
69+
let occupied = self.remove(queue_id);
70+
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
71+
!occupied
72+
}
73+
74+
pub(crate) fn pop(&mut self) -> Option<Waker> {
75+
match self.queue.pop() {
76+
Some((qw, _)) => Some(qw.waker),
77+
None => None,
78+
}
79+
}
80+
81+
/// Returns `true` if removed.
82+
pub(crate) fn remove(&mut self, id: QueueId) -> bool {
83+
self.queue.remove(&id).is_some()
84+
}
85+
86+
pub(crate) fn peek_id(&mut self) -> Option<QueueId> {
87+
self.queue.peek().map(|(qw, _)| qw.queue_id)
88+
}
89+
90+
// only used in tests for now
91+
#[allow(dead_code)]
92+
pub(crate) fn len(&self) -> usize {
93+
self.queue.len()
94+
}
95+
}
96+
97+
#[cfg(test)]
98+
mod test {
99+
use std::cmp::Reverse;
100+
use std::task::RawWaker;
101+
use std::task::RawWakerVTable;
102+
use std::task::Waker;
103+
104+
use super::*;
105+
106+
#[test]
107+
fn waitlist_integrity() {
108+
const DATA: *const () = &();
109+
const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
110+
const NOOP_FN: unsafe fn(*const ()) = |_| {};
111+
static RW_VTABLE: RawWakerVTable =
112+
RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
113+
let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
114+
115+
let mut waitlist = Waitlist::default();
116+
assert_eq!(0, waitlist.queue.len());
117+
118+
waitlist.push(w.clone(), QueueId(Reverse(4)));
119+
waitlist.push(w.clone(), QueueId(Reverse(2)));
120+
waitlist.push(w.clone(), QueueId(Reverse(8)));
121+
waitlist.push(w.clone(), QUEUE_END_ID);
122+
waitlist.push(w.clone(), QueueId(Reverse(10)));
123+
124+
waitlist.remove(QueueId(Reverse(8)));
125+
126+
assert_eq!(4, waitlist.queue.len());
127+
128+
let (_, id) = waitlist.queue.pop().unwrap();
129+
assert_eq!(2, id.0 .0);
130+
let (_, id) = waitlist.queue.pop().unwrap();
131+
assert_eq!(4, id.0 .0);
132+
let (_, id) = waitlist.queue.pop().unwrap();
133+
assert_eq!(10, id.0 .0);
134+
let (_, id) = waitlist.queue.pop().unwrap();
135+
assert_eq!(QUEUE_END_ID, id);
136+
137+
assert_eq!(0, waitlist.queue.len());
138+
}
139+
}

0 commit comments

Comments
 (0)