Commit c48287a

Added logic to update the slot map based on MOVED errors (#186)

1 parent 8d05984 commit c48287a

9 files changed: +1556 -107 lines

redis/benches/bench_basic.rs

Lines changed: 5 additions & 2 deletions

@@ -31,10 +31,13 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) {
         redis::cmd("SET")
             .arg(key)
             .arg(42)
-            .query_async(&mut con)
+            .query_async::<_, Option<Value>>(&mut con)
             .await?;
         let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?;
-        redis::cmd("DEL").arg(key).query_async(&mut con).await?;
+        redis::cmd("DEL")
+            .arg(key)
+            .query_async::<_, Option<Value>>(&mut con)
+            .await?;
         Ok::<_, RedisError>(())
     })
     .unwrap()
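For context on the bench changes above (and the matching ones in redis/benches/bench_cluster_async.rs below): query_async in redis-rs is generic over both the connection and the decoded reply type, roughly async fn query_async<C: ConnectionLike, T: FromRedisValue>(&self, con: &mut C) -> RedisResult<T>, so a call site that discards the reply must name T explicitly. Option<Value> is a safe choice because it decodes any reply shape, including nil. A minimal sketch of the pattern, assuming a reachable server and an already-established async connection:

use redis::{RedisResult, Value};

// Sketch only: `con` is assumed to be an established multiplexed connection.
async fn set_and_ignore_reply(con: &mut redis::aio::MultiplexedConnection) -> RedisResult<()> {
    // `_` lets the compiler infer the connection type; `Option<Value>` pins
    // the reply type so the statement compiles even though the result is dropped.
    redis::cmd("SET")
        .arg("example_key")
        .arg(42)
        .query_async::<_, Option<Value>>(con)
        .await?;
    Ok(())
}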

redis/benches/bench_cluster_async.rs

Lines changed: 9 additions & 2 deletions

@@ -21,9 +21,16 @@ fn bench_cluster_async(
     runtime
         .block_on(async {
             let key = "test_key";
-            redis::cmd("SET").arg(key).arg(42).query_async(con).await?;
+            redis::cmd("SET")
+                .arg(key)
+                .arg(42)
+                .query_async::<_, Option<redis::Value>>(con)
+                .await?;
             let _: isize = redis::cmd("GET").arg(key).query_async(con).await?;
-            redis::cmd("DEL").arg(key).query_async(con).await?;
+            redis::cmd("DEL")
+                .arg(key)
+                .query_async::<_, Option<redis::Value>>(con)
+                .await?;

             Ok::<_, RedisError>(())
         })

redis/src/cluster_async/connections_container.rs

Lines changed: 14 additions & 9 deletions

@@ -1,11 +1,12 @@
 use crate::cluster_async::ConnectionFuture;
-use crate::cluster_routing::{Route, SlotAddr};
+use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
 use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
 use crate::cluster_topology::TopologyHash;
 use dashmap::DashMap;
 use futures::FutureExt;
 use rand::seq::IteratorRandom;
 use std::net::IpAddr;
+use std::sync::Arc;

 /// A struct that encapsulates a network connection along with its associated IP address.
 #[derive(Clone, Eq, PartialEq, Debug)]
@@ -137,6 +138,16 @@ where
         }
     }

+    /// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
+    pub(crate) fn slot_map_nodes(
+        &self,
+    ) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
+        self.slot_map
+            .nodes_map()
+            .iter()
+            .map(|item| (item.key().clone(), item.value().clone()))
+    }
+
     // Extends the current connection map with the provided one
     pub(crate) fn extend_connection_map(
         &mut self,
@@ -154,10 +165,7 @@ where
         &self,
         slot_map_value: &SlotMapValue,
     ) -> Option<ConnectionAndAddress<Connection>> {
-        let addrs = &slot_map_value
-            .addrs
-            .read()
-            .expect("Failed to obtain ShardAddrs's read lock");
+        let addrs = &slot_map_value.addrs;
         let initial_index = slot_map_value
             .last_used_replica
             .load(std::sync::atomic::Ordering::Relaxed);
@@ -185,10 +193,7 @@ where
     fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
         let slot_map_value = self.slot_map.slot_value_for_route(route)?;
-        let addrs = &slot_map_value
-            .addrs
-            .read()
-            .expect("Failed to obtain ShardAddrs's read lock");
+        let addrs = &slot_map_value.addrs;
         if addrs.replicas().is_empty() {
             return self.connection_for_address(addrs.primary().as_str());
         }
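Two details worth noting here. First, the removed .read().expect(...) calls imply that SlotMapValue::addrs is no longer wrapped in an RwLock (presumably a plain Arc<ShardAddrs> after this commit; that change lives in one of the files not shown), which also removes a lock-poisoning panic path from the read side. Second, slot_map_nodes() clones the Arc key and value out of each DashMap entry, yet the returned iterator still borrows the underlying map; this is why the caller in mod.rs explicitly drops it before mutating the slot map. A standalone sketch of the clone-out pattern with stand-in types:

use dashmap::DashMap;
use std::sync::Arc;

// Stand-in for the nodes map: node address -> shard addresses (modeled here
// as a plain Vec<String>). Cloning the Arcs makes each yielded pair
// independent of the entry guard; collecting releases the map entirely.
fn snapshot(
    map: &DashMap<Arc<String>, Arc<Vec<String>>>,
) -> Vec<(Arc<String>, Arc<Vec<String>>)> {
    map.iter()
        .map(|item| (item.key().clone(), item.value().clone()))
        .collect()
}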

redis/src/cluster_async/mod.rs

Lines changed: 126 additions & 26 deletions

@@ -30,7 +30,7 @@ pub mod testing {
     pub use super::connections_logic::*;
 }
 use crate::{
-    cluster_routing::{Routable, RoutingInfo},
+    cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
     cluster_slotmap::SlotMap,
     cluster_topology::SLOT_SIZE,
     cmd,
@@ -629,19 +629,17 @@ impl From<String> for OperationTarget {
 #[derive(Clone, Debug)]
 pub(crate) struct RedirectNode {
     /// The address of the redirect node.
-    pub _address: String,
+    pub address: String,
     /// The slot of the redirect node.
-    pub _slot: u16,
+    pub slot: u16,
 }

 impl RedirectNode {
-    /// This function expects an `Option` containing a tuple with a string slice and a u16.
-    /// The tuple represents an address and a slot, respectively. If the input is `Some`,
-    /// the function converts the address to a `String` and constructs a `RedirectNode`.
+    /// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number.
     pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
         option.map(|(address, slot)| RedirectNode {
-            _address: address.to_string(),
-            _slot: slot,
+            address: address.to_string(),
+            slot,
         })
     }
 }
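With the underscore prefixes gone, address and slot are now live fields consumed by update_upon_moved_error further down. For illustration, a hypothetical helper showing the (address, slot) tuple shape that from_option_tuple expects; how the caller actually extracts it from a parsed MOVED error is outside this diff:

// Hypothetical: a MOVED error carries "MOVED <slot> <host:port>"; the payload
// after the verb looks like "1234 127.0.0.1:6380".
fn parse_moved_payload(payload: &str) -> Option<(&str, u16)> {
    let mut parts = payload.split_whitespace();
    let slot: u16 = parts.next()?.parse().ok()?;
    let address = parts.next()?;
    Some((address, slot))
}

fn main() {
    let parsed = parse_moved_payload("1234 127.0.0.1:6380");
    assert_eq!(parsed, Some(("127.0.0.1:6380", 1234)));
    // RedirectNode::from_option_tuple(parsed) would then yield
    // Some(RedirectNode { address: "127.0.0.1:6380".to_string(), slot: 1234 }).
}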
@@ -757,6 +755,10 @@ pin_project! {
         #[pin]
         sleep: BoxFuture<'static, ()>,
     },
+    UpdateMoved {
+        #[pin]
+        future: BoxFuture<'static, RedisResult<()>>,
+    },
 }
 }

@@ -819,8 +821,25 @@ impl<C> Future for Request<C> {
                 }
                 .into();
             }
+            RequestStateProj::UpdateMoved { future } => {
+                if let Err(err) = ready!(future.poll(cx)) {
+                    // Updating the slot map based on the MOVED error is an optimization.
+                    // If it fails, proceed by retrying the request with the redirected node,
+                    // and allow the slot refresh task to correct the slot map.
+                    info!(
+                        "Failed to update the slot map based on the received MOVED error.
+                        Error: {err:?}"
+                    );
+                }
+                if let Some(request) = self.project().request.take() {
+                    return Next::Retry { request }.into();
+                } else {
+                    return Next::Done.into();
+                }
+            }
             _ => panic!("Request future must be Some"),
         };
+
         match ready!(future.poll(cx)) {
             Ok(item) => {
                 self.respond(Ok(item));
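For readers unfamiliar with the surrounding state machine: ready! suspends this poll until the boxed slot-map update resolves, and both the Ok and Err outcomes funnel into a retry (or Done if the request was already consumed), since the update is best-effort. A generic stand-in for that poll shape, using only std types:

use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

// ready! returns Poll::Pending early; once the inner future completes, the
// outer poll inspects the result and decides what happens next.
fn poll_update<F: Future<Output = Result<(), String>> + Unpin>(
    fut: &mut F,
    cx: &mut Context<'_>,
) -> Poll<&'static str> {
    match ready!(Pin::new(fut).poll(cx)) {
        Ok(()) => Poll::Ready("retry against the freshly updated slot map"),
        Err(_) => Poll::Ready("retry anyway; the periodic refresh repairs the map"),
    }
}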
@@ -1683,6 +1702,77 @@
         Ok(())
     }

+    /// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role.
+    /// This function handles various scenarios based on the new primary's role:
+    ///
+    /// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed.
+    /// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover),
+    ///    the slot ownership is updated by promoting the replica to the primary in the existing shard addresses.
+    /// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration,
+    ///    and the slot mapping is updated to point to the new shard addresses.
+    /// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to:
+    ///    - The replica became the primary of its shard after a failover, with new slots migrated to it.
+    ///    - The replica has moved to a different shard as the primary.
+    ///    Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard.
+    /// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out.
+    ///
+    /// # Arguments
+    /// * `inner` - Shared reference to InnerCore containing connection and slot state.
+    /// * `slot` - The slot number reported as moved.
+    /// * `new_primary` - The address of the node now responsible for the slot.
+    ///
+    /// # Returns
+    /// * `RedisResult<()>` indicating success or failure in updating slot mappings.
+    async fn update_upon_moved_error(
+        inner: Arc<InnerCore<C>>,
+        slot: u16,
+        new_primary: Arc<String>,
+    ) -> RedisResult<()> {
+        let mut connections_container = inner.conn_lock.write().await;
+        let curr_shard_addrs = connections_container.slot_map.shard_addrs_for_slot(slot);
+        // Check if the new primary is part of the current shard and update if required
+        if let Some(curr_shard_addrs) = curr_shard_addrs {
+            match curr_shard_addrs.attempt_shard_role_update(new_primary.clone()) {
+                // Scenario 1: No changes needed as the new primary is already the current slot owner.
+                // Scenario 2: Failover occurred and the new primary was promoted from a replica.
+                ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()),
+                // The node was not found in this shard, proceed with further scenarios.
+                ShardUpdateResult::NodeNotFound => {}
+            }
+        }
+
+        // Scenario 3 & 4: Check if the new primary exists in other shards
+        let mut nodes_iter = connections_container.slot_map_nodes();
+        for (node_addr, shard_addrs_arc) in &mut nodes_iter {
+            if node_addr == new_primary {
+                let is_existing_primary = shard_addrs_arc.primary().eq(&new_primary);
+                if is_existing_primary {
+                    // Scenario 3: Slot Migration - The new primary is an existing primary in another shard.
+                    // Update the associated addresses for `slot` to `shard_addrs`.
+                    drop(nodes_iter);
+                    return connections_container
+                        .slot_map
+                        .update_slot_range(slot, shard_addrs_arc.clone());
+                } else {
+                    // Scenario 4: The MOVED error redirects to `new_primary`, which is known as a replica in a shard that doesn't own `slot`.
+                    // Remove the replica from its existing shard and treat it as a new node in a new shard.
+                    shard_addrs_arc.remove_replica(new_primary.clone())?;
+                    drop(nodes_iter);
+                    return connections_container
+                        .slot_map
+                        .add_new_primary(slot, new_primary);
+                }
+            }
+        }
+
+        // Scenario 5: New Node - The new primary is not present in the current slots map; add it as the primary of a new shard.
+        drop(nodes_iter);
+        connections_container
+            .slot_map
+            .add_new_primary(slot, new_primary)
+    }
+
     async fn execute_on_multiple_nodes<'a>(
         cmd: &'a Arc<Cmd>,
         routing: &'a MultipleNodeRoutingInfo,
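To make the five scenarios concrete, here is a self-contained model of the same decision tree over plain stand-in types. It mirrors the branch order above (current shard first, then other shards' primaries, then other shards' replicas, then unknown), but none of these type or function names come from the actual codebase:

#[derive(Debug, PartialEq)]
enum Scenario {
    AlreadyPrimary, // 1: no change needed
    Promoted,       // 2: failover within the same shard
    SlotMigration,  // 3: slot moved to another shard's primary
    ReplicaMoved,   // 4: a replica of a different shard now owns the slot
    NewNode,        // 5: the address is not in the slot map at all
}

struct Shard<'a> {
    primary: &'a str,
    replicas: &'a [&'a str],
}

fn classify(current: &Shard, others: &[Shard], new_primary: &str) -> Scenario {
    if current.primary == new_primary {
        Scenario::AlreadyPrimary
    } else if current.replicas.contains(&new_primary) {
        Scenario::Promoted
    } else if others.iter().any(|s| s.primary == new_primary) {
        Scenario::SlotMigration
    } else if others.iter().any(|s| s.replicas.contains(&new_primary)) {
        Scenario::ReplicaMoved
    } else {
        Scenario::NewNode
    }
}

fn main() {
    let current = Shard { primary: "a1:6379", replicas: &["a2:6379"] };
    let others = [Shard { primary: "b1:6379", replicas: &["b2:6379"] }];
    assert_eq!(classify(&current, &others, "a2:6379"), Scenario::Promoted);
    assert_eq!(classify(&current, &others, "b2:6379"), Scenario::ReplicaMoved);
    assert_eq!(classify(&current, &others, "c1:6379"), Scenario::NewNode);
}

Note the locking discipline in the real code: the whole update runs under the conn_lock write lock, and nodes_iter borrows the connections container, so it is explicitly dropped before update_slot_range or add_new_primary mutate the slot map.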
@@ -2104,25 +2194,37 @@
                     sleep_duration,
                     moved_redirect,
                 } => {
-                    poll_flush_action = poll_flush_action
-                        .change_state(PollFlushAction::RebuildSlots(moved_redirect));
-                    if let Some(request) = request {
-                        let future: RequestState<
-                            Pin<Box<dyn Future<Output = OperationResult> + Send>>,
-                        > = match sleep_duration {
-                            Some(sleep_duration) => RequestState::Sleep {
+                    poll_flush_action =
+                        poll_flush_action.change_state(PollFlushAction::RebuildSlots);
+                    let future: Option<
+                        RequestState<Pin<Box<dyn Future<Output = OperationResult> + Send>>>,
+                    > = if let Some(moved_redirect) = moved_redirect {
+                        Some(RequestState::UpdateMoved {
+                            future: Box::pin(ClusterConnInner::update_upon_moved_error(
+                                self.inner.clone(),
+                                moved_redirect.slot,
+                                moved_redirect.address.into(),
+                            )),
+                        })
+                    } else if let Some(ref request) = request {
+                        match sleep_duration {
+                            Some(sleep_duration) => Some(RequestState::Sleep {
                                 sleep: boxed_sleep(sleep_duration),
-                            },
-                            None => RequestState::Future {
+                            }),
+                            None => Some(RequestState::Future {
                                 future: Box::pin(Self::try_request(
                                     request.info.clone(),
                                     self.inner.clone(),
                                 )),
-                            },
-                        };
+                            }),
+                        }
+                    } else {
+                        None
+                    };
+                    if let Some(future) = future {
                         self.in_flight_requests.push(Box::pin(Request {
                             retry_params: self.inner.cluster_params.retry_params.clone(),
-                            request: Some(request),
+                            request,
                             future,
                         }));
                     }
@@ -2175,7 +2277,7 @@ where

 enum PollFlushAction {
     None,
-    RebuildSlots(Option<RedirectNode>),
+    RebuildSlots,
     Reconnect(Vec<String>),
     ReconnectFromInitialConnections,
 }
@@ -2190,9 +2292,8 @@ impl PollFlushAction {
                 PollFlushAction::ReconnectFromInitialConnections
             }

-            (PollFlushAction::RebuildSlots(moved_redirect), _)
-            | (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
-                PollFlushAction::RebuildSlots(moved_redirect)
+            (PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
+                PollFlushAction::RebuildSlots
             }

             (PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
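With the RedirectNode payload gone, change_state reduces to a pure precedence merge. A minimal stand-in illustrating the ordering implied by the arms above (ReconnectFromInitialConnections over RebuildSlots over Reconnect over None); the real enum additionally carries and concatenates reconnect addresses:

#[derive(Debug, PartialEq)]
enum Action {
    None,
    RebuildSlots,
    Reconnect,
    ReconnectFromInitialConnections,
}

fn change_state(current: Action, next: Action) -> Action {
    match (current, next) {
        (Action::ReconnectFromInitialConnections, _)
        | (_, Action::ReconnectFromInitialConnections) => {
            Action::ReconnectFromInitialConnections
        }
        (Action::RebuildSlots, _) | (_, Action::RebuildSlots) => Action::RebuildSlots,
        (Action::Reconnect, _) | (_, Action::Reconnect) => Action::Reconnect,
        (Action::None, Action::None) => Action::None,
    }
}

fn main() {
    // The strongest pending action survives each fold.
    assert_eq!(
        change_state(Action::Reconnect, Action::RebuildSlots),
        Action::RebuildSlots
    );
}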
@@ -2253,8 +2354,7 @@ where

         match ready!(self.poll_complete(cx)) {
             PollFlushAction::None => return Poll::Ready(Ok(())),
-            PollFlushAction::RebuildSlots(_moved_redirect) => {
-                // TODO: Add logic to update the slots map based on the MOVED error
+            PollFlushAction::RebuildSlots => {
                 self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
                     ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
                         self.inner.clone(),
