Added logic to update the slot map based on MOVED errors #186

Merged
7 changes: 5 additions & 2 deletions redis/benches/bench_basic.rs
@@ -31,10 +31,13 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) {
redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.query_async::<_, Option<Value>>(&mut con)
.await?;
let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?;
redis::cmd("DEL").arg(key).query_async(&mut con).await?;
redis::cmd("DEL")
.arg(key)
.query_async::<_, Option<Value>>(&mut con)
.await?;
Ok::<_, RedisError>(())
})
.unwrap()
11 changes: 9 additions & 2 deletions redis/benches/bench_cluster_async.rs
@@ -21,9 +21,16 @@ fn bench_cluster_async(
runtime
.block_on(async {
let key = "test_key";
redis::cmd("SET").arg(key).arg(42).query_async(con).await?;
redis::cmd("SET")
.arg(key)
.arg(42)
.query_async::<_, Option<redis::Value>>(con)
.await?;
let _: isize = redis::cmd("GET").arg(key).query_async(con).await?;
redis::cmd("DEL").arg(key).query_async(con).await?;
redis::cmd("DEL")
.arg(key)
.query_async::<_, Option<redis::Value>>(con)
.await?;

Ok::<_, RedisError>(())
})
23 changes: 14 additions & 9 deletions redis/src/cluster_async/connections_container.rs
@@ -1,11 +1,12 @@
use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, SlotAddr};
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::Arc;

/// A struct that encapsulates a network connection along with its associated IP address.
#[derive(Clone, Eq, PartialEq, Debug)]
@@ -137,6 +138,16 @@ where
}
}

/// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
pub(crate) fn slot_map_nodes(
&self,
) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
self.slot_map
.nodes_map()
.iter()
.map(|item| (item.key().clone(), item.value().clone()))
}

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
@@ -154,10 +165,7 @@
&self,
slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
.last_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
@@ -185,10 +193,7 @@ where

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
let addrs = &slot_map_value.addrs;
if addrs.replicas().is_empty() {
return self.connection_for_address(addrs.primary().as_str());
}
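The `slot_map_nodes` helper added above returns an iterator of `(Arc<String>, Arc<ShardAddrs>)` pairs cloned out of the slot map's node `DashMap`. A standalone sketch of the same pattern, using hypothetical stand-in types rather than this crate's, shows the idea; note that the returned iterator still borrows the underlying map, which is why the caller in this PR has to drop it before mutating the connections container (see the review discussion further down).

use dashmap::DashMap;
use std::sync::Arc;

// Stand-ins for the crate's types, only for illustration.
struct ShardAddrs {
    primary: String,
    replicas: Vec<String>,
}

struct NodesMap {
    nodes: DashMap<Arc<String>, Arc<ShardAddrs>>,
}

impl NodesMap {
    // Same shape as `slot_map_nodes`: clone the Arcs out of each DashMap entry
    // (a cheap reference-count bump) so callers receive owned handles instead
    // of the per-entry shard guards.
    fn node_pairs(&self) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
        self.nodes
            .iter()
            .map(|entry| (entry.key().clone(), entry.value().clone()))
    }
}

fn main() {
    let map = NodesMap { nodes: DashMap::new() };
    map.nodes.insert(
        Arc::new("node-1:6379".to_string()),
        Arc::new(ShardAddrs {
            primary: "node-1:6379".to_string(),
            replicas: vec!["node-2:6379".to_string()],
        }),
    );
    for (addr, shard) in map.node_pairs() {
        println!("{addr} -> primary {}, {} replica(s)", shard.primary, shard.replicas.len());
    }
}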
152 changes: 126 additions & 26 deletions redis/src/cluster_async/mod.rs
@@ -30,7 +30,7 @@ pub mod testing {
pub use super::connections_logic::*;
}
use crate::{
cluster_routing::{Routable, RoutingInfo},
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
cluster_slotmap::SlotMap,
cluster_topology::SLOT_SIZE,
cmd,
@@ -629,19 +629,17 @@ impl From<String> for OperationTarget {
#[derive(Clone, Debug)]
pub(crate) struct RedirectNode {
/// The address of the redirect node.
pub _address: String,
pub address: String,
/// The slot of the redirect node.
pub _slot: u16,
pub slot: u16,
}

impl RedirectNode {
/// This function expects an `Option` containing a tuple with a string slice and a u16.
/// The tuple represents an address and a slot, respectively. If the input is `Some`,
/// the function converts the address to a `String` and constructs a `RedirectNode`.
/// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number.
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
option.map(|(address, slot)| RedirectNode {
_address: address.to_string(),
_slot: slot,
address: address.to_string(),
slot,
})
}
}
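For context on where the `(address, slot)` tuple comes from: a Redis cluster node answers a request for a slot it no longer owns with an error of the form `MOVED 3999 127.0.0.1:6381`. A hypothetical parser (not the one this crate uses) that turns such a message into the tuple `RedirectNode::from_option_tuple` expects could look like this:

/// Hypothetical helper: parse "MOVED <slot> <host:port>" into (address, slot).
/// redis-rs has its own redirect parsing; this is only an illustration.
fn parse_moved(message: &str) -> Option<(&str, u16)> {
    let mut parts = message.split_whitespace();
    if parts.next()? != "MOVED" {
        return None;
    }
    let slot: u16 = parts.next()?.parse().ok()?;
    let address = parts.next()?;
    Some((address, slot))
}

fn main() {
    let parsed = parse_moved("MOVED 3999 127.0.0.1:6381");
    assert_eq!(parsed, Some(("127.0.0.1:6381", 3999)));
    println!("{parsed:?}");
}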
@@ -757,6 +755,10 @@ pin_project! {
#[pin]
sleep: BoxFuture<'static, ()>,
},
UpdateMoved {
#[pin]
future: BoxFuture<'static, RedisResult<()>>,
},
}
}

@@ -819,8 +821,25 @@ impl<C> Future for Request<C> {
}
.into();
}
RequestStateProj::UpdateMoved { future } => {
if let Err(err) = ready!(future.poll(cx)) {
// Updating the slot map based on the MOVED error is an optimization.
// If it fails, proceed by retrying the request with the redirected node,
// and allow the slot refresh task to correct the slot map.
info!(
"Failed to update the slot map based on the received MOVED error.
Error: {err:?}"
);
}
if let Some(request) = self.project().request.take() {
return Next::Retry { request }.into();
} else {
return Next::Done.into();
}
}
_ => panic!("Request future must be Some"),
};

match ready!(future.poll(cx)) {
Ok(item) => {
self.respond(Ok(item));
@@ -1683,6 +1702,77 @@ where
Ok(())
}

/// Updates the slot and node mappings in response to a MOVED error.
/// This function handles various scenarios based on the new primary's role:
///
/// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed.
/// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover),
/// the slot ownership is updated by promoting the replica to the primary in the existing shard addresses.
/// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration,
/// and the slot mapping is updated to point to the new shard addresses.
/// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to:
/// - The replica became the primary of its shard after a failover, with new slots migrated to it.
/// - The replica has moved to a different shard as the primary.
/// Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard.
/// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out.
///
/// # Arguments
/// * `inner` - Shared reference to InnerCore containing connection and slot state.
/// * `slot` - The slot number reported as moved.
/// * `new_primary` - The address of the node now responsible for the slot.
///
/// # Returns
/// * `RedisResult<()>` indicating success or failure in updating slot mappings.
async fn update_upon_moved_error(
inner: Arc<InnerCore<C>>,
slot: u16,
new_primary: Arc<String>,
) -> RedisResult<()> {
let mut connections_container = inner.conn_lock.write().await;
let curr_shard_addrs = connections_container.slot_map.shard_addrs_for_slot(slot);
// Check if the new primary is part of the current shard and update if required
if let Some(curr_shard_addrs) = curr_shard_addrs {
match curr_shard_addrs.attempt_shard_role_update(new_primary.clone()) {
// Scenario 1: No changes needed as the new primary is already the current slot owner.
// Scenario 2: Failover occurred and the new primary was promoted from a replica.
ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()),
// The node was not found in this shard, proceed with further scenarios.
ShardUpdateResult::NodeNotFound => {}
}
}

// Scenario 3 & 4: Check if the new primary exists in other shards
let mut nodes_iter = connections_container.slot_map_nodes();
for (node_addr, shard_addrs_arc) in &mut nodes_iter {
if node_addr == new_primary {
let is_existing_primary = shard_addrs_arc.primary().eq(&new_primary);
if is_existing_primary {
// Scenario 3: Slot Migration - The new primary is an existing primary in another shard
// Update the associated addresses for `slot` to `shard_addrs`.
drop(nodes_iter);
return connections_container
.slot_map
.update_slot_range(slot, shard_addrs_arc.clone());
} else {
// Scenario 4: The MOVED error redirects to `new_primary` which is known as a replica in a shard that doesn’t own `slot`.
// Remove the replica from its existing shard and treat it as a new node in a new shard.
shard_addrs_arc.remove_replica(new_primary.clone())?;
drop(nodes_iter);
return connections_container
.slot_map
.add_new_primary(slot, new_primary);
}
}
}

// Scenario 5: New Node - The new primary is not present in the current slots map, add it as a primary of a new shard.
drop(nodes_iter);


Why the explicit drop here? Won't it be dropped when leaving the function?

Author
I have to drop it because it borrows the connections container immutably, and we need a mutable borrow on the next line.
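A minimal, self-contained sketch of the constraint being described (hypothetical types, not this PR's API): the real nodes iterator is backed by DashMap guards that implement Drop, so the immutable borrow it holds lasts until the iterator is dropped rather than until its last use, and the container cannot be borrowed mutably while the iterator is alive.

struct Registry {
    nodes: Vec<String>,
}

// A guard-like iterator: the Drop impl mimics a lock-holding DashMap iterator,
// which forces the immutable borrow of `Registry` to live until the iterator
// is dropped.
struct NodesIter<'a> {
    registry: &'a Registry,
    pos: usize,
}

impl<'a> Iterator for NodesIter<'a> {
    type Item = &'a String;
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.registry.nodes.get(self.pos);
        self.pos += 1;
        item
    }
}

impl<'a> Drop for NodesIter<'a> {
    fn drop(&mut self) {
        // In the real code this is where shard locks would be released.
    }
}

impl Registry {
    fn nodes_iter(&self) -> NodesIter<'_> {
        NodesIter { registry: self, pos: 0 }
    }

    fn add_node(&mut self, node: String) {
        self.nodes.push(node);
    }
}

fn main() {
    let mut registry = Registry {
        nodes: vec!["primary-1".to_string()],
    };

    let mut iter = registry.nodes_iter();
    let already_known = iter.any(|node| node == "primary-2");

    // Because `NodesIter` implements `Drop`, the immutable borrow of `registry`
    // is held until the iterator is dropped. Removing this line makes the
    // mutable call below fail to compile.
    drop(iter);

    if !already_known {
        registry.add_node("primary-2".to_string());
    }
    println!("{:?}", registry.nodes);
}

With the explicit drop, the borrow ends before the mutable call, which mirrors `drop(nodes_iter)` ahead of the mutable `slot_map` calls in this function.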

connections_container
.slot_map
.add_new_primary(slot, new_primary)
}

async fn execute_on_multiple_nodes<'a>(
cmd: &'a Arc<Cmd>,
routing: &'a MultipleNodeRoutingInfo,
@@ -2104,25 +2194,37 @@
sleep_duration,
moved_redirect,
} => {
poll_flush_action = poll_flush_action
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
if let Some(request) = request {
let future: RequestState<
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
> = match sleep_duration {
Some(sleep_duration) => RequestState::Sleep {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
let future: Option<
RequestState<Pin<Box<dyn Future<Output = OperationResult> + Send>>>,
> = if let Some(moved_redirect) = moved_redirect {
Some(RequestState::UpdateMoved {
future: Box::pin(ClusterConnInner::update_upon_moved_error(
self.inner.clone(),
moved_redirect.slot,
moved_redirect.address.into(),
)),
})
} else if let Some(ref request) = request {
match sleep_duration {
Some(sleep_duration) => Some(RequestState::Sleep {
sleep: boxed_sleep(sleep_duration),
},
None => RequestState::Future {
}),
None => Some(RequestState::Future {
future: Box::pin(Self::try_request(
request.info.clone(),
self.inner.clone(),
)),
},
};
}),
}
} else {
None
};
if let Some(future) = future {
self.in_flight_requests.push(Box::pin(Request {
retry_params: self.inner.cluster_params.retry_params.clone(),
request: Some(request),
request,
future,
}));
}
@@ -2175,7 +2277,7 @@ where

enum PollFlushAction {
None,
RebuildSlots(Option<RedirectNode>),
RebuildSlots,
Reconnect(Vec<String>),
ReconnectFromInitialConnections,
}
@@ -2190,9 +2292,8 @@ impl PollFlushAction {
PollFlushAction::ReconnectFromInitialConnections
}

(PollFlushAction::RebuildSlots(moved_redirect), _)
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
PollFlushAction::RebuildSlots(moved_redirect)
(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
PollFlushAction::RebuildSlots
}

(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
@@ -2253,8 +2354,7 @@ where

match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots(_moved_redirect) => {
// TODO: Add logic to update the slots map based on the MOVED error
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),