Skip to content

Commit 1592f57

Browse files
committed
Introduce a fast reconnect process for async cluster connections.
The process is periodic and can be configured via ClusterParams. This process ensures that all expected user connections exist and have not been passively closed. The expected connections are calculated from the current slot map. Additionally, for the Tokio runtime, an instant disconnect notification is available, allowing the reconnect process to be triggered instantly without waiting for the periodic check. This process is especially important for pub/sub support, as passive disconnects can render a pub/sub subscriber inoperative. Three integration tests are introduced with this feature: a generic fast reconnect test, pub/sub resilience to passive disconnects, and pub/sub resilience to scale-out.
1 parent 2d7200f commit 1592f57

22 files changed

+1015
-618
lines changed

Diff for: redis-test/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,10 @@ impl AioConnectionLike for MockRedisConnection {
288288
fn get_db(&self) -> i64 {
289289
0
290290
}
291+
292+
fn is_closed(&self) -> bool {
293+
false
294+
}
291295
}
292296

293297
#[cfg(test)]

Diff for: redis/examples/async-await.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use redis::AsyncCommands;
44
#[tokio::main]
55
async fn main() -> redis::RedisResult<()> {
66
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
7-
let mut con = client.get_multiplexed_async_connection(None).await?;
7+
let mut con = client.get_multiplexed_async_connection(None, None).await?;
88

99
con.set("key1", b"foo").await?;
1010

Diff for: redis/examples/async-connection-loss.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ async fn main() -> RedisResult<()> {
8080

8181
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
8282
match mode {
83-
Mode::Default => run_multi(client.get_multiplexed_tokio_connection(None).await?).await?,
83+
Mode::Default => {
84+
run_multi(client.get_multiplexed_tokio_connection(None, None).await?).await?
85+
}
8486
Mode::Reconnect => run_multi(client.get_connection_manager().await?).await?,
8587
#[allow(deprecated)]
8688
Mode::Deprecated => run_single(client.get_async_connection(None).await?).await?,

Diff for: redis/examples/async-multiplexed.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ async fn test_cmd(con: &MultiplexedConnection, i: i32) -> RedisResult<()> {
3434
async fn main() {
3535
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
3636

37-
let con = client.get_multiplexed_tokio_connection(None).await.unwrap();
37+
let con = client
38+
.get_multiplexed_tokio_connection(None, None)
39+
.await
40+
.unwrap();
3841

3942
let cmds = (0..100).map(|i| test_cmd(&con, i));
4043
let result = future::try_join_all(cmds).await.unwrap();

Diff for: redis/examples/async-pub-sub.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use redis::AsyncCommands;
55
#[tokio::main]
66
async fn main() -> redis::RedisResult<()> {
77
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
8-
let mut publish_conn = client.get_multiplexed_async_connection(None).await?;
8+
let mut publish_conn = client.get_multiplexed_async_connection(None, None).await?;
99
let mut pubsub_conn = client.get_async_pubsub().await?;
1010

1111
pubsub_conn.subscribe("wavephone").await?;

Diff for: redis/examples/async-scan.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use redis::{AsyncCommands, AsyncIter};
55
#[tokio::main]
66
async fn main() -> redis::RedisResult<()> {
77
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
8-
let mut con = client.get_multiplexed_async_connection(None).await?;
8+
let mut con = client.get_multiplexed_async_connection(None, None).await?;
99

1010
con.set("async-key1", b"foo").await?;
1111
con.set("async-key2", b"foo").await?;

Diff for: redis/src/aio/connection.rs

+5
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,11 @@ where
305305
fn get_db(&self) -> i64 {
306306
self.db
307307
}
308+
309+
fn is_closed(&self) -> bool {
310+
// always false for AsyncRead + AsyncWrite (cant do better)
311+
false
312+
}
308313
}
309314

310315
/// Represents a `PubSub` connection.

Diff for: redis/src/aio/connection_manager.rs

+6
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ impl ConnectionManager {
196196
response_timeout,
197197
connection_timeout,
198198
None,
199+
None,
199200
)
200201
})
201202
.await
@@ -301,4 +302,9 @@ impl ConnectionLike for ConnectionManager {
301302
fn get_db(&self) -> i64 {
302303
self.client.connection_info().redis.db
303304
}
305+
306+
fn is_closed(&self) -> bool {
307+
// always return false due to automatic reconnect
308+
false
309+
}
304310
}

Diff for: redis/src/aio/mod.rs

+18
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,24 @@ pub trait ConnectionLike {
8585
/// also might be incorrect if the connection like object is not
8686
/// actually connected.
8787
fn get_db(&self) -> i64;
88+
89+
/// Returns the state of the connection
90+
fn is_closed(&self) -> bool;
91+
}
92+
93+
/// Implements ability to notify about disconnection events
94+
pub trait DisconnectNotifier: Send + Sync {
95+
/// Notify about disconnect event
96+
fn notify_disconnect(&mut self);
97+
98+
/// Inteded to be used with Box
99+
fn clone_box(&self) -> Box<dyn DisconnectNotifier>;
100+
}
101+
102+
impl Clone for Box<dyn DisconnectNotifier> {
103+
fn clone(&self) -> Box<dyn DisconnectNotifier> {
104+
self.clone_box()
105+
}
88106
}
89107

90108
// Initial setup for every connection.

Diff for: redis/src/aio/multiplexed_connection.rs

+47-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::{ConnectionLike, Runtime};
22
use crate::aio::setup_connection;
3+
use crate::aio::DisconnectNotifier;
34
use crate::cmd::Cmd;
45
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
56
use crate::parser::ValueCodec;
@@ -23,6 +24,7 @@ use std::fmt;
2324
use std::fmt::Debug;
2425
use std::io;
2526
use std::pin::Pin;
27+
use std::sync::atomic::{AtomicBool, Ordering};
2628
use std::sync::Arc;
2729
use std::task::{self, Poll};
2830
use std::time::Duration;
@@ -77,13 +79,15 @@ struct Pipeline<SinkItem> {
7779
sender: mpsc::Sender<PipelineMessage<SinkItem>>,
7880

7981
push_manager: Arc<ArcSwap<PushManager>>,
82+
is_stream_closed: Arc<AtomicBool>,
8083
}
8184

8285
impl<SinkItem> Clone for Pipeline<SinkItem> {
8386
fn clone(&self) -> Self {
8487
Pipeline {
8588
sender: self.sender.clone(),
8689
push_manager: self.push_manager.clone(),
90+
is_stream_closed: self.is_stream_closed.clone(),
8791
}
8892
}
8993
}
@@ -104,14 +108,21 @@ pin_project! {
104108
in_flight: VecDeque<InFlight>,
105109
error: Option<RedisError>,
106110
push_manager: Arc<ArcSwap<PushManager>>,
111+
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
112+
is_stream_closed: Arc<AtomicBool>,
107113
}
108114
}
109115

110116
impl<T> PipelineSink<T>
111117
where
112118
T: Stream<Item = RedisResult<Value>> + 'static,
113119
{
114-
fn new<SinkItem>(sink_stream: T, push_manager: Arc<ArcSwap<PushManager>>) -> Self
120+
fn new<SinkItem>(
121+
sink_stream: T,
122+
push_manager: Arc<ArcSwap<PushManager>>,
123+
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
124+
is_stream_closed: Arc<AtomicBool>,
125+
) -> Self
115126
where
116127
T: Sink<SinkItem, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
117128
{
@@ -120,6 +131,8 @@ where
120131
in_flight: VecDeque::new(),
121132
error: None,
122133
push_manager,
134+
disconnect_notifier,
135+
is_stream_closed,
123136
}
124137
}
125138

@@ -130,7 +143,15 @@ where
130143
Some(result) => result,
131144
// The redis response stream is not going to produce any more items so we `Err`
132145
// to break out of the `forward` combinator and stop handling requests
133-
None => return Poll::Ready(Err(())),
146+
None => {
147+
// this is the right place to notify about the passive TCP disconnect
148+
// In other places we cannot distinguish between the active destruction of MultiplexedConnection and passive disconnect
149+
if let Some(disconnect_notifier) = self.as_mut().project().disconnect_notifier {
150+
disconnect_notifier.notify_disconnect();
151+
}
152+
self.is_stream_closed.store(true, Ordering::Relaxed);
153+
return Poll::Ready(Err(()));
154+
}
134155
};
135156
self.as_mut().send_result(item);
136157
}
@@ -296,7 +317,10 @@ impl<SinkItem> Pipeline<SinkItem>
296317
where
297318
SinkItem: Send + 'static,
298319
{
299-
fn new<T>(sink_stream: T) -> (Self, impl Future<Output = ()>)
320+
fn new<T>(
321+
sink_stream: T,
322+
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
323+
) -> (Self, impl Future<Output = ()>)
300324
where
301325
T: Sink<SinkItem, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
302326
T: Send + 'static,
@@ -308,7 +332,13 @@ where
308332
let (sender, mut receiver) = mpsc::channel(BUFFER_SIZE);
309333
let push_manager: Arc<ArcSwap<PushManager>> =
310334
Arc::new(ArcSwap::new(Arc::new(PushManager::default())));
311-
let sink = PipelineSink::new::<SinkItem>(sink_stream, push_manager.clone());
335+
let is_stream_closed = Arc::new(AtomicBool::new(false));
336+
let sink = PipelineSink::new::<SinkItem>(
337+
sink_stream,
338+
push_manager.clone(),
339+
disconnect_notifier,
340+
is_stream_closed.clone(),
341+
);
312342
let f = stream::poll_fn(move |cx| receiver.poll_recv(cx))
313343
.map(Ok)
314344
.forward(sink)
@@ -317,6 +347,7 @@ where
317347
Pipeline {
318348
sender,
319349
push_manager,
350+
is_stream_closed,
320351
},
321352
f,
322353
)
@@ -363,6 +394,10 @@ where
363394
async fn set_push_manager(&mut self, push_manager: PushManager) {
364395
self.push_manager.store(Arc::new(push_manager));
365396
}
397+
398+
pub fn is_closed(&self) -> bool {
399+
self.is_stream_closed.load(Ordering::Relaxed)
400+
}
366401
}
367402

