Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking change: reconnect_to_initial_nodes will be called only when AllConnectionsUnavailable #183

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ where
}
}

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
other_connection_map: ConnectionsMap<Connection>,
) {
self.connection_map.extend(other_connection_map.0);
}

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some()
Expand Down Expand Up @@ -841,4 +849,33 @@ mod tests {

assert!(!container.is_primary(&address));
}

/// Verifies that `extend_connection_map` adds the nodes of the provided map
/// while keeping every node that was already in the container.
#[test]
fn test_extend_connection_map() {
    let mut container = create_container();

    // Snapshot of the addresses known to the container before the extension.
    let mut expected_addresses: Vec<_> = container
        .all_node_connections()
        .map(|entry| entry.0)
        .collect();

    let added_address = "new_primary1".to_string();
    // The new address must not exist in the container yet.
    assert!(container.connection_for_address(&added_address).is_none());

    // Build a single-entry connection map holding only the new node.
    let extra_nodes = DashMap::new();
    extra_nodes.insert(added_address.clone(), create_cluster_node(1, false));

    // Merge the new map into the container's current connection map.
    container.extend_connection_map(ConnectionsMap(extra_nodes));

    // Expected result: every previous address plus the newly added one.
    expected_addresses.push(added_address);
    expected_addresses.sort();

    // Actual result: whatever the container reports after the extension.
    let mut actual_addresses: Vec<_> = container
        .all_node_connections()
        .map(|entry| entry.0)
        .collect();
    actual_addresses.sort();

    assert_eq!(expected_addresses, actual_addresses);
}
}
23 changes: 9 additions & 14 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ impl<C> Future for Request<C> {
let request = this.request.as_mut().unwrap();
// TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
if request.retry >= this.retry_params.number_of_retries {
let next = if err.kind() == ErrorKind::ClusterConnectionNotFound {
let next = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Next::ReconnectToInitialNodes { request: None }.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
|| matches!(target, OperationTarget::NotFound)
Expand Down Expand Up @@ -836,7 +836,7 @@ impl<C> Future for Request<C> {
}
request.retry = request.retry.saturating_add(1);

if err.kind() == ErrorKind::ClusterConnectionNotFound {
if err.kind() == ErrorKind::AllConnectionsUnavailable {
return Next::ReconnectToInitialNodes {
request: Some(this.request.take().unwrap()),
}
Expand Down Expand Up @@ -1132,12 +1132,7 @@ where
}
};
let mut write_lock = inner.conn_lock.write().await;
*write_lock = ConnectionsContainer::new(
Default::default(),
connection_map,
inner.cluster_params.read_from_replicas,
0,
);
write_lock.extend_connection_map(connection_map);
drop(write_lock);
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
inner.clone(),
Expand Down Expand Up @@ -1260,7 +1255,7 @@ where
} else {
Err(last_err.unwrap_or_else(|| {
(
ErrorKind::ClusterConnectionNotFound,
ErrorKind::AllConnectionsUnavailable,
"Couldn't find any connection",
)
.into()
Expand Down Expand Up @@ -1656,7 +1651,7 @@ where
return OperationResult::Err((
OperationTarget::FanOut,
(
ErrorKind::ClusterConnectionNotFound,
ErrorKind::AllConnectionsUnavailable,
"No connections found for multi-node operation",
)
.into(),
Expand Down Expand Up @@ -1700,7 +1695,7 @@ where
)
} else {
let _ = sender.send(Err((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"Connection not found",
)
.into()));
Expand Down Expand Up @@ -1871,7 +1866,7 @@ where
&& !RoutingInfo::is_key_routing_command(&routable_cmd.unwrap())
{
return Err((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"Requested connection not found for route",
format!("{route:?}"),
)
Expand All @@ -1892,7 +1887,7 @@ where
return Ok((address, conn.await));
} else {
return Err((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"Requested connection not found",
address,
)
Expand Down Expand Up @@ -1938,7 +1933,7 @@ where
.random_connections(1, ConnectionType::User)
.next()
.ok_or(RedisError::from((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
Expand Down
4 changes: 3 additions & 1 deletion redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ where
// The connection to the address can't be reached, which may happen for different reasons. We check whether
// the problem is one we can recover from, such as a failover, a scale-down, or some network issue,
// so that we can retry the scan command against an address that owns the next slot we are at.
ErrorKind::IoError | ErrorKind::ClusterConnectionNotFound => {
ErrorKind::IoError
| ErrorKind::AllConnectionsUnavailable
| ErrorKind::ConnectionNotFoundForRoute => {
let retry =
retry_scan(&scan_state, &core, match_pattern, count, object_type).await?;
(from_redis_value(&retry.0?)?, retry.1)
Expand Down
12 changes: 8 additions & 4 deletions redis/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ pub enum ErrorKind {
EmptySentinelList,
/// Attempted to kill a script/function while they weren't executing
NotBusy,
/// Used when a cluster connection cannot find a connection to a valid node.
ClusterConnectionNotFound,
/// Used when no valid node connections remain in the cluster connection
AllConnectionsUnavailable,
/// Used when a connection is not found for the specified route.
ConnectionNotFoundForRoute,

#[cfg(feature = "json")]
/// Error Serializing a struct to JSON form
Expand Down Expand Up @@ -875,7 +877,8 @@ impl RedisError {
ErrorKind::NoValidReplicasFoundBySentinel => "no valid replicas found by sentinel",
ErrorKind::EmptySentinelList => "empty sentinel list",
ErrorKind::NotBusy => "not busy",
ErrorKind::ClusterConnectionNotFound => "connection to node in cluster not found",
ErrorKind::AllConnectionsUnavailable => "no valid connections remain in the cluster",
ErrorKind::ConnectionNotFoundForRoute => "No connection found for the requested route",
#[cfg(feature = "json")]
ErrorKind::Serialize => "serializing",
ErrorKind::RESP3NotSupported => "resp3 is not supported by server",
Expand Down Expand Up @@ -1046,7 +1049,8 @@ impl RedisError {

ErrorKind::ParseError => RetryMethod::Reconnect,
ErrorKind::AuthenticationFailed => RetryMethod::Reconnect,
ErrorKind::ClusterConnectionNotFound => RetryMethod::Reconnect,
ErrorKind::AllConnectionsUnavailable => RetryMethod::Reconnect,
ErrorKind::ConnectionNotFoundForRoute => RetryMethod::Reconnect,

ErrorKind::IoError => match &self.repr {
ErrorRepr::IoError(err) => match err.kind() {
Expand Down
6 changes: 3 additions & 3 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2400,7 +2400,7 @@ mod cluster_async {
.block_on(cmd.query_async::<_, Vec<String>>(&mut connection))
.unwrap_err();
assert!(
matches!(result.kind(), ErrorKind::ClusterConnectionNotFound)
matches!(result.kind(), ErrorKind::ConnectionNotFoundForRoute)
|| result.is_connection_dropped()
);
}
Expand Down Expand Up @@ -4031,7 +4031,7 @@ mod cluster_async {
handler: _handler,
..
} = MockEnv::with_client_builder(
ClusterClient::builder(vec![&*format!("redis://{name}")]),
ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(1),
name,
move |received_cmd: &[u8], _| {
let slots_config_vec = vec![
Expand Down Expand Up @@ -4071,7 +4071,7 @@ mod cluster_async {
let res_err = res.unwrap_err();
assert_eq!(
res_err.kind(),
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"{:?}",
res_err
);
Expand Down
26 changes: 22 additions & 4 deletions redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ mod test_cluster_scan_async {

#[tokio::test] // test cluster scan with node fail in the middle
async fn test_async_cluster_scan_with_fail() {
let cluster = TestClusterContext::new(3, 0);
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(1),
false,
);
let mut connection = cluster.async_connection(None).await;
// Set some keys
for i in 0..1000 {
Expand Down Expand Up @@ -224,7 +229,11 @@ mod test_cluster_scan_async {
let cluster = TestClusterContext::new_with_cluster_client_builder(
6,
1,
|builder| builder.slots_refresh_rate_limit(Duration::from_secs(0), 0),
|builder| {
builder
.slots_refresh_rate_limit(Duration::from_secs(0), 0)
.retries(1)
},
false,
);

Expand Down Expand Up @@ -374,7 +383,11 @@ mod test_cluster_scan_async {
let cluster = TestClusterContext::new_with_cluster_client_builder(
6,
1,
|builder| builder.slots_refresh_rate_limit(Duration::from_secs(0), 0),
|builder| {
builder
.slots_refresh_rate_limit(Duration::from_secs(0), 0)
.retries(1)
},
false,
);

Expand Down Expand Up @@ -772,7 +785,12 @@ mod test_cluster_scan_async {
// Testing cluster scan when connection fails in the middle and we get an error
// then cluster up again and scanning can continue without any problem
async fn test_async_cluster_scan_failover() {
let mut cluster = TestClusterContext::new(3, 0);
let mut cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(1),
false,
);
let mut connection = cluster.async_connection(None).await;
let mut i = 0;
loop {
Expand Down