Skip to content

Commit 65c9a5d

Browse files
committed
SlotMap refactor: Added NodesMap, sharing shard addresses between shard nodes and slot map values. (#185)
* SlotMap refactor: Added new NodesMap, changed shard addresses to be shard between shard nodes and slot map values
1 parent 391b54d commit 65c9a5d

File tree

7 files changed

+307
-213
lines changed

7 files changed

+307
-213
lines changed

redis/src/cluster.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@
3535
//! .expire(key, 60).ignore()
3636
//! .query(&mut connection).unwrap();
3737
//! ```
38+
use rand::{seq::IteratorRandom, thread_rng, Rng};
3839
use std::cell::RefCell;
3940
use std::collections::HashSet;
4041
use std::str::FromStr;
42+
use std::sync::Arc;
4143
use std::thread;
4244
use std::time::Duration;
4345

44-
use rand::{seq::IteratorRandom, thread_rng, Rng};
45-
4646
use crate::cluster_pipeline::UNROUTABLE_ERROR;
4747
use crate::cluster_routing::{
4848
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
@@ -343,22 +343,20 @@ where
343343
let mut slots = self.slots.borrow_mut();
344344
*slots = self.create_new_slots()?;
345345

346-
let mut nodes = slots.values().flatten().collect::<Vec<_>>();
347-
nodes.sort_unstable();
348-
nodes.dedup();
349-
346+
let nodes = slots.all_node_addresses();
350347
let mut connections = self.connections.borrow_mut();
351348
*connections = nodes
352349
.into_iter()
353350
.filter_map(|addr| {
354-
if connections.contains_key(addr) {
355-
let mut conn = connections.remove(addr).unwrap();
351+
let addr = addr.to_string();
352+
if connections.contains_key(&addr) {
353+
let mut conn = connections.remove(&addr).unwrap();
356354
if conn.check_connection() {
357355
return Some((addr.to_string(), conn));
358356
}
359357
}
360358

361-
if let Ok(mut conn) = self.connect(addr) {
359+
if let Ok(mut conn) = self.connect(&addr) {
362360
if conn.check_connection() {
363361
return Some((addr.to_string(), conn));
364362
}
@@ -424,7 +422,7 @@ where
424422
if let Some(addr) = slots.slot_addr_for_route(route) {
425423
Ok((
426424
addr.to_string(),
427-
self.get_connection_by_addr(connections, addr)?,
425+
self.get_connection_by_addr(connections, &addr)?,
428426
))
429427
} else {
430428
// try a random node next. This is safe if slots are involved
@@ -495,13 +493,13 @@ where
495493
fn execute_on_all<'a>(
496494
&'a self,
497495
input: Input,
498-
addresses: HashSet<&'a str>,
496+
addresses: HashSet<Arc<String>>,
499497
connections: &'a mut HashMap<String, C>,
500-
) -> Vec<RedisResult<(&'a str, Value)>> {
498+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
501499
addresses
502500
.into_iter()
503501
.map(|addr| {
504-
let connection = self.get_connection_by_addr(connections, addr)?;
502+
let connection = self.get_connection_by_addr(connections, &addr)?;
505503
match input {
506504
Input::Slice { cmd, routable: _ } => connection.req_packed_command(cmd),
507505
Input::Cmd(cmd) => connection.req_command(cmd),
@@ -526,16 +524,16 @@ where
526524
input: Input,
527525
slots: &'a mut SlotMap,
528526
connections: &'a mut HashMap<String, C>,
529-
) -> Vec<RedisResult<(&'a str, Value)>> {
530-
self.execute_on_all(input, slots.addresses_for_all_nodes(), connections)
527+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
528+
self.execute_on_all(input, slots.all_node_addresses(), connections)
531529
}
532530

533531
fn execute_on_all_primaries<'a>(
534532
&'a self,
535533
input: Input,
536534
slots: &'a mut SlotMap,
537535
connections: &'a mut HashMap<String, C>,
538-
) -> Vec<RedisResult<(&'a str, Value)>> {
536+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
539537
self.execute_on_all(input, slots.addresses_for_all_primaries(), connections)
540538
}
541539

@@ -545,7 +543,7 @@ where
545543
slots: &'a mut SlotMap,
546544
connections: &'a mut HashMap<String, C>,
547545
routes: &'b [(Route, Vec<usize>)],
548-
) -> Vec<RedisResult<(&'a str, Value)>>
546+
) -> Vec<RedisResult<(Arc<String>, Value)>>
549547
where
550548
'b: 'a,
551549
{
@@ -557,7 +555,7 @@ where
557555
ErrorKind::IoError,
558556
"Couldn't find connection",
559557
)))?;
560-
let connection = self.get_connection_by_addr(connections, addr)?;
558+
let connection = self.get_connection_by_addr(connections, &addr)?;
561559
let (_, indices) = routes.get(index).unwrap();
562560
let cmd =
563561
crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());

redis/src/cluster_async/connections_container.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -147,32 +147,32 @@ where
147147

148148
/// Returns true if the address represents a known primary node.
149149
pub(crate) fn is_primary(&self, address: &String) -> bool {
150-
self.connection_for_address(address).is_some()
151-
&& self
152-
.slot_map
153-
.values()
154-
.any(|slot_addrs| slot_addrs.primary.as_str() == address)
150+
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
155151
}
156152

157153
fn round_robin_read_from_replica(
158154
&self,
159155
slot_map_value: &SlotMapValue,
160156
) -> Option<ConnectionAndAddress<Connection>> {
161-
let addrs = &slot_map_value.addrs;
157+
let addrs = &slot_map_value
158+
.addrs
159+
.read()
160+
.expect("Failed to obtain ShardAddrs's read lock");
162161
let initial_index = slot_map_value
163-
.latest_used_replica
162+
.last_used_replica
164163
.load(std::sync::atomic::Ordering::Relaxed);
165164
let mut check_count = 0;
166165
loop {
167166
check_count += 1;
168167

169168
// Looped through all replicas, no connected replica was found.
170-
if check_count > addrs.replicas.len() {
171-
return self.connection_for_address(addrs.primary.as_str());
169+
if check_count > addrs.replicas().len() {
170+
return self.connection_for_address(addrs.primary().as_str());
172171
}
173-
let index = (initial_index + check_count) % addrs.replicas.len();
174-
if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) {
175-
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
172+
let index = (initial_index + check_count) % addrs.replicas().len();
173+
if let Some(connection) = self.connection_for_address(addrs.replicas()[index].as_str())
174+
{
175+
let _ = slot_map_value.last_used_replica.compare_exchange_weak(
176176
initial_index,
177177
index,
178178
std::sync::atomic::Ordering::Relaxed,
@@ -185,16 +185,19 @@ where
185185

186186
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
187187
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
188-
let addrs = &slot_map_value.addrs;
189-
if addrs.replicas.is_empty() {
190-
return self.connection_for_address(addrs.primary.as_str());
188+
let addrs = &slot_map_value
189+
.addrs
190+
.read()
191+
.expect("Failed to obtain ShardAddrs's read lock");
192+
if addrs.replicas().is_empty() {
193+
return self.connection_for_address(addrs.primary().as_str());
191194
}
192195

193196
match route.slot_addr() {
194-
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
197+
SlotAddr::Master => self.connection_for_address(addrs.primary().as_str()),
195198
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
196199
ReadFromReplicaStrategy::AlwaysFromPrimary => {
197-
self.connection_for_address(addrs.primary.as_str())
200+
self.connection_for_address(addrs.primary().as_str())
198201
}
199202
ReadFromReplicaStrategy::RoundRobin => {
200203
self.round_robin_read_from_replica(slot_map_value)
@@ -232,7 +235,7 @@ where
232235
self.slot_map
233236
.addresses_for_all_primaries()
234237
.into_iter()
235-
.flat_map(|addr| self.connection_for_address(addr))
238+
.flat_map(|addr| self.connection_for_address(&addr))
236239
}
237240

238241
pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {

redis/src/cluster_async/mod.rs

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ where
438438
&self,
439439
slot: u16,
440440
slot_addr: SlotAddr,
441-
) -> Option<String> {
441+
) -> Option<Arc<String>> {
442442
self.conn_lock
443443
.read()
444444
.await
@@ -486,7 +486,7 @@ where
486486
}
487487

488488
// return slots of node
489-
pub(crate) async fn get_slots_of_address(&self, node_address: &str) -> Vec<u16> {
489+
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
490490
self.conn_lock
491491
.read()
492492
.await
@@ -1090,7 +1090,6 @@ where
10901090
Self::refresh_slots_and_subscriptions_with_retries(
10911091
connection.inner.clone(),
10921092
&RefreshPolicy::NotThrottable,
1093-
None,
10941093
)
10951094
.await?;
10961095

@@ -1250,7 +1249,6 @@ where
12501249
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
12511250
inner.clone(),
12521251
&RefreshPolicy::Throttable,
1253-
None,
12541252
)
12551253
.await
12561254
{
@@ -1492,7 +1490,6 @@ where
14921490
async fn refresh_slots_and_subscriptions_with_retries(
14931491
inner: Arc<InnerCore<C>>,
14941492
policy: &RefreshPolicy,
1495-
moved_redirect: Option<RedirectNode>,
14961493
) -> RedisResult<()> {
14971494
let SlotRefreshState {
14981495
in_progress,
@@ -1544,10 +1541,6 @@ where
15441541
Self::refresh_slots(inner.clone(), curr_retry)
15451542
})
15461543
.await;
1547-
} else if moved_redirect.is_some() {
1548-
// Update relevant slots in the slots map based on the moved_redirect address,
1549-
// rather than refreshing all slots by querying the cluster nodes for their topology view.
1550-
Self::update_slots_for_redirect_change(inner.clone(), moved_redirect).await?;
15511544
}
15521545
in_progress.store(false, Ordering::Relaxed);
15531546

@@ -1556,15 +1549,6 @@ where
15561549
res
15571550
}
15581551

1559-
/// Update relevant slots in the slots map based on the moved_redirect address
1560-
pub(crate) async fn update_slots_for_redirect_change(
1561-
_inner: Arc<InnerCore<C>>,
1562-
_moved_redirect: Option<RedirectNode>,
1563-
) -> RedisResult<()> {
1564-
// TODO: Add implementation
1565-
Ok(())
1566-
}
1567-
15681552
/// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed.
15691553
/// Returns `RedisResult` with `true` if changes were detected and slots were refreshed,
15701554
/// or `false` if no changes were found. Raises an error if refreshing the topology fails.
@@ -1574,7 +1558,7 @@ where
15741558
) -> RedisResult<bool> {
15751559
let topology_changed = Self::check_for_topology_diff(inner.clone()).await;
15761560
if topology_changed {
1577-
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy, None).await?;
1561+
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await?;
15781562
}
15791563
Ok(topology_changed)
15801564
}
@@ -1797,21 +1781,20 @@ where
17971781
.0?;
17981782
let connections = &*read_guard;
17991783
// Create a new connection vector of the found nodes
1800-
let mut nodes = new_slots.values().flatten().collect::<Vec<_>>();
1801-
nodes.sort_unstable();
1802-
nodes.dedup();
1784+
let nodes = new_slots.all_node_addresses();
18031785
let nodes_len = nodes.len();
18041786
let addresses_and_connections_iter = stream::iter(nodes)
18051787
.fold(
18061788
Vec::with_capacity(nodes_len),
18071789
|mut addrs_and_conns, addr| async move {
1790+
let addr = addr.to_string();
18081791
if let Some(node) = connections.node_for_address(addr.as_str()) {
18091792
addrs_and_conns.push((addr, Some(node)));
18101793
return addrs_and_conns;
18111794
}
18121795
// If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name.
18131796
// We shall check if a connection is already exists under the resolved IP name.
1814-
let (host, port) = match get_host_and_port_from_addr(addr) {
1797+
let (host, port) = match get_host_and_port_from_addr(&addr) {
18151798
Some((host, port)) => (host, port),
18161799
None => {
18171800
addrs_and_conns.push((addr, None));
@@ -1837,18 +1820,18 @@ where
18371820
|connections, (addr, node)| async {
18381821
let mut cluster_params = inner.cluster_params.clone();
18391822
let subs_guard = inner.subscriptions_by_address.read().await;
1840-
cluster_params.pubsub_subscriptions = subs_guard.get(addr).cloned();
1823+
cluster_params.pubsub_subscriptions = subs_guard.get(&addr).cloned();
18411824
drop(subs_guard);
18421825
let node = get_or_create_conn(
1843-
addr,
1826+
&addr,
18441827
node,
18451828
&cluster_params,
18461829
RefreshConnectionType::AllConnections,
18471830
inner.glide_connection_options.clone(),
18481831
)
18491832
.await;
18501833
if let Ok(node) = node {
1851-
connections.0.insert(addr.into(), node);
1834+
connections.0.insert(addr, node);
18521835
}
18531836
connections
18541837
},
@@ -2192,7 +2175,6 @@ where
21922175
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
21932176
self.inner.clone(),
21942177
&RefreshPolicy::Throttable,
2195-
None,
21962178
));
21972179
Poll::Ready(Err(err))
21982180
}
@@ -2439,12 +2421,12 @@ where
24392421

24402422
match ready!(self.poll_complete(cx)) {
24412423
PollFlushAction::None => return Poll::Ready(Ok(())),
2442-
PollFlushAction::RebuildSlots(moved_redirect) => {
2424+
PollFlushAction::RebuildSlots(_moved_redirect) => {
2425+
// TODO: Add logic to update the slots map based on the MOVED error
24432426
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
24442427
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
24452428
self.inner.clone(),
24462429
&RefreshPolicy::Throttable,
2447-
moved_redirect,
24482430
),
24492431
)));
24502432
}

0 commit comments

Comments
 (0)