Skip to content

Commit 820dbc6

Browse files
committed
SlotMap refactor: Added new NodesMap, changed shard addresses to be shard between shard nodes and slot map values
1 parent eafaadb commit 820dbc6

File tree

7 files changed

+241
-206
lines changed

7 files changed

+241
-206
lines changed

redis/src/cluster.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@
3535
//! .expire(key, 60).ignore()
3636
//! .query(&mut connection).unwrap();
3737
//! ```
38+
use rand::{seq::IteratorRandom, thread_rng, Rng};
3839
use std::cell::RefCell;
3940
use std::collections::HashSet;
4041
use std::str::FromStr;
42+
use std::sync::Arc;
4143
use std::thread;
4244
use std::time::Duration;
4345

44-
use rand::{seq::IteratorRandom, thread_rng, Rng};
45-
4646
use crate::cluster_pipeline::UNROUTABLE_ERROR;
4747
use crate::cluster_routing::{
4848
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
@@ -343,22 +343,20 @@ where
343343
let mut slots = self.slots.borrow_mut();
344344
*slots = self.create_new_slots()?;
345345

346-
let mut nodes = slots.values().flatten().collect::<Vec<_>>();
347-
nodes.sort_unstable();
348-
nodes.dedup();
349-
346+
let nodes = slots.all_node_addresses();
350347
let mut connections = self.connections.borrow_mut();
351348
*connections = nodes
352349
.into_iter()
353350
.filter_map(|addr| {
354-
if connections.contains_key(addr) {
355-
let mut conn = connections.remove(addr).unwrap();
351+
let addr = addr.to_string();
352+
if connections.contains_key(&addr) {
353+
let mut conn = connections.remove(&addr).unwrap();
356354
if conn.check_connection() {
357355
return Some((addr.to_string(), conn));
358356
}
359357
}
360358

361-
if let Ok(mut conn) = self.connect(addr) {
359+
if let Ok(mut conn) = self.connect(&addr) {
362360
if conn.check_connection() {
363361
return Some((addr.to_string(), conn));
364362
}
@@ -424,7 +422,7 @@ where
424422
if let Some(addr) = slots.slot_addr_for_route(route) {
425423
Ok((
426424
addr.to_string(),
427-
self.get_connection_by_addr(connections, addr)?,
425+
self.get_connection_by_addr(connections, &addr)?,
428426
))
429427
} else {
430428
// try a random node next. This is safe if slots are involved
@@ -495,13 +493,13 @@ where
495493
fn execute_on_all<'a>(
496494
&'a self,
497495
input: Input,
498-
addresses: HashSet<&'a str>,
496+
addresses: HashSet<Arc<String>>,
499497
connections: &'a mut HashMap<String, C>,
500-
) -> Vec<RedisResult<(&'a str, Value)>> {
498+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
501499
addresses
502500
.into_iter()
503501
.map(|addr| {
504-
let connection = self.get_connection_by_addr(connections, addr)?;
502+
let connection = self.get_connection_by_addr(connections, &addr)?;
505503
match input {
506504
Input::Slice { cmd, routable: _ } => connection.req_packed_command(cmd),
507505
Input::Cmd(cmd) => connection.req_command(cmd),
@@ -526,16 +524,16 @@ where
526524
input: Input,
527525
slots: &'a mut SlotMap,
528526
connections: &'a mut HashMap<String, C>,
529-
) -> Vec<RedisResult<(&'a str, Value)>> {
530-
self.execute_on_all(input, slots.addresses_for_all_nodes(), connections)
527+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
528+
self.execute_on_all(input, slots.all_node_addresses(), connections)
531529
}
532530

533531
fn execute_on_all_primaries<'a>(
534532
&'a self,
535533
input: Input,
536534
slots: &'a mut SlotMap,
537535
connections: &'a mut HashMap<String, C>,
538-
) -> Vec<RedisResult<(&'a str, Value)>> {
536+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
539537
self.execute_on_all(input, slots.addresses_for_all_primaries(), connections)
540538
}
541539

@@ -545,7 +543,7 @@ where
545543
slots: &'a mut SlotMap,
546544
connections: &'a mut HashMap<String, C>,
547545
routes: &'b [(Route, Vec<usize>)],
548-
) -> Vec<RedisResult<(&'a str, Value)>>
546+
) -> Vec<RedisResult<(Arc<String>, Value)>>
549547
where
550548
'b: 'a,
551549
{
@@ -557,7 +555,7 @@ where
557555
ErrorKind::IoError,
558556
"Couldn't find connection",
559557
)))?;
560-
let connection = self.get_connection_by_addr(connections, addr)?;
558+
let connection = self.get_connection_by_addr(connections, &addr)?;
561559
let (_, indices) = routes.get(index).unwrap();
562560
let cmd =
563561
crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());

redis/src/cluster_async/connections_container.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,14 @@ where
147147

148148
/// Returns true if the address represents a known primary node.
149149
pub(crate) fn is_primary(&self, address: &String) -> bool {
150-
self.connection_for_address(address).is_some()
151-
&& self
152-
.slot_map
153-
.values()
154-
.any(|slot_addrs| slot_addrs.primary.as_str() == address)
150+
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
155151
}
156152

157153
fn round_robin_read_from_replica(
158154
&self,
159155
slot_map_value: &SlotMapValue,
160156
) -> Option<ConnectionAndAddress<Connection>> {
161-
let addrs = &slot_map_value.addrs;
157+
let addrs = &slot_map_value.addrs.read().unwrap();
162158
let initial_index = slot_map_value
163159
.latest_used_replica
164160
.load(std::sync::atomic::Ordering::Relaxed);
@@ -185,7 +181,7 @@ where
185181

186182
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
187183
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
188-
let addrs = &slot_map_value.addrs;
184+
let addrs = &slot_map_value.addrs.read().unwrap();
189185
if addrs.replicas.is_empty() {
190186
return self.connection_for_address(addrs.primary.as_str());
191187
}
@@ -232,7 +228,7 @@ where
232228
self.slot_map
233229
.addresses_for_all_primaries()
234230
.into_iter()
235-
.flat_map(|addr| self.connection_for_address(addr))
231+
.flat_map(|addr| self.connection_for_address(&addr))
236232
}
237233

238234
pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {

redis/src/cluster_async/mod.rs

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ where
396396
&self,
397397
slot: u16,
398398
slot_addr: SlotAddr,
399-
) -> Option<String> {
399+
) -> Option<Arc<String>> {
400400
self.conn_lock
401401
.read()
402402
.await
@@ -444,7 +444,7 @@ where
444444
}
445445

446446
// return slots of node
447-
pub(crate) async fn get_slots_of_address(&self, node_address: &str) -> Vec<u16> {
447+
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
448448
self.conn_lock
449449
.read()
450450
.await
@@ -1020,7 +1020,6 @@ where
10201020
Self::refresh_slots_and_subscriptions_with_retries(
10211021
connection.inner.clone(),
10221022
&RefreshPolicy::NotThrottable,
1023-
None,
10241023
)
10251024
.await?;
10261025

@@ -1164,7 +1163,6 @@ where
11641163
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
11651164
inner.clone(),
11661165
&RefreshPolicy::Throttable,
1167-
None,
11681166
)
11691167
.await
11701168
{
@@ -1336,7 +1334,6 @@ where
13361334
async fn refresh_slots_and_subscriptions_with_retries(
13371335
inner: Arc<InnerCore<C>>,
13381336
policy: &RefreshPolicy,
1339-
moved_redirect: Option<RedirectNode>,
13401337
) -> RedisResult<()> {
13411338
let SlotRefreshState {
13421339
in_progress,
@@ -1388,10 +1385,6 @@ where
13881385
Self::refresh_slots(inner.clone(), curr_retry)
13891386
})
13901387
.await;
1391-
} else if moved_redirect.is_some() {
1392-
// Update relevant slots in the slots map based on the moved_redirect address,
1393-
// rather than refreshing all slots by querying the cluster nodes for their topology view.
1394-
Self::update_slots_for_redirect_change(inner.clone(), moved_redirect).await?;
13951388
}
13961389
in_progress.store(false, Ordering::Relaxed);
13971390

@@ -1400,15 +1393,6 @@ where
14001393
res
14011394
}
14021395

1403-
/// Update relevant slots in the slots map based on the moved_redirect address
1404-
pub(crate) async fn update_slots_for_redirect_change(
1405-
_inner: Arc<InnerCore<C>>,
1406-
_moved_redirect: Option<RedirectNode>,
1407-
) -> RedisResult<()> {
1408-
// TODO: Add implementation
1409-
Ok(())
1410-
}
1411-
14121396
/// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed.
14131397
/// Returns `RedisResult` with `true` if changes were detected and slots were refreshed,
14141398
/// or `false` if no changes were found. Raises an error if refreshing the topology fails.
@@ -1418,7 +1402,7 @@ where
14181402
) -> RedisResult<bool> {
14191403
let topology_changed = Self::check_for_topology_diff(inner.clone()).await;
14201404
if topology_changed {
1421-
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy, None).await?;
1405+
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await?;
14221406
}
14231407
Ok(topology_changed)
14241408
}
@@ -1629,21 +1613,20 @@ where
16291613
.0?;
16301614
let connections = &*read_guard;
16311615
// Create a new connection vector of the found nodes
1632-
let mut nodes = new_slots.values().flatten().collect::<Vec<_>>();
1633-
nodes.sort_unstable();
1634-
nodes.dedup();
1616+
let nodes = new_slots.all_node_addresses();
16351617
let nodes_len = nodes.len();
16361618
let addresses_and_connections_iter = stream::iter(nodes)
16371619
.fold(
16381620
Vec::with_capacity(nodes_len),
16391621
|mut addrs_and_conns, addr| async move {
1622+
let addr = addr.to_string();
16401623
if let Some(node) = connections.node_for_address(addr.as_str()) {
16411624
addrs_and_conns.push((addr, Some(node)));
16421625
return addrs_and_conns;
16431626
}
16441627
// If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name.
16451628
// We shall check if a connection is already exists under the resolved IP name.
1646-
let (host, port) = match get_host_and_port_from_addr(addr) {
1629+
let (host, port) = match get_host_and_port_from_addr(&addr) {
16471630
Some((host, port)) => (host, port),
16481631
None => {
16491632
addrs_and_conns.push((addr, None));
@@ -1669,18 +1652,18 @@ where
16691652
|connections, (addr, node)| async {
16701653
let mut cluster_params = inner.cluster_params.clone();
16711654
let subs_guard = inner.subscriptions_by_address.read().await;
1672-
cluster_params.pubsub_subscriptions = subs_guard.get(addr).cloned();
1655+
cluster_params.pubsub_subscriptions = subs_guard.get(&addr).cloned();
16731656
drop(subs_guard);
16741657
let node = get_or_create_conn(
1675-
addr,
1658+
&addr,
16761659
node,
16771660
&cluster_params,
16781661
RefreshConnectionType::AllConnections,
16791662
inner.push_sender.clone(),
16801663
)
16811664
.await;
16821665
if let Ok(node) = node {
1683-
connections.0.insert(addr.into(), node);
1666+
connections.0.insert(addr, node);
16841667
}
16851668
connections
16861669
},
@@ -2024,7 +2007,6 @@ where
20242007
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
20252008
self.inner.clone(),
20262009
&RefreshPolicy::Throttable,
2027-
None,
20282010
));
20292011
Poll::Ready(Err(err))
20302012
}
@@ -2271,12 +2253,12 @@ where
22712253

22722254
match ready!(self.poll_complete(cx)) {
22732255
PollFlushAction::None => return Poll::Ready(Ok(())),
2274-
PollFlushAction::RebuildSlots(moved_redirect) => {
2256+
PollFlushAction::RebuildSlots(_moved_redirect) => {
2257+
// TODO: Add logic to update the slots map based on the MOVED error
22752258
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
22762259
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
22772260
self.inner.clone(),
22782261
&RefreshPolicy::Throttable,
2279-
moved_redirect,
22802262
),
22812263
)));
22822264
}

redis/src/cluster_routing.rs

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use std::cmp::min;
2-
use std::collections::HashMap;
3-
41
use crate::cluster_topology::get_slot;
52
use crate::cmd::{Arg, Cmd};
63
use crate::types::Value;
74
use crate::{ErrorKind, RedisResult};
5+
use std::cmp::min;
6+
use std::collections::HashMap;
87
use std::iter::Once;
8+
use std::sync::Arc;
99

1010
#[derive(Clone)]
1111
pub(crate) enum Redirect {
@@ -866,14 +866,6 @@ impl Slot {
866866
}
867867
}
868868

869-
pub fn start(&self) -> u16 {
870-
self.start
871-
}
872-
873-
pub fn end(&self) -> u16 {
874-
self.end
875-
}
876-
877869
#[allow(dead_code)] // used in tests
878870
pub(crate) fn master(&self) -> &str {
879871
self.master.as_str()
@@ -902,25 +894,15 @@ pub enum SlotAddr {
902894
/// which stores only the master and [optional] replica
903895
/// to avoid the need to choose a replica each time
904896
/// a command is executed
905-
#[derive(Debug, Eq, PartialEq)]
906-
pub(crate) struct SlotAddrs {
907-
pub(crate) primary: String,
908-
pub(crate) replicas: Vec<String>,
909-
}
910-
911-
impl SlotAddrs {
912-
pub(crate) fn new(primary: String, replicas: Vec<String>) -> Self {
913-
Self { primary, replicas }
914-
}
915-
916-
pub(crate) fn from_slot(slot: Slot) -> Self {
917-
SlotAddrs::new(slot.master, slot.replicas)
918-
}
897+
#[derive(Debug, Eq, PartialEq, Clone, PartialOrd, Ord)]
898+
pub(crate) struct ShardAddrs {
899+
pub(crate) primary: Arc<String>,
900+
pub(crate) replicas: Vec<Arc<String>>,
919901
}
920902

921-
impl<'a> IntoIterator for &'a SlotAddrs {
922-
type Item = &'a String;
923-
type IntoIter = std::iter::Chain<Once<&'a String>, std::slice::Iter<'a, String>>;
903+
impl<'a> IntoIterator for &'a ShardAddrs {
904+
type Item = &'a Arc<String>;
905+
type IntoIter = std::iter::Chain<Once<&'a Arc<String>>, std::slice::Iter<'a, Arc<String>>>;
924906

925907
fn into_iter(self) -> Self::IntoIter {
926908
std::iter::once(&self.primary).chain(self.replicas.iter())

0 commit comments

Comments
 (0)