Skip to content

Commit

Permalink
Add index field to PgState for scope->group mapping
Browse files Browse the repository at this point in the history
And added test cases for leavers
  • Loading branch information
leonqadirie committed Dec 12, 2023
1 parent 9382d21 commit 70e033a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
52 changes: 36 additions & 16 deletions ractor/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl ScopeGroupKey {

struct PgState {
map: Arc<DashMap<ScopeGroupKey, HashMap<ActorId, ActorCell>>>,
index: Arc<DashMap<ScopeName, Vec<GroupName>>>,
listeners: Arc<DashMap<ScopeGroupKey, Vec<ActorCell>>>,
}

Expand All @@ -98,6 +99,7 @@ static PG_MONITOR: OnceCell<PgState> = OnceCell::new();
fn get_monitor<'a>() -> &'a PgState {
PG_MONITOR.get_or_init(|| PgState {
map: Arc::new(DashMap::new()),
index: Arc::new(DashMap::new()),
listeners: Arc::new(DashMap::new()),
})
}
Expand Down Expand Up @@ -138,6 +140,18 @@ pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
vacancy.insert(map);
}
}
match monitor.index.entry(scope.to_owned()) {
Occupied(mut occupied) => {
let oref = occupied.get_mut();
if !oref.contains(&group) {
oref.push(group.to_owned());
}
}
Vacant(vacancy) => {
vacancy.insert(vec![group.to_owned()]);
}
}

// notify supervisors
if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
Expand Down Expand Up @@ -189,7 +203,17 @@ pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>)
// if the scope and group tuple is empty, remove it
if mut_ref.is_empty() {
occupied.remove();

// remove the group and possibly the scope from the monitor's index
if let Some(mut groups_in_scope) = monitor.index.get_mut(&scope) {
groups_in_scope.retain(|group_name| group_name != &group);
if groups_in_scope.is_empty() {
drop(groups_in_scope);
monitor.index.remove(&scope);
}
}

Check warning on line 214 in ractor/src/pg/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/pg/mod.rs#L214

Added line #L214 was not covered by tests
}

if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
Expand Down Expand Up @@ -221,16 +245,17 @@ pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>)
pub(crate) fn leave_all(actor: ActorId) {
let pg_monitor = get_monitor();
let map = pg_monitor.map.clone();
// let index = pg_monitor.index.clone();

let mut empty_groups = vec![];
let mut empty_scope_group_keys = vec![];
let mut removal_events = HashMap::new();

for mut kv in map.iter_mut() {
if let Some(actor_cell) = kv.value_mut().remove(&actor) {
removal_events.insert(kv.key().clone(), actor_cell);
}
if kv.value().is_empty() {
empty_groups.push(kv.key().clone());
empty_scope_group_keys.push(kv.key().clone());
}
}

Expand Down Expand Up @@ -267,8 +292,11 @@ pub(crate) fn leave_all(actor: ActorId) {
}

// Cleanup empty groups
for group in empty_groups {
map.remove(&group);
for scope_group_key in empty_scope_group_keys {
map.remove(&scope_group_key);
if let Some(mut groups_in_scope) = pg_monitor.index.get_mut(&scope_group_key.scope) {
groups_in_scope.retain(|group| group != &scope_group_key.group);
}
}
}

Expand Down Expand Up @@ -360,18 +388,10 @@ pub fn which_groups() -> Vec<GroupName> {
/// in `scope`
pub fn which_scoped_groups(scope: &ScopeName) -> Vec<GroupName> {
let monitor = get_monitor();
monitor
.map
.iter()
.filter_map(|kvp| {
let key = kvp.key();
if key.scope == *scope {
Some(key.group.to_owned())
} else {
None
}
})
.collect::<Vec<_>>()
match monitor.index.get(scope) {
Some(groups) => groups.to_owned(),
None => vec![],
}
}

/// Returns a list of all known scope-group combinations.
Expand Down
8 changes: 8 additions & 0 deletions ractor/src/pg/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ async fn test_actor_leaves_pg_group_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_members(&group);
assert_eq!(0, members.len());
Expand Down Expand Up @@ -463,6 +467,10 @@ async fn test_actor_leaves_scope_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_scoped_members(&scope, &group);
assert_eq!(0, members.len());
Expand Down

0 comments on commit 70e033a

Please sign in to comment.