Skip to content

Avoid retrying on IO errors when it’s unclear if the server received the request. #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ where
&mut self,
item: SinkItem,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
self.send_recv(item, None, timeout).await
}

Expand All @@ -359,7 +359,7 @@ where
// If `None`, this is a single request, not a pipeline of multiple requests.
pipeline_response_count: Option<usize>,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
let (sender, receiver) = oneshot::channel();

self.sender
Expand All @@ -369,15 +369,27 @@ where
output: sender,
})
.await
.map_err(|_| None)?;
.map_err(|err| {
// If an error occurs here, it means the request never reached the server, as guaranteed
// by the 'send' function. Since the server did not receive the data, it is safe to retry
// the request.
RedisError::from((
crate::ErrorKind::IoErrorRetrySafe,
"Failed to send the request to the server",
format!("{err}"),
))
})?;
match Runtime::locate().timeout(timeout, receiver).await {
Ok(Ok(result)) => result.map_err(Some),
Ok(Ok(result)) => result,
Ok(Err(_)) => {
// The `sender` was dropped which likely means that the stream part
// failed for one reason or another
Err(None)
// failed for one reason or another.
// Since we don't know if the server received the request, retrying it isn't safe.
// For example, retrying an INCR request could result in double increments.
// Hence, we return an IoError instead of an IoErrorRetrySafe.
Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
}
Err(elapsed) => Err(Some(elapsed.into())),
Err(elapsed) => Err(elapsed.into()),
}
}

Expand Down Expand Up @@ -503,10 +515,7 @@ impl MultiplexedConnection {
let result = self
.pipeline
.send_single(cmd.get_packed_command(), self.response_timeout)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;
if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
if e.is_connection_dropped() {
Expand Down Expand Up @@ -537,10 +546,7 @@ impl MultiplexedConnection {
Some(offset + count),
self.response_timeout,
)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;

if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
Expand Down
3 changes: 2 additions & 1 deletion redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ where
.wait_time_for_retry(retries);
thread::sleep(sleep_time);
}
crate::types::RetryMethod::Reconnect => {
crate::types::RetryMethod::Reconnect
| crate::types::RetryMethod::ReconnectAndRetry => {
if *self.auto_reconnect.borrow() {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
Expand Down
35 changes: 19 additions & 16 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,18 @@ where
&self,
amount: usize,
conn_type: ConnectionType,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
) -> Option<impl Iterator<Item = ConnectionAndAddress<Connection>> + '_> {
(!self.connection_map.is_empty()).then_some({
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
})
}

pub(crate) fn replace_or_add_connection_for_address(
Expand Down Expand Up @@ -633,6 +635,7 @@ mod tests {

let random_connections: HashSet<_> = container
.random_connections(3, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();

Expand All @@ -647,12 +650,9 @@ mod tests {
let container = create_container();
remove_all_connections(&container);

assert_eq!(
0,
container
.random_connections(1, ConnectionType::User)
.count()
);
assert!(container
.random_connections(1, ConnectionType::User)
.is_none());
}

#[test]
Expand All @@ -665,6 +665,7 @@ mod tests {
);
let random_connections: Vec<_> = container
.random_connections(1, ConnectionType::User)
.expect("No connections found")
.collect();

assert_eq!(vec![(address, 4)], random_connections);
Expand All @@ -675,6 +676,7 @@ mod tests {
let container = create_container();
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand All @@ -687,6 +689,7 @@ mod tests {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::PreferManagement)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand Down
99 changes: 69 additions & 30 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ impl<C> Future for Request<C> {
let request = this.request.as_mut().unwrap();
// TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
if request.retry >= this.retry_params.number_of_retries {
let retry_method = err.retry_method();
let next = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Next::ReconnectToInitialNodes { request: None }.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
Expand All @@ -876,7 +877,9 @@ impl<C> Future for Request<C> {
sleep_duration: None,
}
.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
} else if matches!(retry_method, crate::types::RetryMethod::Reconnect)
|| matches!(retry_method, crate::types::RetryMethod::ReconnectAndRetry)
{
if let OperationTarget::Node { address } = target {
Next::Reconnect {
request: None,
Expand Down Expand Up @@ -955,13 +958,18 @@ impl<C> Future for Request<C> {
});
self.poll(cx)
}
crate::types::RetryMethod::Reconnect => {
crate::types::RetryMethod::Reconnect
| crate::types::RetryMethod::ReconnectAndRetry => {
let mut request = this.request.take().unwrap();
// TODO should we reset the redirect here?
request.info.reset_routing();
warn!("disconnected from {:?}", address);
let should_retry = matches!(
err.retry_method(),
crate::types::RetryMethod::ReconnectAndRetry
);
Next::Reconnect {
request: Some(request),
request: should_retry.then_some(request),
target: address,
}
.into()
Expand Down Expand Up @@ -1207,8 +1215,11 @@ where
Ok(connections.0)
}

fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
let inner = self.inner.clone();
// Reconnect to the initial nodes provided by the user when the client was created,
// and try to refresh the slots based on the initial connections.
// Used when all cluster connections are unavailable.
fn reconnect_to_initial_nodes(inner: Arc<InnerCore<C>>) -> impl Future<Output = ()> {
let inner = inner.clone();
async move {
let connection_map = match Self::create_initial_connections(
&inner.initial_nodes,
Expand Down Expand Up @@ -1714,7 +1725,9 @@ where
Self::refresh_slots_inner(inner, curr_retry)
.await
.map_err(|err| {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES
|| err.kind() == ErrorKind::AllConnectionsUnavailable
{
BackoffError::Permanent(err)
} else {
BackoffError::from(err)
Expand Down Expand Up @@ -2107,14 +2120,22 @@ where
}
ConnectionCheck::RandomConnection => {
let read_guard = core.conn_lock.read().await;
let (random_address, random_conn_future) = read_guard
read_guard
.random_connections(1, ConnectionType::User)
.next()
.ok_or(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
.and_then(|mut random_connections| {
random_connections.next().map(
|(random_address, random_conn_future)| async move {
(random_address, random_conn_future.await)
},
)
})
.ok_or_else(|| {
RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
))
})?
.await
}
};

Expand All @@ -2129,29 +2150,38 @@ where
ConnectionState::PollComplete => return Poll::Ready(Ok(())),
ConnectionState::Recover(future) => future,
};
match recover_future {
let (next_state, poll) = match recover_future {
RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
Ok(_) => {
trace!("Recovered!");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
(ConnectionState::PollComplete, Poll::Ready(Ok(())))
}
Err(err) => {
trace!("Recover slots failed!");
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
));
Poll::Ready(Err(err))

let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)))
} else {
ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
),
)))
};
(next_state, Poll::Ready(Err(err)))
}
},
RecoverFuture::Reconnect(ref mut future) => {
ready!(future.as_mut().poll(cx));
trace!("Reconnected connections");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
(ConnectionState::PollComplete, Poll::Ready(Ok(())))
}
}
};
self.state = next_state;
poll
}

