Skip to content

Commit 94259de

Browse files
committed
Add index field to PgState
1 parent 566791b commit 94259de

File tree

2 files changed

+44
-16
lines changed

2 files changed

+44
-16
lines changed

ractor/src/pg/mod.rs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ impl ScopeGroupKey {
9090

9191
struct PgState {
9292
map: Arc<DashMap<ScopeGroupKey, HashMap<ActorId, ActorCell>>>,
93+
index: Arc<DashMap<ScopeName, Vec<GroupName>>>,
9394
listeners: Arc<DashMap<ScopeGroupKey, Vec<ActorCell>>>,
9495
}
9596

@@ -98,6 +99,7 @@ static PG_MONITOR: OnceCell<PgState> = OnceCell::new();
9899
fn get_monitor<'a>() -> &'a PgState {
99100
PG_MONITOR.get_or_init(|| PgState {
100101
map: Arc::new(DashMap::new()),
102+
index: Arc::new(DashMap::new()),
101103
listeners: Arc::new(DashMap::new()),
102104
})
103105
}
@@ -138,6 +140,18 @@ pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
138140
vacancy.insert(map);
139141
}
140142
}
143+
match monitor.index.entry(scope.to_owned()) {
144+
Occupied(mut occupied) => {
145+
let oref = occupied.get_mut();
146+
if !oref.contains(&group) {
147+
oref.push(group.to_owned());
148+
}
149+
}
150+
Vacant(vacancy) => {
151+
vacancy.insert(vec![group.to_owned()]);
152+
}
153+
}
154+
141155
// notify supervisors
142156
if let Some(listeners) = monitor.listeners.get(&key) {
143157
for listener in listeners.value() {
@@ -189,7 +203,17 @@ pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>)
189203
// if the scope and group tuple is empty, remove it
190204
if mut_ref.is_empty() {
191205
occupied.remove();
206+
207+
// remove the group and possibly the scope from the monitor's index
208+
if let Some(mut groups_in_scope) = monitor.index.get_mut(&scope) {
209+
groups_in_scope.retain(|group_name| group_name != &group);
210+
if groups_in_scope.is_empty() {
211+
drop(groups_in_scope);
212+
monitor.index.remove(&scope);
213+
}
214+
}
192215
}
216+
193217
if let Some(listeners) = monitor.listeners.get(&key) {
194218
for listener in listeners.value() {
195219
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
@@ -221,16 +245,17 @@ pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>)
221245
pub(crate) fn leave_all(actor: ActorId) {
222246
let pg_monitor = get_monitor();
223247
let map = pg_monitor.map.clone();
248+
// let index = pg_monitor.index.clone();
224249

225-
let mut empty_groups = vec![];
250+
let mut empty_scope_group_keys = vec![];
226251
let mut removal_events = HashMap::new();
227252

228253
for mut kv in map.iter_mut() {
229254
if let Some(actor_cell) = kv.value_mut().remove(&actor) {
230255
removal_events.insert(kv.key().clone(), actor_cell);
231256
}
232257
if kv.value().is_empty() {
233-
empty_groups.push(kv.key().clone());
258+
empty_scope_group_keys.push(kv.key().clone());
234259
}
235260
}
236261

@@ -267,8 +292,11 @@ pub(crate) fn leave_all(actor: ActorId) {
267292
}
268293

269294
// Cleanup empty groups
270-
for group in empty_groups {
271-
map.remove(&group);
295+
for scope_group_key in empty_scope_group_keys {
296+
map.remove(&scope_group_key);
297+
if let Some(mut groups_in_scope) = pg_monitor.index.get_mut(&scope_group_key.scope) {
298+
groups_in_scope.retain(|group| group != &scope_group_key.group);
299+
}
272300
}
273301
}
274302

@@ -360,18 +388,10 @@ pub fn which_groups() -> Vec<GroupName> {
360388
/// in `scope`
361389
pub fn which_scoped_groups(scope: &ScopeName) -> Vec<GroupName> {
362390
let monitor = get_monitor();
363-
monitor
364-
.map
365-
.iter()
366-
.filter_map(|kvp| {
367-
let key = kvp.key();
368-
if key.scope == *scope {
369-
Some(key.group.to_owned())
370-
} else {
371-
None
372-
}
373-
})
374-
.collect::<Vec<_>>()
391+
match monitor.index.get(scope) {
392+
Some(groups) => groups.to_owned(),
393+
None => vec![],
394+
}
375395
}
376396

377397
/// Returns a list of all known scope-group combinations.

ractor/src/pg/tests.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ async fn test_actor_leaves_pg_group_manually() {
418418
let groups = pg::which_groups();
419419
assert!(!groups.contains(&group));
420420

421+
// pif-paf-poof the group is gone from the monitor's index!
422+
let scoped_groups = pg::which_scoped_groups(&group);
423+
assert!(!scoped_groups.contains(&group));
424+
421425
// members comes back empty
422426
let members = pg::get_members(&group);
423427
assert_eq!(0, members.len());
@@ -463,6 +467,10 @@ async fn test_actor_leaves_scope_manually() {
463467
let groups = pg::which_groups();
464468
assert!(!groups.contains(&group));
465469

470+
// pif-paf-poof the group is gone from the monitor's index!
471+
let scoped_groups = pg::which_scoped_groups(&group);
472+
assert!(!scoped_groups.contains(&group));
473+
466474
// members comes back empty
467475
let members = pg::get_scoped_members(&scope, &group);
468476
assert_eq!(0, members.len());

0 commit comments

Comments
 (0)