368403
/// A connection object which can be cloned, allowing requests to be be sent concurrently
@@ -392,6 +427,7 @@ impl MultiplexedConnection {
392427
connection_info: &ConnectionInfo,
393428
stream: C,
394429
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
430+
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
395431
) -> RedisResult<(Self, impl Future<Output = ()>)>
396432
where
397433
C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
@@ -401,6 +437,7 @@ impl MultiplexedConnection {
401437
stream,
402438
std::time::Duration::MAX,
403439
push_sender,
440+
disconnect_notifier,
404441
)
405442
.await
406443
}
@@ -412,6 +449,7 @@ impl MultiplexedConnection {
412449
stream: C,
413450
response_timeout: std::time::Duration,
414451
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
452+
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
415453
) -> RedisResult<(Self, impl Future<Output = ()>)>
416454
where
417455
C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
@@ -429,7 +467,7 @@ impl MultiplexedConnection {
429467
let codec = ValueCodec::default()
430468
.framed(stream)
431469
.and_then(|msg| async move { msg });
432-
let (mut pipeline, driver) = Pipeline::new(codec);
470+
let (mut pipeline, driver) = Pipeline::new(codec, disconnect_notifier);
433471
let driver = boxed(driver);
434472
let pm = PushManager::default();
435473
if let Some(sender) = push_sender {
@@ -560,6 +598,10 @@ impl ConnectionLike for MultiplexedConnection {
560598
fn get_db(&self) -> i64 {
561599
self.db
562600
}
601+
602+
fn is_closed(&self) -> bool {
603+
self.pipeline.is_closed()
604+
}
563605
}
564606
impl MultiplexedConnection {
565607
/// Subscribes to a new channel.

0 commit comments

Comments
 (0)