
Commit

Now need to add tests
barshaul committed Sep 5, 2024
1 parent 6c62867 commit 3a867cb
Showing 5 changed files with 454 additions and 28 deletions.
24 changes: 23 additions & 1 deletion redis/src/cluster_async/connections_container.rs
@@ -1,11 +1,13 @@
use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, SlotAddr};
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::RwLock;

/// A struct that encapsulates a network connection along with its associated IP address.
#[derive(Clone, Eq, PartialEq, Debug)]
@@ -137,6 +139,26 @@ where
}
}

/// Retrieves the shard addresses (`ShardAddrs`) for the specified `slot` by looking it up in the `slot_map`,
/// returning the shared `Arc<RwLock<ShardAddrs>>` entry if the slot is covered.
pub(crate) fn shard_addrs_for_slot(&self, slot: u16) -> Option<Arc<RwLock<ShardAddrs>>> {
self.slot_map
.slots
.range(slot..)
.next()
.map(|(_, slot_value)| slot_value.addrs.clone())
}
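Note: the lookup above relies on the slot map being keyed by the last slot of each range, so a `range(slot..)` query finds the candidate range in O(log n). A minimal standalone sketch of that pattern; the map and value types here are illustrative placeholders, not the crate's `SlotMap`/`SlotMapValue`:

use std::collections::BTreeMap;
use std::sync::Arc;

// Keys are the *end* slot of each range; the Arc<String> stands in for the
// richer per-range value the real slot map stores.
fn addr_for_slot(slots: &BTreeMap<u16, Arc<String>>, slot: u16) -> Option<Arc<String>> {
    // The first entry at or above `slot` is the only range that can contain it.
    slots.range(slot..).next().map(|(_, addr)| addr.clone())
}

fn main() {
    let mut slots = BTreeMap::new();
    slots.insert(5460, Arc::new("node-a:6379".to_string()));
    slots.insert(10922, Arc::new("node-b:6379".to_string()));
    slots.insert(16383, Arc::new("node-c:6379".to_string()));
    assert_eq!(addr_for_slot(&slots, 7000).unwrap().as_str(), "node-b:6379");
}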

/// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
pub(crate) fn slot_map_nodes(
&self,
) -> impl Iterator<Item = (Arc<String>, Arc<RwLock<ShardAddrs>>)> + '_ {
self.slot_map
.nodes_map()
.iter()
.map(|item| (item.key().clone(), item.value().clone()))
}
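Note: each pair yielded above is an owned clone of the entry's `Arc`s, although the iterator itself still borrows the map (hence the explicit `drop(nodes_iter)` calls later in this commit). A generic sketch that instead collects a snapshot up front, assuming the `dashmap` crate this file already imports; the value type is a placeholder:

use dashmap::DashMap;
use std::sync::Arc;

// Clone each entry's key and value so the caller holds no map guards afterwards.
fn snapshot<V: Clone>(nodes: &DashMap<Arc<String>, V>) -> Vec<(Arc<String>, V)> {
    nodes
        .iter()
        .map(|entry| (entry.key().clone(), entry.value().clone()))
        .collect()
}

fn main() {
    let nodes: DashMap<Arc<String>, Vec<u16>> = DashMap::new();
    nodes.insert(Arc::new("node-a:6379".to_string()), vec![0, 5460]);
    for (addr, meta) in snapshot(&nodes) {
        println!("{addr} -> {meta:?}");
    }
}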

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
185 changes: 163 additions & 22 deletions redis/src/cluster_async/mod.rs
@@ -629,9 +629,9 @@ impl From<String> for OperationTarget {
#[derive(Clone, Debug)]
pub(crate) struct RedirectNode {
/// The address of the redirect node.
pub _address: String,
pub address: String,
/// The slot of the redirect node.
pub _slot: u16,
pub slot: u16,
}

impl RedirectNode {
@@ -640,8 +640,8 @@ impl RedirectNode {
/// the function converts the address to a `String` and constructs a `RedirectNode`.
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
option.map(|(address, slot)| RedirectNode {
_address: address.to_string(),
_slot: slot,
address: address.to_string(),
slot,
})
}
}
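Note: with the fields now exposed as `address` and `slot`, the conversion above can be exercised from crate-internal code as follows; the input tuple is hypothetical, as if parsed from a `MOVED 1234 10.0.0.1:6379` error:

let parsed: Option<(&str, u16)> = Some(("10.0.0.1:6379", 1234));
let node = RedirectNode::from_option_tuple(parsed).expect("tuple was Some");
assert_eq!(node.slot, 1234);
assert_eq!(node.address, "10.0.0.1:6379");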
@@ -757,6 +757,10 @@ pin_project! {
#[pin]
sleep: BoxFuture<'static, ()>,
},
UpdateMoved {
#[pin]
future: BoxFuture<'static, RedisResult<()>>,
},
}
}

@@ -819,8 +823,27 @@ impl<C> Future for Request<C> {
}
.into();
}
RequestStateProj::UpdateMoved { future } => {
println!("poll is found for UpdatedMoved, starting to poll it");
if let Err(err) = ready!(future.poll(cx)) {
warn!(
"Failed to update the slot map based on the recieved moved error.\n
Error: {err:?}"
);
}
println!("updaing the slot map finished");
if let Some(request) = self.project().request.take() {
println!("request is Some, moving to retry");
return Next::Retry { request }.into();
} else {
println!("request is None, not retrying");
return Next::Done.into();
}
}
_ => panic!("Request future must be Some"),
};
println!("polling request");

match ready!(future.poll(cx)) {
Ok(item) => {
self.respond(Ok(item));
@@ -835,6 +858,7 @@ impl<C> Future for Request<C> {
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
|| matches!(target, OperationTarget::NotFound)
{
println!("recieved moved = {:?}", err.redirect_node());
Next::RefreshSlots {
request: None,
sleep_duration: None,
@@ -1683,6 +1707,109 @@ where
Ok(())
}

/// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role:
/// 1. **No Change**: If the new primary is already the slot owner.
/// 2. **Failover**: If moved to a replica within the same shard, suggesting a failover has occurred.
/// 3. **Slot Migration**: Indicates a potential ongoing migration if moved to a primary in another shard.
/// 4. **Moved to an Existing Replica in a Different Shard**:
/// - The replica became the primary of its shard after a failover, with new slots migrated to it.
/// - The replica has moved to a different shard as the primary.
/// Information about the new shard—whether it’s the original shard or a new one
/// with different slots and nodes—is unknown.
/// 5. **New Node**: If moved to an unknown node, this suggests the addition of a new node and possible scale-out.
///
/// # Arguments
/// * `inner` - Shared reference to InnerCore containing connection and slot state.
/// * `slot` - The slot number reported as moved.
/// * `new_primary` - The address of the node now responsible for the slot.
///
/// # Returns
/// * `RedisResult<()>` indicating success or failure in updating slot mappings.
async fn update_upon_moved_error(
inner: Arc<InnerCore<C>>,
slot: u16,
new_primary: Arc<String>,
) -> RedisResult<()> {
let connections_container = inner.conn_lock.read().await;
let curr_shard_addrs = connections_container.shard_addrs_for_slot(slot);
println!("before={}", connections_container.slot_map);

// Check if the new primary is already a part of the current shard nodes
if let Some(curr_shard_addrs) = curr_shard_addrs {
let curr_shard_addrs_read = curr_shard_addrs
.read()
.expect("Failed to acquire read lock for ShardAddrs");
if *curr_shard_addrs_read.primary() == *new_primary {
println!("Scenario 1");
// Scenario 1: No change needed if the new primary is already recognized as the slot owner.
return Ok(());
}

if curr_shard_addrs_read.replicas().contains(&new_primary) {
// Scenario 2: Handle failover within the same shard by updating all slots managed by the old primary to the new primary.
println!("Scenario 2");
drop(curr_shard_addrs_read);
let mut curr_shard_addrs_write = curr_shard_addrs
.write()
.expect("Failed to acquire write lock for ShardAddrs");
return curr_shard_addrs_write.update_primary_after_failover(new_primary);
}
}
// Scenario 3 and 4: Check if the node exists in other shards
let mut nodes_iter = connections_container.slot_map_nodes();
for (node_addr, shard_addrs_arc) in &mut nodes_iter {
if node_addr == new_primary {
let is_existing_primary;
{
let shard_addrs = shard_addrs_arc
.read()
.expect("Failed to acquire read lock for ShardAddrs");
is_existing_primary = *shard_addrs.primary() == *new_primary;
}
if is_existing_primary {
// Scenario 3: Slot migration to an existing primary in another shard.
// Update the associated addresses for `slot` to `shard_addrs`.
println!("Scenario 3");
drop(nodes_iter);
drop(connections_container);
let mut connections_container = inner.conn_lock.write().await;
let x = connections_container
.slot_map
.update_slot(slot, shard_addrs_arc.clone());
println!("after={}", connections_container.slot_map);
return x;
} else {
println!("Scenario 4");
// Scenario 4: The MOVED error redirects to `new_primary` which is known as a replica in a shard that doesn’t own `slot`.
// Remove the replica from its existing shard and treat it as a new node in a new shard.
{
let mut prev_shard_addrs = shard_addrs_arc
.write()
.expect("Failed to acquire write lock for ShardAddrs");
if let Err(err) = prev_shard_addrs.remove_replica(new_primary.clone()) {
warn!("Failed to remove node {new_primary:?} from previous shard: {prev_shard_addrs:?}.\nError: {err:?}");
}
}
drop(nodes_iter);
drop(connections_container);
let mut connections_container = inner.conn_lock.write().await;
return connections_container
.slot_map
.add_new_primary(slot, new_primary);
}
}
}

// Scenario 5: `new_primary` isn’t present in the current slots map. Add it as a new node in a new shard.
println!("Scenario 5");
drop(nodes_iter);
drop(connections_container);
let mut connections_container = inner.conn_lock.write().await;
connections_container
.slot_map
.add_new_primary(slot, new_primary)
}
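Note: the five scenarios documented above boil down to where `new_primary` already appears in the topology. A self-contained sketch of that classification over a simplified model (plain structs, not the crate's `ShardAddrs`/`SlotMap`):

use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum MovedAction {
    NoChange,                 // Scenario 1: new owner is already the slot's primary
    PromoteReplica,           // Scenario 2: failover inside the same shard
    MoveSlotToExistingShard,  // Scenario 3: migration to a primary of another shard
    SplitReplicaIntoNewShard, // Scenario 4: a replica of another shard now owns the slot
    AddNewShard,              // Scenario 5: the address is unknown to the slot map
}

struct Shard {
    primary: String,
    replicas: Vec<String>,
}

fn classify_moved(
    shards: &HashMap<&str, Shard>, // shard id -> shard
    slot_owner: Option<&str>,      // shard id currently owning the moved slot
    new_primary: &str,
) -> MovedAction {
    if let Some(owner) = slot_owner.and_then(|id| shards.get(id)) {
        if owner.primary == new_primary {
            return MovedAction::NoChange;
        }
        if owner.replicas.iter().any(|r| r == new_primary) {
            return MovedAction::PromoteReplica;
        }
    }
    for shard in shards.values() {
        if shard.primary == new_primary {
            return MovedAction::MoveSlotToExistingShard;
        }
        if shard.replicas.iter().any(|r| r == new_primary) {
            return MovedAction::SplitReplicaIntoNewShard;
        }
    }
    MovedAction::AddNewShard
}

fn main() {
    let mut shards = HashMap::new();
    shards.insert("shard-a", Shard { primary: "a:6379".into(), replicas: vec!["a2:6379".into()] });
    shards.insert("shard-b", Shard { primary: "b:6379".into(), replicas: vec![] });
    // Slot currently owned by shard-a; MOVED points at shard-a's replica => failover.
    assert_eq!(classify_moved(&shards, Some("shard-a"), "a2:6379"), MovedAction::PromoteReplica);
    // MOVED points at an address the slot map has never seen => new shard.
    assert_eq!(classify_moved(&shards, Some("shard-a"), "c:6379"), MovedAction::AddNewShard);
}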

async fn execute_on_multiple_nodes<'a>(
cmd: &'a Arc<Cmd>,
routing: &'a MultipleNodeRoutingInfo,
@@ -1813,6 +1940,7 @@ where
let (address, mut conn) = Self::get_connection(routing, core, Some(cmd.clone()))
.await
.map_err(|err| (OperationTarget::NotFound, err))?;
println!("recieved connection for {address:?}");
conn.req_packed_command(&cmd)
.await
.map(Response::Single)
@@ -2104,25 +2232,40 @@ where
sleep_duration,
moved_redirect,
} => {
poll_flush_action = poll_flush_action
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
if let Some(request) = request {
let future: RequestState<
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
> = match sleep_duration {
Some(sleep_duration) => RequestState::Sleep {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
let future: Option<
RequestState<Pin<Box<dyn Future<Output = OperationResult> + Send>>>,
> = if let Some(moved_redirect) = moved_redirect {
println!(
"moved_redirect is found, adding future to update_upon_moved_error"
);
Some(RequestState::UpdateMoved {
future: Box::pin(ClusterConnInner::update_upon_moved_error(
self.inner.clone(),
moved_redirect.slot,
moved_redirect.address.into(),
)),
})
} else if let Some(ref request) = request {
match sleep_duration {
Some(sleep_duration) => Some(RequestState::Sleep {
sleep: boxed_sleep(sleep_duration),
},
None => RequestState::Future {
}),
None => Some(RequestState::Future {
future: Box::pin(Self::try_request(
request.info.clone(),
self.inner.clone(),
)),
},
};
}),
}
} else {
None
};
if let Some(future) = future {
self.in_flight_requests.push(Box::pin(Request {
retry_params: self.inner.cluster_params.retry_params.clone(),
request: Some(request),
request,
future,
}));
}
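Note: the branch above gives the slot-map update priority over sleeping or retrying, and pushes nothing when there is no work left. A simplified standalone sketch of that dispatch; the enum and names are illustrative stand-ins, not the crate's `RequestState`:

use std::time::Duration;

// Illustrative stand-in for the redirect information carried by the MOVED error.
struct Moved { slot: u16, address: String }

enum NextStep {
    UpdateMoved { slot: u16, address: String }, // rewrite the slot map, then retry
    Sleep(Duration),                            // back off before retrying
    Retry,                                      // retry immediately
    Drop,                                       // nothing left to do
}

fn next_step(moved: Option<Moved>, has_request: bool, backoff: Option<Duration>) -> NextStep {
    if let Some(m) = moved {
        // The slot-map update runs even if no request remains to be retried.
        NextStep::UpdateMoved { slot: m.slot, address: m.address }
    } else if has_request {
        match backoff {
            Some(d) => NextStep::Sleep(d),
            None => NextStep::Retry,
        }
    } else {
        NextStep::Drop
    }
}

fn main() {
    let step = next_step(Some(Moved { slot: 1234, address: "10.0.0.1:6379".into() }), true, None);
    assert!(matches!(step, NextStep::UpdateMoved { slot: 1234, .. }));
}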
@@ -2175,7 +2318,7 @@ where

enum PollFlushAction {
None,
RebuildSlots(Option<RedirectNode>),
RebuildSlots,
Reconnect(Vec<String>),
ReconnectFromInitialConnections,
}
Expand All @@ -2190,9 +2333,8 @@ impl PollFlushAction {
PollFlushAction::ReconnectFromInitialConnections
}

(PollFlushAction::RebuildSlots(moved_redirect), _)
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
PollFlushAction::RebuildSlots(moved_redirect)
(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
PollFlushAction::RebuildSlots
}

(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
@@ -2253,8 +2395,7 @@ where

match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots(_moved_redirect) => {
// TODO: Add logic to update the slots map based on the MOVED error
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
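Note: with the redirect payload removed from `RebuildSlots`, the `change_state` merge shown a few hunks above reduces to a pure precedence rule. A standalone sketch of that merge, assuming the usual ordering (reconnect-from-initial over rebuild-slots over targeted reconnect over no-op) and that the no-op action yields to anything else:

#[derive(Debug, PartialEq)]
enum Action {
    Nothing,
    RebuildSlots,
    Reconnect(Vec<String>),
    ReconnectFromInitialConnections,
}

impl Action {
    // Keep the strongest required action; merge address lists for targeted reconnects.
    fn change_state(self, next: Action) -> Action {
        match (self, next) {
            (Action::ReconnectFromInitialConnections, _)
            | (_, Action::ReconnectFromInitialConnections) => {
                Action::ReconnectFromInitialConnections
            }
            (Action::RebuildSlots, _) | (_, Action::RebuildSlots) => Action::RebuildSlots,
            (Action::Reconnect(mut a), Action::Reconnect(b)) => {
                a.extend(b);
                Action::Reconnect(a)
            }
            (Action::Reconnect(a), Action::Nothing) | (Action::Nothing, Action::Reconnect(a)) => {
                Action::Reconnect(a)
            }
            (Action::Nothing, Action::Nothing) => Action::Nothing,
        }
    }
}

fn main() {
    let merged = Action::Reconnect(vec!["a:6379".into()])
        .change_state(Action::Reconnect(vec!["b:6379".into()]));
    assert_eq!(merged, Action::Reconnect(vec!["a:6379".into(), "b:6379".into()]));
    assert_eq!(Action::Nothing.change_state(Action::RebuildSlots), Action::RebuildSlots);
}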
47 changes: 46 additions & 1 deletion redis/src/cluster_routing.rs
@@ -1,7 +1,7 @@
use crate::cluster_topology::get_slot;
use crate::cmd::{Arg, Cmd};
use crate::types::Value;
use crate::{ErrorKind, RedisResult};
use crate::{ErrorKind, RedisError, RedisResult};
use std::cmp::min;
use std::collections::HashMap;
use std::iter::Once;
@@ -905,13 +905,58 @@ impl ShardAddrs {
Self { primary, replicas }
}

pub(crate) fn new_with_primary(primary: Arc<String>) -> Self {
Self::new(primary, Vec::new())
}

pub(crate) fn primary(&self) -> Arc<String> {
self.primary.clone()
}

pub(crate) fn replicas(&self) -> &Vec<Arc<String>> {
self.replicas.as_ref()
}

/// Updates the primary slot mapping in response to a failover event.
/// This method promotes the specified replica (`new_primary`) to the primary role by swapping it with the current primary.
/// Invoked after detecting a failover through a MOVED error, allowing the slot map to be updated accordingly.
///
/// # Arguments
/// * `new_primary` - An `Arc<String>` with the address of the replica to be promoted to primary.
pub(crate) fn update_primary_after_failover(
&mut self,
new_primary: Arc<String>,
) -> RedisResult<()> {
for replica in self.replicas.iter_mut() {
if *replica == new_primary {
std::mem::swap(&mut self.primary, replica);
return Ok(());
}
}
Err(RedisError::from((
ErrorKind::ClientError,
"Failed to update primary: Specified new primary not found among the replicas",
)))
}

fn replica_index(&self, target_replica: Arc<String>) -> Option<usize> {
self.replicas
.iter()
.position(|curr_replica| **curr_replica == *target_replica)
}

pub(crate) fn remove_replica(&mut self, replica_to_remove: Arc<String>) -> RedisResult<()> {
if let Some(index) = self.replica_index(replica_to_remove.clone()) {
self.replicas.remove(index);
Ok(())
} else {
Err(RedisError::from((
ErrorKind::ClientError,
"Couldn't remove replica",
format!("Replica {replica_to_remove:?} not found"),
)))
}
}
}

impl<'a> IntoIterator for &'a ShardAddrs {
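Note: `update_primary_after_failover` promotes in place with `std::mem::swap`, so the demoted primary stays in the shard as a replica, while `remove_replica` drops a node that has moved to another shard. A standalone sketch of both operations on a simplified shard struct (illustrative, not the crate's `ShardAddrs` or error types):

use std::sync::Arc;

struct Shard {
    primary: Arc<String>,
    replicas: Vec<Arc<String>>,
}

impl Shard {
    // Promote `new_primary` by swapping it with the current primary in place,
    // keeping the demoted primary in the shard as a replica.
    fn promote(&mut self, new_primary: &Arc<String>) -> Result<(), String> {
        for replica in self.replicas.iter_mut() {
            if replica == new_primary {
                std::mem::swap(&mut self.primary, replica);
                return Ok(());
            }
        }
        Err(format!("{new_primary} is not a replica of this shard"))
    }

    // Remove a replica that was observed serving a slot for a different shard.
    fn remove_replica(&mut self, addr: &Arc<String>) -> Result<(), String> {
        match self.replicas.iter().position(|r| r == addr) {
            Some(i) => {
                self.replicas.remove(i);
                Ok(())
            }
            None => Err(format!("{addr} is not a replica of this shard")),
        }
    }
}

fn main() {
    let a = Arc::new("a:6379".to_string());
    let b = Arc::new("b:6379".to_string());
    let mut shard = Shard { primary: a.clone(), replicas: vec![b.clone()] };
    shard.promote(&b).unwrap();
    assert_eq!(shard.primary, b);
    assert_eq!(shard.replicas, vec![a.clone()]);
    shard.remove_replica(&a).unwrap();
    assert!(shard.replicas.is_empty());
}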