diff --git a/redis/benches/bench_basic.rs b/redis/benches/bench_basic.rs index 638417b64..4ee8d43e4 100644 --- a/redis/benches/bench_basic.rs +++ b/redis/benches/bench_basic.rs @@ -31,10 +31,13 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) { redis::cmd("SET") .arg(key) .arg(42) - .query_async(&mut con) + .query_async::<_, Option>(&mut con) .await?; let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?; - redis::cmd("DEL").arg(key).query_async(&mut con).await?; + redis::cmd("DEL") + .arg(key) + .query_async::<_, Option>(&mut con) + .await?; Ok::<_, RedisError>(()) }) .unwrap() diff --git a/redis/benches/bench_cluster_async.rs b/redis/benches/bench_cluster_async.rs index 347908f9e..feed2846e 100644 --- a/redis/benches/bench_cluster_async.rs +++ b/redis/benches/bench_cluster_async.rs @@ -21,9 +21,16 @@ fn bench_cluster_async( runtime .block_on(async { let key = "test_key"; - redis::cmd("SET").arg(key).arg(42).query_async(con).await?; + redis::cmd("SET") + .arg(key) + .arg(42) + .query_async::<_, Option>(con) + .await?; let _: isize = redis::cmd("GET").arg(key).query_async(con).await?; - redis::cmd("DEL").arg(key).query_async(con).await?; + redis::cmd("DEL") + .arg(key) + .query_async::<_, Option>(con) + .await?; Ok::<_, RedisError>(()) }) diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index dcad70e99..3c1bc10f1 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -1,11 +1,12 @@ use crate::cluster_async::ConnectionFuture; -use crate::cluster_routing::{Route, SlotAddr}; +use crate::cluster_routing::{Route, ShardAddrs, SlotAddr}; use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; use crate::cluster_topology::TopologyHash; use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; use std::net::IpAddr; +use std::sync::Arc; /// A struct that encapsulates a network connection along with its associated IP address. #[derive(Clone, Eq, PartialEq, Debug)] @@ -137,6 +138,16 @@ where } } + /// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses. 
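As context for the `slot_map_nodes` helper introduced below, the underlying pattern is a snapshot-style iteration over a `DashMap`: each entry's key and value are cloned out, so the caller never holds a map guard. A minimal sketch with simplified stand-in types (a plain `Vec<String>` instead of `ShardAddrs`; not the crate's definitions):

use dashmap::DashMap;
use std::sync::Arc;

// Clone (key, value) pairs out of the map; the returned Vec owns Arc clones only,
// so no DashMap shard locks outlive this call.
fn snapshot_nodes(
    nodes: &DashMap<Arc<String>, Arc<Vec<String>>>,
) -> Vec<(Arc<String>, Arc<Vec<String>>)> {
    nodes
        .iter()
        .map(|item| (item.key().clone(), item.value().clone()))
        .collect()
}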
+ pub(crate) fn slot_map_nodes( + &self, + ) -> impl Iterator, Arc)> + '_ { + self.slot_map + .nodes_map() + .iter() + .map(|item| (item.key().clone(), item.value().clone())) + } + // Extends the current connection map with the provided one pub(crate) fn extend_connection_map( &mut self, @@ -154,10 +165,7 @@ where &self, slot_map_value: &SlotMapValue, ) -> Option> { - let addrs = &slot_map_value - .addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock"); + let addrs = &slot_map_value.addrs; let initial_index = slot_map_value .last_used_replica .load(std::sync::atomic::Ordering::Relaxed); @@ -185,10 +193,7 @@ where fn lookup_route(&self, route: &Route) -> Option> { let slot_map_value = self.slot_map.slot_value_for_route(route)?; - let addrs = &slot_map_value - .addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock"); + let addrs = &slot_map_value.addrs; if addrs.replicas().is_empty() { return self.connection_for_address(addrs.primary().as_str()); } diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 06aa87736..f81e140f5 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -30,7 +30,7 @@ pub mod testing { pub use super::connections_logic::*; } use crate::{ - cluster_routing::{Routable, RoutingInfo}, + cluster_routing::{Routable, RoutingInfo, ShardUpdateResult}, cluster_slotmap::SlotMap, cluster_topology::SLOT_SIZE, cmd, @@ -629,19 +629,17 @@ impl From for OperationTarget { #[derive(Clone, Debug)] pub(crate) struct RedirectNode { /// The address of the redirect node. - pub _address: String, + pub address: String, /// The slot of the redirect node. - pub _slot: u16, + pub slot: u16, } impl RedirectNode { - /// This function expects an `Option` containing a tuple with a string slice and a u16. - /// The tuple represents an address and a slot, respectively. If the input is `Some`, - /// the function converts the address to a `String` and constructs a `RedirectNode`. + /// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number. pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option { option.map(|(address, slot)| RedirectNode { - _address: address.to_string(), - _slot: slot, + address: address.to_string(), + slot, }) } } @@ -757,6 +755,10 @@ pin_project! { #[pin] sleep: BoxFuture<'static, ()>, }, + UpdateMoved { + #[pin] + future: BoxFuture<'static, RedisResult<()>>, + }, } } @@ -819,8 +821,25 @@ impl Future for Request { } .into(); } + RequestStateProj::UpdateMoved { future } => { + if let Err(err) = ready!(future.poll(cx)) { + // Updating the slot map based on the MOVED error is an optimization. + // If it fails, proceed by retrying the request with the redirected node, + // and allow the slot refresh task to correct the slot map. + info!( + "Failed to update the slot map based on the received MOVED error. + Error: {err:?}" + ); + } + if let Some(request) = self.project().request.take() { + return Next::Retry { request }.into(); + } else { + return Next::Done.into(); + } + } _ => panic!("Request future must be Some"), }; + match ready!(future.poll(cx)) { Ok(item) => { self.respond(Ok(item)); @@ -1683,6 +1702,77 @@ where Ok(()) } + /// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role: + /// /// Updates the slot and node mappings in response to a MOVED error. + /// This function handles various scenarios based on the new primary's role: + /// + /// 1. 
**No Change**: If the new primary is already the current slot owner, no updates are needed. + /// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover), + /// the slot ownership is updated by promoting the replica to the primary in the existing shard addresses. + /// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration, + /// and the slot mapping is updated to point to the new shard addresses. + /// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to: + /// - The replica became the primary of its shard after a failover, with new slots migrated to it. + /// - The replica has moved to a different shard as the primary. + /// Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard. + /// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out. + /// + /// # Arguments + /// * `inner` - Shared reference to InnerCore containing connection and slot state. + /// * `slot` - The slot number reported as moved. + /// * `new_primary` - The address of the node now responsible for the slot. + /// + /// # Returns + /// * `RedisResult<()>` indicating success or failure in updating slot mappings. + async fn update_upon_moved_error( + inner: Arc>, + slot: u16, + new_primary: Arc, + ) -> RedisResult<()> { + let mut connections_container = inner.conn_lock.write().await; + let curr_shard_addrs = connections_container.slot_map.shard_addrs_for_slot(slot); + // Check if the new primary is part of the current shard and update if required + if let Some(curr_shard_addrs) = curr_shard_addrs { + match curr_shard_addrs.attempt_shard_role_update(new_primary.clone()) { + // Scenario 1: No changes needed as the new primary is already the current slot owner. + // Scenario 2: Failover occurred and the new primary was promoted from a replica. + ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()), + // The node was not found in this shard, proceed with further scenarios. + ShardUpdateResult::NodeNotFound => {} + } + } + + // Scenario 3 & 4: Check if the new primary exists in other shards + let mut nodes_iter = connections_container.slot_map_nodes(); + for (node_addr, shard_addrs_arc) in &mut nodes_iter { + if node_addr == new_primary { + let is_existing_primary = shard_addrs_arc.primary().eq(&new_primary); + if is_existing_primary { + // Scenario 3: Slot Migration - The new primary is an existing primary in another shard + // Update the associated addresses for `slot` to `shard_addrs`. + drop(nodes_iter); + return connections_container + .slot_map + .update_slot_range(slot, shard_addrs_arc.clone()); + } else { + // Scenario 4: The MOVED error redirects to `new_primary` which is known as a replica in a shard that doesn’t own `slot`. + // Remove the replica from its existing shard and treat it as a new node in a new shard. + shard_addrs_arc.remove_replica(new_primary.clone())?; + drop(nodes_iter); + return connections_container + .slot_map + .add_new_primary(slot, new_primary); + } + } + } + + // Scenario 5: New Node - The new primary is not present in the current slots map, add it as a primary of a new shard. 
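Taken together, the five scenarios reduce to a small decision tree over where `new_primary` already appears in the topology; a self-contained sketch of that classification under simplified types (the `SimpleShard` and `classify_moved` names are illustrative, not part of the crate):

#[derive(Debug, PartialEq)]
enum MovedScenario {
    AlreadyOwner,        // Scenario 1
    Failover,            // Scenario 2
    SlotMigration,       // Scenario 3
    ReplicaOfOtherShard, // Scenario 4
    UnknownNode,         // Scenario 5
}

struct SimpleShard {
    primary: String,
    replicas: Vec<String>,
}

// Classify a MOVED redirect given the shard currently owning the slot (if any)
// and the full list of known shards.
fn classify_moved(
    owning_shard: Option<&SimpleShard>,
    all_shards: &[SimpleShard],
    new_primary: &str,
) -> MovedScenario {
    if let Some(shard) = owning_shard {
        if shard.primary == new_primary {
            return MovedScenario::AlreadyOwner;
        }
        if shard.replicas.iter().any(|r| r == new_primary) {
            return MovedScenario::Failover;
        }
    }
    for shard in all_shards {
        if shard.primary == new_primary {
            return MovedScenario::SlotMigration;
        }
        if shard.replicas.iter().any(|r| r == new_primary) {
            return MovedScenario::ReplicaOfOtherShard;
        }
    }
    MovedScenario::UnknownNode
}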
+ drop(nodes_iter); + connections_container + .slot_map + .add_new_primary(slot, new_primary) + } + async fn execute_on_multiple_nodes<'a>( cmd: &'a Arc, routing: &'a MultipleNodeRoutingInfo, @@ -2104,25 +2194,37 @@ where sleep_duration, moved_redirect, } => { - poll_flush_action = poll_flush_action - .change_state(PollFlushAction::RebuildSlots(moved_redirect)); - if let Some(request) = request { - let future: RequestState< - Pin + Send>>, - > = match sleep_duration { - Some(sleep_duration) => RequestState::Sleep { + poll_flush_action = + poll_flush_action.change_state(PollFlushAction::RebuildSlots); + let future: Option< + RequestState + Send>>>, + > = if let Some(moved_redirect) = moved_redirect { + Some(RequestState::UpdateMoved { + future: Box::pin(ClusterConnInner::update_upon_moved_error( + self.inner.clone(), + moved_redirect.slot, + moved_redirect.address.into(), + )), + }) + } else if let Some(ref request) = request { + match sleep_duration { + Some(sleep_duration) => Some(RequestState::Sleep { sleep: boxed_sleep(sleep_duration), - }, - None => RequestState::Future { + }), + None => Some(RequestState::Future { future: Box::pin(Self::try_request( request.info.clone(), self.inner.clone(), )), - }, - }; + }), + } + } else { + None + }; + if let Some(future) = future { self.in_flight_requests.push(Box::pin(Request { retry_params: self.inner.cluster_params.retry_params.clone(), - request: Some(request), + request, future, })); } @@ -2175,7 +2277,7 @@ where enum PollFlushAction { None, - RebuildSlots(Option), + RebuildSlots, Reconnect(Vec), ReconnectFromInitialConnections, } @@ -2190,9 +2292,8 @@ impl PollFlushAction { PollFlushAction::ReconnectFromInitialConnections } - (PollFlushAction::RebuildSlots(moved_redirect), _) - | (_, PollFlushAction::RebuildSlots(moved_redirect)) => { - PollFlushAction::RebuildSlots(moved_redirect) + (PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => { + PollFlushAction::RebuildSlots } (PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => { @@ -2253,8 +2354,7 @@ where match ready!(self.poll_complete(cx)) { PollFlushAction::None => return Poll::Ready(Ok(())), - PollFlushAction::RebuildSlots(_moved_redirect) => { - // TODO: Add logic to update the slots map based on the MOVED error + PollFlushAction::RebuildSlots => { self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin( ClusterConnInner::refresh_slots_and_subscriptions_with_retries( self.inner.clone(), diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 1bc19959e..da10a4b44 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1,11 +1,13 @@ use crate::cluster_topology::get_slot; use crate::cmd::{Arg, Cmd}; use crate::types::Value; -use crate::{ErrorKind, RedisResult}; +use crate::{ErrorKind, RedisError, RedisResult}; +use core::cmp::Ordering; use std::cmp::min; use std::collections::HashMap; use std::iter::Once; use std::sync::Arc; +use std::sync::{RwLock, RwLockWriteGuard}; #[derive(Clone)] pub(crate) enum Redirect { @@ -848,7 +850,7 @@ impl Routable for Value { } } -#[derive(Debug, Hash)] +#[derive(Debug, Hash, Clone)] pub(crate) struct Slot { pub(crate) start: u16, pub(crate) end: u16, @@ -890,36 +892,173 @@ pub enum SlotAddr { ReplicaRequired, } +/// Represents the result of checking a shard for the status of a node. +/// +/// This enum indicates whether a given node is already the primary, has been promoted to a primary from a replica, +/// or is not found in the shard at all. 
+/// +/// Variants: +/// - `AlreadyPrimary`: The specified node is already the primary for the shard, so no changes are needed. +/// - `Promoted`: The specified node was found as a replica and successfully promoted to primary. +/// - `NodeNotFound`: The specified node is neither the current primary nor a replica within the shard. +#[derive(PartialEq, Debug)] +pub(crate) enum ShardUpdateResult { + AlreadyPrimary, + Promoted, + NodeNotFound, +} + +const READ_LK_ERR_SHARDADDRS: &str = "Failed to acquire read lock for ShardAddrs"; +const WRITE_LK_ERR_SHARDADDRS: &str = "Failed to acquire write lock for ShardAddrs"; /// This is just a simplified version of [`Slot`], /// which stores only the master and [optional] replica /// to avoid the need to choose a replica each time /// a command is executed -#[derive(Debug, Eq, PartialEq, Clone, PartialOrd, Ord)] +#[derive(Debug)] pub(crate) struct ShardAddrs { - primary: Arc, - replicas: Vec>, + primary: RwLock>, + replicas: RwLock>>, +} + +impl PartialEq for ShardAddrs { + fn eq(&self, other: &Self) -> bool { + let self_primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS); + let other_primary = other.primary.read().expect(READ_LK_ERR_SHARDADDRS); + + let self_replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + let other_replicas = other.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + + *self_primary == *other_primary && *self_replicas == *other_replicas + } +} + +impl Eq for ShardAddrs {} + +impl PartialOrd for ShardAddrs { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ShardAddrs { + fn cmp(&self, other: &Self) -> Ordering { + let self_primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS); + let other_primary = other.primary.read().expect(READ_LK_ERR_SHARDADDRS); + + let primary_cmp = self_primary.cmp(&other_primary); + if primary_cmp == Ordering::Equal { + let self_replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + let other_replicas = other.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + return self_replicas.cmp(&other_replicas); + } + + primary_cmp + } } impl ShardAddrs { pub(crate) fn new(primary: Arc, replicas: Vec>) -> Self { + let primary = RwLock::new(primary); + let replicas = RwLock::new(replicas); Self { primary, replicas } } + pub(crate) fn new_with_primary(primary: Arc) -> Self { + Self::new(primary, Vec::default()) + } + pub(crate) fn primary(&self) -> Arc { - self.primary.clone() + self.primary.read().expect(READ_LK_ERR_SHARDADDRS).clone() + } + + pub(crate) fn replicas(&self) -> std::sync::RwLockReadGuard>> { + self.replicas.read().expect(READ_LK_ERR_SHARDADDRS) + } + + /// Attempts to update the shard roles based on the provided `new_primary`. + /// + /// This function evaluates whether the specified `new_primary` node is already + /// the primary, a replica that can be promoted to primary, or a node not present + /// in the shard. It handles three scenarios: + /// + /// 1. **Already Primary**: If the `new_primary` is already the current primary, + /// the function returns `ShardUpdateResult::AlreadyPrimary` and no changes are made. + /// + /// 2. **Promoted**: If the `new_primary` is found in the list of replicas, it is promoted + /// to primary by swapping it with the current primary, and the function returns + /// `ShardUpdateResult::Promoted`. + /// + /// 3. 
**Node Not Found**: If the `new_primary` is neither the current primary nor a replica, + /// the function returns `ShardUpdateResult::NodeNotFound` to indicate that the node is + /// not part of the current shard. + /// + /// # Arguments: + /// * `new_primary` - Representing the node to be promoted or checked. + /// + /// # Returns: + /// * `ShardUpdateResult` - The result of the role update operation. + pub(crate) fn attempt_shard_role_update(&self, new_primary: Arc) -> ShardUpdateResult { + let mut primary_lock = self.primary.write().expect(WRITE_LK_ERR_SHARDADDRS); + let mut replicas_lock = self.replicas.write().expect(WRITE_LK_ERR_SHARDADDRS); + + // If the new primary is already the current primary, return early. + if *primary_lock == new_primary { + return ShardUpdateResult::AlreadyPrimary; + } + + // If the new primary is found among replicas, swap it with the current primary. + if let Some(replica_idx) = Self::replica_index(&replicas_lock, new_primary.clone()) { + std::mem::swap(&mut *primary_lock, &mut replicas_lock[replica_idx]); + return ShardUpdateResult::Promoted; + } + + // If the new primary isn't part of the shard. + ShardUpdateResult::NodeNotFound } - pub(crate) fn replicas(&self) -> &Vec> { - self.replicas.as_ref() + fn replica_index( + replicas: &RwLockWriteGuard<'_, Vec>>, + target_replica: Arc, + ) -> Option { + replicas + .iter() + .position(|curr_replica| **curr_replica == *target_replica) + } + + /// Removes the specified `replica_to_remove` from the shard's replica list if it exists. + /// This method searches for the replica's index and removes it from the list. If the replica + /// is not found, it returns an error. + /// + /// # Arguments + /// * `replica_to_remove` - The address of the replica to be removed. + /// + /// # Returns + /// * `RedisResult<()>` - `Ok(())` if the replica was successfully removed, or an error if the + /// replica was not found. 
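The removal itself is the familiar `position`-then-`Vec::remove` pattern performed under the write lock; a standalone sketch using a plain `String` error in place of `RedisError` (illustrative, not the crate's implementation):

use std::sync::{Arc, RwLock};

// Remove `target` from a lock-protected replica list, erroring if it is absent.
fn remove_replica(replicas: &RwLock<Vec<Arc<String>>>, target: &str) -> Result<(), String> {
    let mut guard = replicas.write().expect("replica list lock poisoned");
    match guard.iter().position(|r| r.as_str() == target) {
        Some(idx) => {
            guard.remove(idx);
            Ok(())
        }
        None => Err(format!("replica {target} not found")),
    }
}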
+ pub(crate) fn remove_replica(&self, replica_to_remove: Arc) -> RedisResult<()> { + let mut replicas_lock = self.replicas.write().expect(WRITE_LK_ERR_SHARDADDRS); + if let Some(index) = Self::replica_index(&replicas_lock, replica_to_remove.clone()) { + replicas_lock.remove(index); + Ok(()) + } else { + Err(RedisError::from(( + ErrorKind::ClientError, + "Couldn't remove replica", + format!("Replica {replica_to_remove:?} not found"), + ))) + } } } impl<'a> IntoIterator for &'a ShardAddrs { - type Item = &'a Arc; - type IntoIter = std::iter::Chain>, std::slice::Iter<'a, Arc>>; + type Item = Arc; + type IntoIter = std::iter::Chain>, std::vec::IntoIter>>; fn into_iter(self) -> Self::IntoIter { - std::iter::once(&self.primary).chain(self.replicas.iter()) + let primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS).clone(); + let replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS).clone(); + + std::iter::once(primary).chain(replicas) } } @@ -949,10 +1088,12 @@ impl Route { mod tests { use super::{ command_for_multi_slot_indices, AggregateOp, MultipleNodeRoutingInfo, ResponsePolicy, - Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, + Route, RoutingInfo, ShardAddrs, SingleNodeRoutingInfo, SlotAddr, }; + use crate::cluster_routing::ShardUpdateResult; use crate::{cluster_topology::slot, cmd, parser::parse_redis_value, Value}; use core::panic; + use std::sync::{Arc, RwLock}; #[test] fn test_routing_info_mixed_capatalization() { @@ -1343,4 +1484,44 @@ mod tests { let result = super::combine_map_results(input); assert!(result.is_err()); } + + fn create_shard_addrs(primary: &str, replicas: Vec<&str>) -> ShardAddrs { + ShardAddrs { + primary: RwLock::new(Arc::new(primary.to_string())), + replicas: RwLock::new( + replicas + .into_iter() + .map(|r| Arc::new(r.to_string())) + .collect(), + ), + } + } + + #[test] + fn test_attempt_shard_role_update_already_primary() { + let shard_addrs = create_shard_addrs("node1:6379", vec!["node2:6379", "node3:6379"]); + let result = shard_addrs.attempt_shard_role_update(Arc::new("node1:6379".to_string())); + assert_eq!(result, ShardUpdateResult::AlreadyPrimary); + } + + #[test] + fn test_attempt_shard_role_update_promoted() { + let shard_addrs = create_shard_addrs("node1:6379", vec!["node2:6379", "node3:6379"]); + let result = shard_addrs.attempt_shard_role_update(Arc::new("node2:6379".to_string())); + assert_eq!(result, ShardUpdateResult::Promoted); + + let primary = shard_addrs.primary.read().unwrap().clone(); + assert_eq!(primary.as_str(), "node2:6379"); + + let replicas = shard_addrs.replicas.read().unwrap(); + assert_eq!(replicas.len(), 2); + assert!(replicas.iter().any(|r| r.as_str() == "node1:6379")); + } + + #[test] + fn test_attempt_shard_role_update_node_not_found() { + let shard_addrs = create_shard_addrs("node1:6379", vec!["node2:6379", "node3:6379"]); + let result = shard_addrs.attempt_shard_role_update(Arc::new("node4:6379".to_string())); + assert_eq!(result, ShardUpdateResult::NodeNotFound); + } } diff --git a/redis/src/cluster_slotmap.rs b/redis/src/cluster_slotmap.rs index c64526748..68d6a5c2b 100644 --- a/redis/src/cluster_slotmap.rs +++ b/redis/src/cluster_slotmap.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::sync::RwLock; use std::{ collections::{BTreeMap, HashSet}, fmt::Display, @@ -9,14 +8,16 @@ use std::{ use dashmap::DashMap; use crate::cluster_routing::{Route, ShardAddrs, Slot, SlotAddr}; - -pub(crate) type NodesMap = DashMap, Arc>>; +use crate::ErrorKind; +use crate::RedisError; +use crate::RedisResult; +pub(crate) type 
NodesMap = DashMap, Arc>; #[derive(Debug)] pub(crate) struct SlotMapValue { pub(crate) start: u16, - pub(crate) addrs: Arc>, - pub(crate) last_used_replica: AtomicUsize, + pub(crate) addrs: Arc, + pub(crate) last_used_replica: Arc, } #[derive(Debug, Default, Clone, PartialEq, Copy)] @@ -38,10 +39,7 @@ fn get_address_from_slot( read_from_replica: ReadFromReplicaStrategy, slot_addr: SlotAddr, ) -> Arc { - let addrs = slot - .addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock"); + let addrs = &slot.addrs; if slot_addr == SlotAddr::Master || addrs.replicas().is_empty() { return addrs.primary(); } @@ -80,15 +78,12 @@ impl SlotMap { shard_id += 1; let replicas: Vec> = slot.replicas.into_iter().map(Arc::new).collect(); - Arc::new(RwLock::new(ShardAddrs::new(primary, replicas))) + Arc::new(ShardAddrs::new(primary, replicas)) }) .clone(); - let replicas_reader = shard_addrs_arc - .read() - .expect("Failed to obtain reader lock for ShardAddrs"); // Add all replicas to nodes_map with a reference to the same ShardAddrs if not already present - replicas_reader.replicas().iter().for_each(|replica| { + shard_addrs_arc.replicas().iter().for_each(|replica| { slot_map .nodes_map .entry(replica.clone()) @@ -101,27 +96,21 @@ impl SlotMap { SlotMapValue { addrs: shard_addrs_arc.clone(), start: slot.start, - last_used_replica: AtomicUsize::new(0), + last_used_replica: Arc::new(AtomicUsize::new(0)), }, ); } - slot_map } - #[allow(dead_code)] // used in tests pub(crate) fn nodes_map(&self) -> &NodesMap { &self.nodes_map } pub fn is_primary(&self, address: &String) -> bool { - self.nodes_map.get(address).map_or(false, |shard_addrs| { - *shard_addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock") - .primary() - == *address - }) + self.nodes_map + .get(address) + .map_or(false, |shard_addrs| *shard_addrs.primary() == *address) } pub fn slot_value_for_route(&self, route: &Route) -> Option<&SlotMapValue> { @@ -144,16 +133,21 @@ impl SlotMap { }) } + /// Retrieves the shard addresses (`ShardAddrs`) for the specified `slot` by looking it up in the `slots` tree, + /// returning a reference to the stored shard addresses if found. + pub(crate) fn shard_addrs_for_slot(&self, slot: u16) -> Option> { + self.slots + .range(slot..) + .next() + .map(|(_, slot_value)| slot_value.addrs.clone()) + } + pub fn addresses_for_all_primaries(&self) -> HashSet> { self.nodes_map .iter() .map(|map_item| { let shard_addrs = map_item.value(); - shard_addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock") - .primary() - .clone() + shard_addrs.primary().clone() }) .collect() } @@ -185,13 +179,8 @@ impl SlotMap { self.slots .iter() .filter_map(|(end, slot_value)| { - let addr_reader = slot_value - .addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock"); - if addr_reader.primary() == node_address - || addr_reader.replicas().contains(&node_address) - { + let addrs = &slot_value.addrs; + if addrs.primary() == node_address || addrs.replicas().contains(&node_address) { Some(slot_value.start..(*end + 1)) } else { None @@ -218,23 +207,253 @@ impl SlotMap { } }) } + + /// Inserts a single slot into the `slots` map, associating it with a new `SlotMapValue` + /// that contains the shard addresses (`shard_addrs`) and represents a range of just the given slot. + /// + /// # Returns + /// * `Option` - Returns the previous `SlotMapValue` if a slot already existed for the given key, + /// or `None` if the slot was newly inserted. 
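The slot lookups above lean on the fact that `slots` is a `BTreeMap` keyed by each range's end slot, so `range(slot..).next()` lands on the first range whose end is at or after the requested slot; checking the recorded start then turns it into a containment test. A minimal sketch of that idea with plain types (illustrative, not the crate's structures):

use std::collections::BTreeMap;

// Each entry: end_slot -> (start_slot, owner). A slot `s` belongs to the first
// range whose end is >= s, provided `s` is also >= that range's start.
fn owner_for_slot(ranges: &BTreeMap<u16, (u16, String)>, slot: u16) -> Option<&str> {
    ranges
        .range(slot..)
        .next()
        .filter(|(_end, (start, _owner))| slot >= *start)
        .map(|(_end, (_start, owner))| owner.as_str())
}

// For example, with {7999: (0, "node1"), 16383: (8000, "node2")},
// owner_for_slot(&map, 6918) yields Some("node1").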
+ fn insert_single_slot( + &mut self, + slot: u16, + shard_addrs: Arc, + ) -> Option { + self.slots.insert( + slot, + SlotMapValue { + start: slot, + addrs: shard_addrs, + last_used_replica: Arc::new(AtomicUsize::new(0)), + }, + ) + } + + /// Creats a new shard addresses that contain only the primary node, adds it to the nodes map + /// and updates the slots tree for the given `slot` to point to the new primary. + pub(crate) fn add_new_primary(&mut self, slot: u16, node_addr: Arc) -> RedisResult<()> { + let shard_addrs = Arc::new(ShardAddrs::new_with_primary(node_addr.clone())); + self.nodes_map.insert(node_addr, shard_addrs.clone()); + self.update_slot_range(slot, shard_addrs) + } + + fn shard_addrs_equal(shard1: &Arc, shard2: &Arc) -> bool { + Arc::ptr_eq(shard1, shard2) + } + + /// Updates the end of an existing slot range in the `slots` tree. This function removes the slot entry + /// associated with the current end (`curr_end`) and reinserts it with a new end value (`new_end`). + /// + /// The operation effectively shifts the range's end boundary from `curr_end` to `new_end`, while keeping the + /// rest of the slot's data (e.g., shard addresses) unchanged. + /// + /// # Parameters: + /// - `curr_end`: The current end of the slot range that will be removed. + /// - `new_end`: The new end of the slot range where the slot data will be reinserted. + fn update_end_range(&mut self, curr_end: u16, new_end: u16) -> RedisResult<()> { + if let Some(curr_slot_val) = self.slots.remove(&curr_end) { + self.slots.insert(new_end, curr_slot_val); + return Ok(()); + } + Err(RedisError::from(( + ErrorKind::ClientError, + "Couldn't find slot range with end: {curr_end:?} in the slot map", + ))) + } + + /// Attempts to merge the current `slot` with the next slot range in the `slots` map, if they are consecutive + /// and share the same shard addresses. If the next slot's starting position is exactly `slot + 1` + /// and the shard addresses match, the next slot's starting point is moved to `slot`, effectively merging + /// the slot to the existing range. + /// + /// # Parameters: + /// - `slot`: The slot to attempt to merge with the next slot. + /// - `new_addrs`: The shard addresses to compare with the next slot's shard addresses. + /// + /// # Returns: + /// - `bool`: Returns `true` if the merge was successful, otherwise `false`. + fn try_merge_to_next_range(&mut self, slot: u16, new_addrs: Arc) -> bool { + if let Some((_next_end, next_slot_value)) = self.slots.range_mut((slot + 1)..).next() { + if next_slot_value.start == slot + 1 + && Self::shard_addrs_equal(&next_slot_value.addrs, &new_addrs) + { + next_slot_value.start = slot; + return true; + } + } + false + } + + /// Attempts to merge the current slot with the previous slot range in the `slots` map, if they are consecutive + /// and share the same shard addresses. If the previous slot ends at `slot - 1` and the shard addresses match, + /// the end of the previous slot is extended to `slot`, effectively merging the slot to the existing range. + /// + /// # Parameters: + /// - `slot`: The slot to attempt to merge with the previous slot. + /// - `new_addrs`: The shard addresses to compare with the previous slot's shard addresses. + /// + /// # Returns: + /// - `RedisResult`: Returns `Ok(true)` if the merge was successful, otherwise `Ok(false)`. 
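Because each range is keyed by its end slot, moving a range's end boundary amounts to removing the entry under the old key and reinserting the same value under the new one; a minimal sketch over a plain `BTreeMap` (illustrative names and error type):

use std::collections::BTreeMap;

// Re-key a range from `curr_end` to `new_end`, keeping its value untouched.
fn update_end_range<V>(
    ranges: &mut BTreeMap<u16, V>,
    curr_end: u16,
    new_end: u16,
) -> Result<(), String> {
    match ranges.remove(&curr_end) {
        Some(value) => {
            ranges.insert(new_end, value);
            Ok(())
        }
        None => Err(format!("no range ends at slot {curr_end}")),
    }
}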
+ fn try_merge_to_prev_range( + &mut self, + slot: u16, + new_addrs: Arc, + ) -> RedisResult { + if let Some((prev_end, prev_slot_value)) = self.slots.range_mut(..slot).next_back() { + if *prev_end == slot - 1 && Self::shard_addrs_equal(&prev_slot_value.addrs, &new_addrs) + { + let prev_end = *prev_end; + self.update_end_range(prev_end, slot)?; + return Ok(true); + } + } + Ok(false) + } + + /// Updates the slot range in the `slots` to point to new shard addresses. + /// + /// This function handles the following scenarios when updating the slot mapping: + /// + /// **Scenario 1 - Same Shard Owner**: + /// - If the slot is already associated with the same shard addresses, no changes are needed. + /// + /// **Scenario 2 - Single Slot Range**: + /// - If the slot is the only slot in the current range (i.e., `start == end == slot`), + /// the function simply replaces the shard addresses for this slot with the new shard addresses. + /// + /// **Scenario 3 - Slot Matches the End of a Range**: + /// - If the slot is the last slot in the current range (`slot == end`), the function + /// adjusts the range by decrementing the end of the current range by 1 (making the + /// new end equal to `end - 1`). The current slot is then removed and a new entry is + /// inserted for the slot with the new shard addresses. + /// + /// **Scenario 4 - Slot Matches the Start of a Range**: + /// - If the slot is the first slot in the current range (`slot == start`), the function + /// increments the start of the current range by 1 (making the new start equal to + /// `start + 1`). A new entry is then inserted for the slot with the new shard addresses. + /// + /// **Scenario 5 - Slot is Within a Range**: + /// - If the slot falls between the start and end of a current range (`start < slot < end`), + /// the function splits the current range into two. The range before the slot (`start` to + /// `slot - 1`) remains with the old shard addresses, a new entry for the slot is added + /// with the new shard addresses, and the range after the slot (`slot + 1` to `end`) is + /// reinserted with the old shard addresses. + /// + /// **Scenario 6 - Slot is Not Covered**: + /// - If the slot is not part of any existing range, a new entry is simply inserted into + /// the `slots` tree with the new shard addresses. + /// + /// # Parameters: + /// - `slot`: The specific slot that needs to be updated. + /// - `new_addrs`: The new shard addresses to associate with the slot. + /// + /// # Returns: + /// - `RedisResult<()>`: Indicates the success or failure of the operation. + pub(crate) fn update_slot_range( + &mut self, + slot: u16, + new_addrs: Arc, + ) -> RedisResult<()> { + let curr_tree_node = + self.slots + .range_mut(slot..) 
+ .next() + .and_then(|(&end, slot_map_value)| { + if slot >= slot_map_value.start && slot <= end { + Some((end, slot_map_value)) + } else { + None + } + }); + + if let Some((curr_end, curr_slot_val)) = curr_tree_node { + // Scenario 1: Same shard owner + if Self::shard_addrs_equal(&curr_slot_val.addrs, &new_addrs) { + return Ok(()); + } + // Scenario 2: The slot is the only slot in the current range + else if curr_slot_val.start == curr_end && curr_slot_val.start == slot { + // Replace the shard addresses of the current slot value + curr_slot_val.addrs = new_addrs; + // Scenario 3: Slot matches the end of the current range + } else if slot == curr_end { + // Merge with the next range if shard addresses match + if self.try_merge_to_next_range(slot, new_addrs.clone()) { + // Adjust current range end + self.update_end_range(curr_end, curr_end - 1)?; + } else { + // Insert as a standalone slot + let curr_slot_val = self.insert_single_slot(curr_end, new_addrs); + if let Some(curr_slot_val) = curr_slot_val { + // Adjust current range end + self.slots.insert(curr_end - 1, curr_slot_val); + } + } + + // Scenario 4: Slot matches the start of the current range + } else if slot == curr_slot_val.start { + // Adjust current range start + curr_slot_val.start += 1; + // Attempt to merge with the previous range + if !self.try_merge_to_prev_range(slot, new_addrs.clone())? { + // Insert as a standalone slot + self.insert_single_slot(slot, new_addrs); + } + + // Scenario 5: Slot is within the current range + } else if slot > curr_slot_val.start && slot < curr_end { + // We will split the current range into three parts: + // A: [start, slot - 1], which will remain owned by the current shard, + // B: [slot, slot], which will be owned by the new shard addresses, + // C: [slot + 1, end], which will remain owned by the current shard. + + let start: u16 = curr_slot_val.start; + let addrs = curr_slot_val.addrs.clone(); + let last_used_replica = curr_slot_val.last_used_replica.clone(); + + // Modify the current slot range to become part C: [slot + 1, end], still owned by the current shard. + curr_slot_val.start = slot + 1; + + // Create and insert a new SlotMapValue representing part A: [start, slot - 1], + // still owned by the current shard, into the slot map. + self.slots.insert( + slot - 1, + SlotMapValue { + start, + addrs, + last_used_replica, + }, + ); + + // Insert the new shard addresses into the slot map as part B: [slot, slot], + // which will be owned by the new shard addresses. + self.insert_single_slot(slot, new_addrs); + } + // Scenario 6: Slot isn't covered by any existing range + } else { + // Try merging with the previous or next range; if no merge is possible, insert as a standalone slot + if !self.try_merge_to_prev_range(slot, new_addrs.clone())? + && !self.try_merge_to_next_range(slot, new_addrs.clone()) + { + self.insert_single_slot(slot, new_addrs); + } + } + Ok(()) + } } impl Display for SlotMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "Strategy: {:?}. 
Slot mapping:", self.read_from_replica)?; for (end, slot_map_value) in self.slots.iter() { - let shard_addrs = slot_map_value - .addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock"); + let addrs = &slot_map_value.addrs; writeln!( f, "({}-{}): primary: {}, replicas: {:?}", slot_map_value.start, end, - shard_addrs.primary(), - shard_addrs.replicas() + addrs.primary(), + addrs.replicas() )?; } Ok(()) @@ -519,4 +738,392 @@ mod tests_cluster_slotmap { (2001..3001).collect::>() ); } + + fn create_slot(start: u16, end: u16, master: &str, replicas: Vec<&str>) -> Slot { + Slot::new( + start, + end, + master.to_owned(), + replicas.into_iter().map(|r| r.to_owned()).collect(), + ) + } + + fn assert_equal_slot_maps(this: SlotMap, expected: Vec) { + for ((end, slot_value), expected_slot) in this.slots.iter().zip(expected.iter()) { + assert_eq!(*end, expected_slot.end); + assert_eq!(slot_value.start, expected_slot.start); + let shard_addrs = &slot_value.addrs; + assert_eq!(*shard_addrs.primary(), expected_slot.master); + let _ = shard_addrs + .replicas() + .iter() + .zip(expected_slot.replicas.iter()) + .map(|(curr, expected)| { + assert_eq!(**curr, *expected); + }); + } + } + + fn assert_slot_map_and_shard_addrs( + slot_map: SlotMap, + slot: u16, + new_shard_addrs: Arc, + expected_slots: Vec, + ) { + assert!(SlotMap::shard_addrs_equal( + &slot_map.shard_addrs_for_slot(slot).unwrap(), + &new_shard_addrs + )); + assert_equal_slot_maps(slot_map, expected_slots); + } + + #[test] + fn test_update_slot_range_single_slot_range() { + let test_slot = 8000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 8000, "node1:6379", vec!["replica1:6379"]), + create_slot(8001, 16383, "node3:6379", vec!["replica3:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8001) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]), + create_slot(test_slot + 1, 16383, "node3:6379", vec!["replica3:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_end_range_merge_ranges() { + let test_slot = 7999; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_end_range_cant_merge_ranges() { + let test_slot = 7999; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", 
vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = Arc::new(ShardAddrs::new( + Arc::new("node3:6379".to_owned()), + vec![Arc::new("replica3:6379".to_owned())], + )); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]), + create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_start_range_merge_ranges() { + let test_slot = 8000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7999) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_start_range_cant_merge_ranges() { + let test_slot = 8000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = Arc::new(ShardAddrs::new( + Arc::new("node3:6379".to_owned()), + vec![Arc::new("replica3:6379".to_owned())], + )); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]), + create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_within_a_range() { + let test_slot = 4000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(test_slot + 1, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn 
test_update_slot_range_slot_is_not_covered_cant_merge_ranges() { + let test_slot = 7998; + let before_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_not_covered_merge_with_next() { + let test_slot = 7999; + let before_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_not_covered_merge_with_prev() { + let test_slot = 7001; + let before_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_same_shard_owner_no_change_needed() { + let test_slot = 7000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = before_slots; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_max_slot_matches_end_range() { + let max_slot = 16383; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let 
mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(max_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, max_slot - 1, "node2:6379", vec!["replica2:6379"]), + create_slot(max_slot, max_slot, "node1:6379", vec!["replica1:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, max_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_max_slot_single_slot_range() { + let max_slot = 16383; + let before_slots = vec![ + create_slot(0, 16382, "node1:6379", vec!["replica1:6379"]), + create_slot(16383, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(0) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(max_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, max_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(max_slot, max_slot, "node1:6379", vec!["replica1:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, max_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_min_slot_matches_start_range() { + let min_slot = 0; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(min_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(min_slot, min_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(min_slot + 1, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, min_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_min_slot_single_slot_range() { + let min_slot = 0; + let before_slots = vec![ + create_slot(0, 0, "node1:6379", vec!["replica1:6379"]), + create_slot(1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(1) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(min_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(min_slot, min_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(min_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, min_slot, new_shard_addrs, after_slots); + } } diff --git a/redis/src/cluster_topology.rs b/redis/src/cluster_topology.rs index e37284b18..9a5b5e99a 100644 --- a/redis/src/cluster_topology.rs +++ b/redis/src/cluster_topology.rs @@ -503,20 +503,17 @@ mod tests { } } - fn get_node_addr(name: &str, port: u16) -> ShardAddrs { 
- ShardAddrs::new(format!("{name}:{port}").into(), Vec::new()) + fn get_node_addr(name: &str, port: u16) -> Arc { + Arc::new(ShardAddrs::new(format!("{name}:{port}").into(), Vec::new())) } - fn collect_shard_addrs(slot_map: &SlotMap) -> Vec { - let mut shard_addrs: Vec = slot_map + fn collect_shard_addrs(slot_map: &SlotMap) -> Vec> { + let mut shard_addrs: Vec> = slot_map .nodes_map() .iter() .map(|map_item| { let shard_addrs = map_item.value(); - let addr_reader = shard_addrs - .read() - .expect("Failed to obtain ShardAddrs's read lock"); - addr_reader.clone() + shard_addrs.clone() }) .collect(); shard_addrs.sort_unstable(); @@ -543,7 +540,7 @@ mod tests { .unwrap(); let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); - let expected: Vec = vec![node_1]; + let expected = vec![node_1]; assert_eq!(res, expected); } @@ -586,7 +583,7 @@ mod tests { let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); let node_2 = get_node_addr("node2", 6380); - let expected: Vec = vec![node_1, node_2]; + let expected = vec![node_1, node_2]; assert_eq!(res, expected); } @@ -609,7 +606,7 @@ mod tests { let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); let node_2 = get_node_addr("node2", 6380); - let expected: Vec = vec![node_1, node_2]; + let expected = vec![node_1, node_2]; assert_eq!(res, expected); } @@ -633,7 +630,7 @@ mod tests { let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node3", 6381); let node_2 = get_node_addr("node4", 6382); - let expected: Vec = vec![node_1, node_2]; + let expected = vec![node_1, node_2]; assert_eq!(res, expected); } @@ -656,7 +653,7 @@ mod tests { .unwrap(); let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); - let expected: Vec = vec![node_1]; + let expected = vec![node_1]; assert_eq!(res, expected); } } diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index 24f786c2e..e5c4b9e22 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -642,7 +642,7 @@ pub fn build_keys_and_certs_for_tls(tempdir: &TempDir) -> TlsFilePaths { .arg("genrsa") .arg("-out") .arg(name) - .arg(&format!("{size}")) + .arg(format!("{size}")) .stdout(process::Stdio::null()) .stderr(process::Stdio::null()) .spawn() diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 7d1249c3e..e5ab22345 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -1317,6 +1317,555 @@ mod cluster_async { assert_eq!(value, Ok(Some(123))); } + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_slot_migration() { + // This test simulates the scenario where the client receives a MOVED error indicating that a key is now + // stored on the primary node of another shard. + // It ensures that the new slot now owned by the primary and its associated replicas. 
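The mock below answers with a reply of the form `-MOVED <slot> <host:port>`; a hedged sketch of splitting such a payload into the (slot, address) pair that redirect handling consumes (illustrative parser, not the crate's):

// Parse "MOVED 6918 node2:6380" into (slot, address); returns None on malformed input.
fn parse_moved(payload: &str) -> Option<(u16, &str)> {
    let mut parts = payload.split_whitespace();
    if parts.next()? != "MOVED" {
        return None;
    }
    let slot: u16 = parts.next()?.parse().ok()?;
    let address = parts.next()?;
    Some((slot, address))
}

// parse_moved("MOVED 6918 node2:6380") == Some((6918, "node2:6380"))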
+ let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_slot_migration"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7000], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7001], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 6380; + let new_shard_replica_port = 7001; + + // Tracking moved and replica requests for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + let replica_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_replica_requests = moved_requests.clone(); + + // Test key and slot + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh + .read_from_replicas(), // Allow reads from replicas + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else if contains_slice(cmd, b"GET") { + if new_shard_replica_port == port { + // Simulate replica response for GET after slot migration + replica_requests.fetch_add(1, Ordering::Relaxed); + Err(Ok(Value::BulkString(b"123".to_vec()))) + } else { + panic!("unexpected port for GET command: {port:?}, Expected: {new_shard_replica_port:?}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Handle slot migration scenario: Ensure the new shard's replicas are accessible + let value = runtime.block_on( + cmd("GET") + .arg(key) + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(123))); + assert_eq!(cloned_replica_requests.load(Ordering::Relaxed), 1); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_failover() { + // This test simulates a 
failover scenario, where the client receives a MOVED error and the replica becomes the new primary. + // The test verifies that the client updates the slot mapping to promote the replica to the primary and routes future requests + // to the new primary, ensuring other slots in the shard are also handled by the new primary. + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_failover"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7001], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7002], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 7001; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0), // Rate limiter to disable slot refresh + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Handle failover scenario: Ensure other slots in the same shard are updated to the new primary + let key_slot_1044 = "foo2"; + let value = runtime.block_on( + cmd("SET") + .arg(key_slot_1044) + .arg("bar2") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary() { + // This test simulates the scenario where the client receives a MOVED error indicating that the key now belongs to + // an entirely new primary node that wasn't previously known. 
+    #[test]
+    fn test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary() {
+        // This test simulates the scenario where the client receives a MOVED error indicating that the key now belongs to
+        // an entirely new primary node that wasn't previously known. The test verifies that the client correctly adds the new
+        // primary node to its slot map and routes future requests to the new node.
+        let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary";
+        let slots_config = vec![
+            MockSlotRange {
+                primary_port: 6379,
+                replica_ports: vec![],
+                slot_range: (0..8000),
+            },
+            MockSlotRange {
+                primary_port: 6380,
+                replica_ports: vec![],
+                slot_range: (8001..16380),
+            },
+        ];
+
+        let moved_from_port = 6379;
+        let moved_to_port = 6381;
+
+        // Tracking moved requests for validation
+        let moved_requests = Arc::new(atomic::AtomicUsize::new(0));
+        let cloned_moved_requests = moved_requests.clone();
+
+        // Test key and slot
+        let key = "test";
+        let key_slot = 6918;
+
+        // Mock environment setup
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            handler: _handler,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")])
+                .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh
+                .read_from_replicas(), // Allow reads from replicas
+            name,
+            move |cmd: &[u8], port| {
+                if contains_slice(cmd, b"PING")
+                    || contains_slice(cmd, b"SETNAME")
+                    || contains_slice(cmd, b"READONLY")
+                {
+                    return Err(Ok(Value::SimpleString("OK".into())));
+                }
+
+                if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
+                    let slots = create_topology_from_config(name, slots_config.clone());
+                    return Err(Ok(slots));
+                }
+
+                if contains_slice(cmd, b"SET") {
+                    if port == moved_to_port {
+                        // Simulate primary OK response
+                        Err(Ok(Value::SimpleString("OK".into())))
+                    } else if port == moved_from_port {
+                        // Simulate MOVED error for other port
+                        moved_requests.fetch_add(1, Ordering::Relaxed);
+                        Err(parse_redis_value(
+                            format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(),
+                        ))
+                    } else {
+                        panic!("unexpected port for SET command: {port:?}.\n
+                            Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}");
+                    }
+                } else if contains_slice(cmd, b"GET") {
+                    if moved_to_port == port {
+                        // Simulate primary response for GET
+                        Err(Ok(Value::BulkString(b"123".to_vec())))
+                    } else {
+                        panic!(
+                            "unexpected port for GET command: {port:?}, Expected: {moved_to_port}"
+                        );
+                    }
+                } else {
+                    panic!("unexpected command {cmd:?}")
+                }
+            },
+        );
+
+        // First request: Trigger MOVED error and reroute
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
+        // Second request: Should be routed directly to the new primary node if the slots map is updated
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
+        // Third request: The new primary has no replicas, so the GET should also be directed to it
+        let value = runtime.block_on(
+            cmd("GET")
+                .arg(key)
+                .query_async::<_, Option<i32>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(123)));
+
+        // Assert there was only a single MOVED error
+        assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1);
+    }
+
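Editor's note: the mock handlers in these tests build the redirection reply as a raw RESP error line of the form "-MOVED <slot> <host>:<port>\r\n". The sketch below shows one way such a line can be split into a slot number and a redirect address; `parse_moved` is a hypothetical helper written for this note, not part of the redis crate's API.

// Editor's sketch (illustrative only): splitting a raw MOVED error line such as
// "-MOVED 6918 example-host:6380\r\n" into the slot number and the redirect address.
fn parse_moved(line: &str) -> Option<(u16, String)> {
    let rest = line
        .strip_prefix("-MOVED ")
        .or_else(|| line.strip_prefix("MOVED "))?;
    let mut parts = rest.trim_end().splitn(2, ' ');
    let slot: u16 = parts.next()?.parse().ok()?;
    let address = parts.next()?.to_string(); // "host:port", kept as-is
    Some((slot, address))
}

// Example: parse_moved("-MOVED 6918 example-host:6380\r\n")
//          == Some((6918, "example-host:6380".to_string()))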
+    #[test]
+    fn test_async_cluster_update_slots_based_on_moved_error_indicates_replica_of_different_shard() {
+        // This test simulates a scenario where the client receives a MOVED error indicating that a key
+        // has been moved to a replica in a different shard. The replica is then promoted to primary and
+        // no longer exists in that shard's replica set.
+        // The test validates that the key gets correctly routed to the new primary and ensures that the
+        // shard updates its mapping accordingly, with only one MOVED error encountered during the process.
+
+        let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_replica_of_different_shard";
+        let slots_config = vec![
+            MockSlotRange {
+                primary_port: 6379,
+                replica_ports: vec![7000],
+                slot_range: (0..8000),
+            },
+            MockSlotRange {
+                primary_port: 6380,
+                replica_ports: vec![7001],
+                slot_range: (8001..16380),
+            },
+        ];
+
+        let moved_from_port = 6379;
+        let moved_to_port = 7001;
+        let primary_shard2 = 6380;
+
+        // Tracking moved requests for validation
+        let moved_requests = Arc::new(atomic::AtomicUsize::new(0));
+        let cloned_moved_requests = moved_requests.clone();
+
+        // Test key and slot of the first shard
+        let key = "test";
+        let key_slot = 6918;
+
+        // Test key of the second shard
+        let key_shard2 = "foo"; // slot 12182
+
+        // Mock environment setup
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            handler: _handler,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")])
+                .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh
+                .read_from_replicas(), // Allow reads from replicas
+            name,
+            move |cmd: &[u8], port| {
+                if contains_slice(cmd, b"PING")
+                    || contains_slice(cmd, b"SETNAME")
+                    || contains_slice(cmd, b"READONLY")
+                {
+                    return Err(Ok(Value::SimpleString("OK".into())));
+                }
+
+                if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
+                    let slots = create_topology_from_config(name, slots_config.clone());
+                    return Err(Ok(slots));
+                }
+
+                if contains_slice(cmd, b"SET") {
+                    if port == moved_to_port {
+                        // Simulate primary OK response
+                        Err(Ok(Value::SimpleString("OK".into())))
+                    } else if port == moved_from_port {
+                        // Simulate MOVED error for other port
+                        moved_requests.fetch_add(1, Ordering::Relaxed);
+                        Err(parse_redis_value(
+                            format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(),
+                        ))
+                    } else {
+                        panic!("unexpected port for SET command: {port:?}.\n
+                            Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}");
+                    }
+                } else if contains_slice(cmd, b"GET") {
+                    if port == primary_shard2 {
+                        // Simulate second shard primary response for GET
+                        Err(Ok(Value::BulkString(b"123".to_vec())))
+                    } else {
+                        panic!("unexpected port for GET command: {port:?}, Expected: {primary_shard2:?}");
+                    }
+                } else {
+                    panic!("unexpected command {cmd:?}")
+                }
+            },
+        );
+
+        // First request: Trigger MOVED error and reroute
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
+        // Second request: Should be routed directly to the new primary node if the slots map is updated
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
+        // Third request: Verify that the promoted replica is no longer part of the second shard's replicas by
+        // ensuring the response is received from that shard's primary
+        let value = runtime.block_on(
+            cmd("GET")
+                .arg(key_shard2)
+                .query_async::<_, Option<i32>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(123)));
+
+        // Assert there was only a single MOVED error
+        assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1);
+    }
+
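Editor's note: the mock handlers in these tests dispatch on `contains_slice(cmd, b"...")`, i.e. a byte-subslice check over the raw command payload. The sketch below is a minimal, hypothetical equivalent written for this note; the real helper is provided by the crate's test support module and may differ.

// Editor's sketch (illustrative only): a naive byte-subslice search, equivalent in
// spirit to the `contains_slice` helper used by the mock handlers above.
fn contains_slice(haystack: &[u8], needle: &[u8]) -> bool {
    if needle.is_empty() {
        return true;
    }
    haystack
        .windows(needle.len())
        .any(|window| window == needle)
}

// Example: contains_slice(b"*2\r\n$3\r\nGET\r\n$4\r\ntest\r\n", b"GET") == true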
+    #[test]
+    fn test_async_cluster_update_slots_based_on_moved_error_no_change() {
+        // This test simulates a scenario where the client receives a MOVED error, but the new primary is the
+        // same as the old primary (no actual change). It ensures that no additional slot map
+        // updates are required and that subsequent requests are still routed to the same primary node, with
+        // only one MOVED error encountered.
+        let name = "test_async_cluster_update_slots_based_on_moved_error_no_change";
+        let slots_config = vec![
+            MockSlotRange {
+                primary_port: 6379,
+                replica_ports: vec![7000],
+                slot_range: (0..8000),
+            },
+            MockSlotRange {
+                primary_port: 6380,
+                replica_ports: vec![7001],
+                slot_range: (8001..16380),
+            },
+        ];
+
+        let moved_from_port = 6379;
+        let moved_to_port = 6379;
+
+        // Tracking moved requests for validation
+        let moved_requests = Arc::new(atomic::AtomicUsize::new(0));
+        let cloned_moved_requests = moved_requests.clone();
+
+        // Test key and slot of the first shard
+        let key = "test";
+        let key_slot = 6918;
+
+        // Mock environment setup
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            handler: _handler,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")])
+                .slots_refresh_rate_limit(Duration::from_secs(1000000), 0), // Rate limiter to disable slot refresh
+            name,
+            move |cmd: &[u8], port| {
+                if contains_slice(cmd, b"PING")
+                    || contains_slice(cmd, b"SETNAME")
+                    || contains_slice(cmd, b"READONLY")
+                {
+                    return Err(Ok(Value::SimpleString("OK".into())));
+                }
+
+                if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
+                    let slots = create_topology_from_config(name, slots_config.clone());
+                    return Err(Ok(slots));
+                }
+
+                if contains_slice(cmd, b"SET") {
+                    if port == moved_to_port {
+                        if moved_requests.load(Ordering::Relaxed) == 0 {
+                            // First SET: simulate a MOVED error that points back to the same primary
+                            moved_requests.fetch_add(1, Ordering::Relaxed);
+                            Err(parse_redis_value(
+                                format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(),
+                            ))
+                        } else {
+                            Err(Ok(Value::SimpleString("OK".into())))
+                        }
+                    } else {
+                        panic!("unexpected port for SET command: {port:?}.\n
+                            Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}");
+                    }
+                } else {
+                    panic!("unexpected command {cmd:?}")
+                }
+            },
+        );
+
+        // First request: Trigger MOVED error and reroute
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
+        // Second request: Should still be routed to the same primary node
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
+        // Assert there was only a single MOVED error
+        assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1);
+    }
+
     #[test]
     fn test_async_cluster_reconnect_even_with_zero_retries() {
         let name = "test_async_cluster_reconnect_even_with_zero_retries";