async fn handle_loading_error(
Expand Down Expand Up @@ -2260,9 +2290,7 @@ where
}));
}
}
Next::Reconnect {
request, target, ..
} => {
Next::Reconnect { request, target } => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target]));
if let Some(request) = request {
Expand Down Expand Up @@ -2405,7 +2433,7 @@ where
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.reconnect_to_initial_nodes(),
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)));
}
}
Expand Down Expand Up @@ -2447,8 +2475,19 @@ async fn calculate_topology_from_random_nodes<'a, C>(
where
C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
let requested_nodes =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement);
let requested_nodes = if let Some(random_conns) =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement)
{
random_conns
} else {
return (
Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No available connections to refresh slots from",
))),
vec![],
);
};
let topology_join_results =
futures::future::join_all(requested_nodes.map(|(addr, conn)| async move {
let mut conn: C = conn.await;
Expand Down
7 changes: 7 additions & 0 deletions redis/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub enum ErrorKind {
/// not native to the system. This is usually the case if
/// the cause is another error.
IoError,
/// An I/O error that is considered safe to retry because the request was not received by the server.
IoErrorRetrySafe,
/// An error raised that was identified on the client before execution.
ClientError,
/// An extension error. This is an error created by the server
Expand Down Expand Up @@ -802,6 +804,7 @@ impl fmt::Debug for RedisError {

pub(crate) enum RetryMethod {
Reconnect,
ReconnectAndRetry,
NoRetry,
RetryImmediately,
WaitAndRetry,
Expand Down Expand Up @@ -870,6 +873,7 @@ impl RedisError {
ErrorKind::CrossSlot => "cross-slot",
ErrorKind::MasterDown => "master down",
ErrorKind::IoError => "I/O error",
ErrorKind::IoErrorRetrySafe => "I/O error - Request wasn't received by the server",
ErrorKind::ExtensionError => "extension error",
ErrorKind::ClientError => "client error",
ErrorKind::ReadOnly => "read-only",
Expand Down Expand Up @@ -957,6 +961,7 @@ impl RedisError {
pub fn is_unrecoverable_error(&self) -> bool {
match self.retry_method() {
RetryMethod::Reconnect => true,
RetryMethod::ReconnectAndRetry => true,

RetryMethod::NoRetry => false,
RetryMethod::RetryImmediately => false,
Expand Down Expand Up @@ -1064,12 +1069,14 @@ impl RedisError {

io::ErrorKind::PermissionDenied => RetryMethod::NoRetry,
io::ErrorKind::Unsupported => RetryMethod::NoRetry,
io::ErrorKind::TimedOut => RetryMethod::NoRetry,

_ => RetryMethod::RetryImmediately,
},
_ => RetryMethod::RetryImmediately,
},
ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry,
ErrorKind::IoErrorRetrySafe => RetryMethod::ReconnectAndRetry,
}
}
}
Expand Down
Loading