Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SlotMap refactor: Added NodesMap, sharing shard addresses between shard nodes and slot map values. #185

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,25 @@ where
&self,
slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs.read().unwrap();
let addrs = &slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
let initial_index = slot_map_value
.latest_used_replica
.last_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
let mut check_count = 0;
loop {
check_count += 1;

// Looped through all replicas, no connected replica was found.
if check_count > addrs.replicas.len() {
return self.connection_for_address(addrs.primary.as_str());
if check_count > addrs.replicas().len() {
return self.connection_for_address(addrs.primary().as_str());
}
let index = (initial_index + check_count) % addrs.replicas.len();
if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) {
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
let index = (initial_index + check_count) % addrs.replicas().len();
if let Some(connection) = self.connection_for_address(addrs.replicas()[index].as_str())
{
let _ = slot_map_value.last_used_replica.compare_exchange_weak(
initial_index,
index,
std::sync::atomic::Ordering::Relaxed,
Expand All @@ -181,16 +185,19 @@ where

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs.read().unwrap();
if addrs.replicas.is_empty() {
return self.connection_for_address(addrs.primary.as_str());
let addrs = &slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
if addrs.replicas().is_empty() {
return self.connection_for_address(addrs.primary().as_str());
}

match route.slot_addr() {
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
SlotAddr::Master => self.connection_for_address(addrs.primary().as_str()),
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
ReadFromReplicaStrategy::AlwaysFromPrimary => {
self.connection_for_address(addrs.primary.as_str())
self.connection_for_address(addrs.primary().as_str())
}
ReadFromReplicaStrategy::RoundRobin => {
self.round_robin_read_from_replica(slot_map_value)
Expand Down
18 changes: 16 additions & 2 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,8 +896,22 @@ pub enum SlotAddr {
/// a command is executed
#[derive(Debug, Eq, PartialEq, Clone, PartialOrd, Ord)]
pub(crate) struct ShardAddrs {
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) primary: Arc<String>,
pub(crate) replicas: Vec<Arc<String>>,
primary: Arc<String>,
replicas: Vec<Arc<String>>,
}

impl ShardAddrs {
pub(crate) fn new(primary: Arc<String>, replicas: Vec<Arc<String>>) -> Self {
Self { primary, replicas }
}

pub(crate) fn primary(&self) -> Arc<String> {
self.primary.clone()
}

pub(crate) fn replicas(&self) -> &Vec<Arc<String>> {
self.replicas.as_ref()
}
}

impl<'a> IntoIterator for &'a ShardAddrs {
Expand Down
81 changes: 54 additions & 27 deletions redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) type NodesMap = DashMap<Arc<String>, Arc<RwLock<ShardAddrs>>>;
pub(crate) struct SlotMapValue {
pub(crate) start: u16,
pub(crate) addrs: Arc<RwLock<ShardAddrs>>,
pub(crate) latest_used_replica: AtomicUsize,
pub(crate) last_used_replica: AtomicUsize,
}

#[derive(Debug, Default, Clone, PartialEq, Copy)]
Expand All @@ -29,7 +29,7 @@ pub(crate) enum ReadFromReplicaStrategy {
#[derive(Debug, Default)]
pub(crate) struct SlotMap {
pub(crate) slots: BTreeMap<u16, SlotMapValue>,
pub(crate) nodes_map: NodesMap,
nodes_map: NodesMap,
read_from_replica: ReadFromReplicaStrategy,
}

Expand All @@ -38,50 +38,57 @@ fn get_address_from_slot(
read_from_replica: ReadFromReplicaStrategy,
slot_addr: SlotAddr,
) -> Arc<String> {
let addrs = slot.addrs.read().unwrap();
if slot_addr == SlotAddr::Master || addrs.replicas.is_empty() {
return addrs.primary.clone();
let addrs = slot
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
if slot_addr == SlotAddr::Master || addrs.replicas().is_empty() {
return addrs.primary();
}
match read_from_replica {
ReadFromReplicaStrategy::AlwaysFromPrimary => addrs.primary.clone(),
ReadFromReplicaStrategy::AlwaysFromPrimary => addrs.primary(),
ReadFromReplicaStrategy::RoundRobin => {
let index = slot
.latest_used_replica
.last_used_replica
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
% addrs.replicas.len();
addrs.replicas[index].clone()
% addrs.replicas().len();
addrs.replicas()[index].clone()
}
}
}

impl SlotMap {
pub(crate) fn new(slots: Vec<Slot>, read_from_replica: ReadFromReplicaStrategy) -> Self {
let mut slot_map = SlotMap {
pub(crate) fn new_with_read_strategy(read_from_replica: ReadFromReplicaStrategy) -> Self {
SlotMap {
slots: BTreeMap::new(),
nodes_map: DashMap::new(),
read_from_replica,
};
}
}

pub(crate) fn new(slots: Vec<Slot>, read_from_replica: ReadFromReplicaStrategy) -> Self {
let mut slot_map = SlotMap::new_with_read_strategy(read_from_replica);
let mut shard_id = 0;
for slot in slots {
let primary = Arc::new(slot.master);
let replicas: Vec<Arc<String>> = slot.replicas.into_iter().map(Arc::new).collect();

// Get the shard addresses if the primary is already in nodes_map;
// otherwise, create a new ShardAddrs and add it
let shard_addrs_arc = slot_map
.nodes_map
.entry(primary.clone())
.or_insert_with(|| {
shard_id += 1;
Arc::new(RwLock::new(ShardAddrs {
primary,
replicas: replicas.clone(),
}))
let replicas: Vec<Arc<String>> =
slot.replicas.into_iter().map(Arc::new).collect();
Arc::new(RwLock::new(ShardAddrs::new(primary, replicas)))
})
.clone();

let replicas_reader = shard_addrs_arc
.read()
.expect("Failed to obtain reader lock for ShardAddrs");
// Add all replicas to nodes_map with a reference to the same ShardAddrs if not already present
replicas.iter().for_each(|replica| {
replicas_reader.replicas().iter().for_each(|replica| {
slot_map
.nodes_map
.entry(replica.clone())
Expand All @@ -94,17 +101,26 @@ impl SlotMap {
SlotMapValue {
addrs: shard_addrs_arc.clone(),
start: slot.start,
latest_used_replica: AtomicUsize::new(0),
last_used_replica: AtomicUsize::new(0),
},
);
}

slot_map
}

#[allow(dead_code)] // used in tests

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "tests" (when placed on the same file) are allowed to access private members, so the test can access the property member directly

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's being used in other file, however i'm going to use it in the next PR then i'll remove the dead_code

pub(crate) fn nodes_map(&self) -> &NodesMap {
&self.nodes_map
}

pub fn is_primary(&self, address: &String) -> bool {
self.nodes_map.get(address).map_or(false, |shard_addrs| {
*shard_addrs.read().unwrap().primary == *address
*shard_addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock")
.primary()
== *address
})
}

Expand Down Expand Up @@ -133,7 +149,11 @@ impl SlotMap {
.iter()
.map(|map_item| {
let shard_addrs = map_item.value();
shard_addrs.read().unwrap().primary.clone()
shard_addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock")
.primary()
.clone()
})
.collect()
}
Expand Down Expand Up @@ -165,9 +185,12 @@ impl SlotMap {
self.slots
.iter()
.filter_map(|(end, slot_value)| {
let addr_reader = slot_value.addrs.read().unwrap();
if addr_reader.primary == node_address
|| addr_reader.replicas.contains(&node_address)
let addr_reader = slot_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
if addr_reader.primary() == node_address
|| addr_reader.replicas().contains(&node_address)
{
Some(slot_value.start..(*end + 1))
} else {
Expand Down Expand Up @@ -201,13 +224,17 @@ impl Display for SlotMap {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Strategy: {:?}. Slot mapping:", self.read_from_replica)?;
for (end, slot_map_value) in self.slots.iter() {
let shard_addrs = slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
writeln!(
f,
"({}-{}): primary: {}, replicas: {:?}",
slot_map_value.start,
end,
slot_map_value.addrs.read().unwrap().primary,
slot_map_value.addrs.read().unwrap().replicas
shard_addrs.primary(),
shard_addrs.replicas()
)?;
}
Ok(())
Expand Down
11 changes: 5 additions & 6 deletions redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,19 +504,18 @@ mod tests {
}

fn get_node_addr(name: &str, port: u16) -> ShardAddrs {
ShardAddrs {
primary: format!("{name}:{port}").into(),
replicas: Vec::new(),
}
ShardAddrs::new(format!("{name}:{port}").into(), Vec::new())
}

fn collect_shard_addrs(slot_map: &SlotMap) -> Vec<ShardAddrs> {
let mut shard_addrs: Vec<ShardAddrs> = slot_map
.nodes_map
.nodes_map()
.iter()
.map(|map_item| {
let shard_addrs = map_item.value();
let addr_reader = shard_addrs.read().unwrap();
let addr_reader = shard_addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
addr_reader.clone()
})
.collect();
Expand Down