From 2940034e9bcc92bfa3f00383c87d84fa6bd435ac Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Thu, 26 Jan 2023 10:55:18 -0500 Subject: [PATCH] PG Groups lifecycle + PID lifecycle synchronization --- ractor-cluster/src/build.rs | 4 +- ractor-cluster/src/node/client.rs | 2 +- ractor-cluster/src/node/node_session.rs | 282 +++++++++++++++--- ractor-cluster/src/protocol/control.proto | 47 ++- ractor/benches/actor.rs | 14 +- ractor/examples/monte_carlo.rs | 2 +- ractor/examples/output_port.rs | 11 +- ractor/examples/philosophers.rs | 14 +- ractor/examples/supervisor.rs | 13 +- .../src/actor/actor_cell/actor_properties.rs | 4 + ractor/src/actor/actor_cell/mod.rs | 33 +- ractor/src/actor/errors.rs | 7 +- ractor/src/actor/messages.rs | 24 +- ractor/src/actor/tests/mod.rs | 7 + ractor/src/actor/tests/supervisor.rs | 8 +- ractor/src/actor_id.rs | 4 +- ractor/src/lib.rs | 6 +- ractor/src/pg/mod.rs | 27 ++ ractor/src/pg/tests.rs | 60 +++- ractor/src/registry/mod.rs | 39 +-- ractor/src/registry/pid_registry.rs | 129 ++++++++ ractor/src/registry/tests.rs | 200 +++++++++++++ ractor/src/rpc/tests.rs | 4 +- 23 files changed, 780 insertions(+), 161 deletions(-) create mode 100644 ractor/src/registry/pid_registry.rs diff --git a/ractor-cluster/src/build.rs b/ractor-cluster/src/build.rs index d337a3fc..a06dabf7 100644 --- a/ractor-cluster/src/build.rs +++ b/ractor-cluster/src/build.rs @@ -17,8 +17,8 @@ fn build_protobufs() { let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len()); for file in PROTOBUF_FILES.iter() { - let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file); - println!("cargo:rerun-if-changed={}", proto_file); + let proto_file = format!("{PROTOBUF_BASE_DIRECTORY}/{file}.proto"); + println!("cargo:rerun-if-changed={proto_file}"); protobuf_files.push(proto_file); } diff --git a/ractor-cluster/src/node/client.rs b/ractor-cluster/src/node/client.rs index 2d6f8fca..9f73fe79 100644 --- a/ractor-cluster/src/node/client.rs +++ b/ractor-cluster/src/node/client.rs @@ -26,7 +26,7 @@ pub enum ClientConnectError { impl Display for ClientConnectError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) + write!(f, "{self:?}") } } diff --git a/ractor-cluster/src/node/node_session.rs b/ractor-cluster/src/node/node_session.rs index 2d3b47be..db11eb99 100644 --- a/ractor-cluster/src/node/node_session.rs +++ b/ractor-cluster/src/node/node_session.rs @@ -12,8 +12,10 @@ use std::convert::TryInto; use std::net::SocketAddr; use ractor::message::SerializedMessage; +use ractor::pg::GroupChangeMessage; +use ractor::registry::PidLifecycleEvent; use ractor::rpc::CallResult; -use ractor::{Actor, ActorId, ActorRef, SupervisionEvent}; +use ractor::{Actor, ActorId, ActorRef, SpawnErr, SupervisionEvent}; use rand::Rng; use tokio::time::Duration; @@ -301,14 +303,18 @@ impl NodeSession { if let Some(msg) = message.msg { match msg { node_protocol::node_message::Msg::Cast(cast_args) => { - if let Some(actor) = ractor::registry::get_pid(ActorId::Local(cast_args.to)) { + if let Some(actor) = + ractor::registry::where_is_pid(ActorId::Local(cast_args.to)) + { let _ = actor.send_serialized(SerializedMessage::Cast(cast_args.what)); } } node_protocol::node_message::Msg::Call(call_args) => { let to = call_args.to; let tag = call_args.tag; - if let Some(actor) = ractor::registry::get_pid(ActorId::Local(call_args.to)) { + if let Some(actor) = + ractor::registry::where_is_pid(ActorId::Local(call_args.to)) + { let (tx, rx) = ractor::concurrency::oneshot(); // send off the transmission in the serialized format, letting the message's own deserialization handle @@ -327,6 +333,7 @@ impl NodeSession { } // kick off a background task to reply to the channel request, threading the tag and who to reply to + #[allow(clippy::let_underscore_future)] let _ = ractor::concurrency::spawn(async move { if let Some(timeout) = maybe_timeout { if let Ok(Ok(result)) = @@ -384,42 +391,30 @@ impl NodeSession { ) { if let Some(msg) = message.msg { match msg { - control_protocol::control_message::Msg::Spawn(spawn_actor) => { - let actor = crate::remote_actor::RemoteActor { - session: myself.clone(), - }; - - match actor - .spawn_linked( - spawn_actor.name, - spawn_actor.id, - self.node_id, - myself.get_cell(), - ) - .await - { - Ok((actor, _)) => { - state.remote_actors.insert(spawn_actor.id, actor); - } - Err(spawn_err) => { + control_protocol::control_message::Msg::Spawn(spawned_actors) => { + for net_actor in spawned_actors.actors { + if let Err(spawn_err) = self + .get_or_spawn_remote_actor( + &myself, + net_actor.name, + net_actor.pid, + state, + ) + .await + { log::error!("Failed to spawn remote actor with {}", spawn_err); } } } control_protocol::control_message::Msg::Terminate(termination) => { - if let Some(actor) = state.remote_actors.remove(&termination.id) { - actor.stop(Some(format!("Remote: {:?}", termination))); - if termination.is_panic { - log::info!( - "Remote actor {} panicked with {:?}", - actor.get_id(), - termination.panic_reason - ); - } else { + for pid in termination.ids { + if let Some(actor) = state.remote_actors.remove(&pid) { + actor.stop(Some("remote".to_string())); log::debug!( - "Remote actor {} exited with {:?}", - actor.get_id(), - termination.panic_reason + "Actor {} on node {} exited, terminating local `RemoteActor` {}", + pid, + self.node_id, + actor.get_id() ); } } @@ -458,23 +453,135 @@ impl NodeSession { // schedule next ping state.schedule_tcp_ping(); } + control_protocol::control_message::Msg::PgJoin(join) => { + let mut cells = vec![]; + for control_protocol::Actor { name, pid } in join.actors { + match self + .get_or_spawn_remote_actor(&myself, name, pid, state) + .await + { + Ok(actor) => { + cells.push(actor.get_cell()); + } + Err(spawn_err) => { + log::error!("Failed to spawn remote actor with '{}'", spawn_err); + } + } + } + // join the remote actors to the local PG group + if !cells.is_empty() { + ractor::pg::join(join.group, cells); + } + } + control_protocol::control_message::Msg::PgLeave(leave) => { + let mut cells = vec![]; + for control_protocol::Actor { name, pid } in leave.actors { + match self + .get_or_spawn_remote_actor(&myself, name, pid, state) + .await + { + Ok(actor) => { + cells.push(actor.get_cell()); + } + Err(spawn_err) => { + log::error!("Failed to spawn remote actor with '{}'", spawn_err); + } + } + } + // join the remote actors to the local PG group + if !cells.is_empty() { + ractor::pg::leave(leave.group, cells); + } + } } } } /// Called once the session is authenticated - fn after_authenticated(&self, state: &mut NodeSessionState) { + fn after_authenticated(&self, myself: ActorRef, state: &mut NodeSessionState) { log::info!( "Session authenticated on NodeSession {} - ({:?})", self.node_id, state.peer_addr ); - // startup the ping sending operation + // startup the ping healthcheck activity state.schedule_tcp_ping(); - // TODO: startup control message processing and additionally subscribe to process - // group changes + // setup PID monitoring + ractor::registry::pid_registry::monitor(myself.get_cell()); + + // Scan all PIDs and spawn them on the remote host + let pids = ractor::registry::pid_registry::get_all_pids() + .into_iter() + .filter(|act| act.supports_remoting()) + .map(|a| control_protocol::Actor { + name: a.get_name(), + pid: a.get_id().pid(), + }) + .collect::>(); + if !pids.is_empty() { + let msg = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::Spawn( + control_protocol::Spawn { actors: pids }, + )), + }; + state.tcp_send_control(msg); + } + + // setup PG monitoring + ractor::pg::monitor( + ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(), + myself.get_cell(), + ); + + // Scan all PG groups + synchronize them + let groups = ractor::pg::which_groups(); + for group in groups { + let local_members = ractor::pg::get_local_members(&group) + .into_iter() + .filter(|v| v.supports_remoting()) + .map(|act| control_protocol::Actor { + name: act.get_name(), + pid: act.get_id().get_pid(), + }) + .collect::>(); + if !local_members.is_empty() { + let control_message = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::PgJoin( + control_protocol::PgJoin { + group, + actors: local_members, + }, + )), + }; + state.tcp_send_control(control_message); + } + } + // TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled + // if both sessions had a "node_a" for example? Which resolves, local only? + } + + /// Get a given remote actor, or spawn it if it doesn't exist. + async fn get_or_spawn_remote_actor( + &self, + myself: &ActorRef, + actor_name: Option, + actor_pid: u64, + state: &mut NodeSessionState, + ) -> Result, SpawnErr> { + match state.remote_actors.get(&actor_pid) { + Some(actor) => Ok(actor.clone()), + _ => { + let (remote_actor, _) = crate::remote_actor::RemoteActor { + session: myself.clone(), + } + .spawn_linked(actor_name, actor_pid, self.node_id, myself.get_cell()) + .await?; + state.remote_actors.insert(actor_pid, remote_actor.clone()); + Ok(remote_actor) + } + } } } @@ -527,6 +634,7 @@ impl NodeSessionState { fn schedule_tcp_ping(&self) { if let Some(tcp) = &self.tcp { + #[allow(clippy::let_underscore_future)] let _ = tcp.send_after(Self::get_send_delay(), || { let ping = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::Ping( @@ -573,6 +681,13 @@ impl Actor for NodeSession { } } + async fn post_stop(&self, myself: ActorRef, _state: &mut Self::State) { + ractor::pg::demonitor( + ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(), + myself.get_id(), + ); + } + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { match message { super::SessionMessage::SetTcpStream(stream) if state.tcp.is_none() => { @@ -607,9 +722,9 @@ impl Actor for NodeSession { match network_message { crate::protocol::meta::network_message::Message::Auth(auth_message) => { let p_state = state.auth.is_ok(); - self.handle_auth(state, auth_message, myself).await; + self.handle_auth(state, auth_message, myself.clone()).await; if !p_state && state.auth.is_ok() { - self.after_authenticated(state); + self.after_authenticated(myself, state); } } crate::protocol::meta::network_message::Message::Node(node_message) => { @@ -639,6 +754,7 @@ impl Actor for NodeSession { state: &mut Self::State, ) { match message { + SupervisionEvent::ActorStarted(_) => {} SupervisionEvent::ActorPanicked(actor, msg) => { if state.is_tcp_actor(actor.get_id()) { log::error!( @@ -659,13 +775,11 @@ impl Actor for NodeSession { // NOTE: This is a legitimate panic of the `RemoteActor`, not the actor on the remote machine panicking (which // is handled by the remote actor's supervisor). Therefore we should re-spawn the actor let pid = actor.get_id().get_pid(); - let (remote_actor, _) = crate::remote_actor::RemoteActor { - session: myself.clone(), - } - .spawn_linked(actor.get_name(), pid, self.node_id, myself.get_cell()) - .await - .expect("Failed to spawn remote actor"); - state.remote_actors.insert(pid, remote_actor); + let name = actor.get_name(); + let _ = self + .get_or_spawn_remote_actor(&myself, name, pid, state) + .await + .expect("Failed to restart remote actor"); } else { log::error!("NodeSesion {:?} received an unknown child panic superivision message from {} - '{}'", state.name, @@ -693,9 +807,79 @@ impl Actor for NodeSession { ); } } - _ => { - //no-op - } + SupervisionEvent::ProcessGroupChanged(change) => match change { + GroupChangeMessage::Join(group, actors) => { + let filtered = actors + .into_iter() + .filter(|act| act.supports_remoting()) + .map(|act| control_protocol::Actor { + name: act.get_name(), + pid: act.get_id().get_pid(), + }) + .collect::>(); + if !filtered.is_empty() { + let msg = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::PgJoin( + control_protocol::PgJoin { + group, + actors: filtered, + }, + )), + }; + state.tcp_send_control(msg); + } + } + GroupChangeMessage::Leave(group, actors) => { + let filtered = actors + .into_iter() + .filter(|act| act.supports_remoting()) + .map(|act| control_protocol::Actor { + name: act.get_name(), + pid: act.get_id().get_pid(), + }) + .collect::>(); + if !filtered.is_empty() { + let msg = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::PgLeave( + control_protocol::PgLeave { + group, + actors: filtered, + }, + )), + }; + state.tcp_send_control(msg); + } + } + }, + SupervisionEvent::PidLifecycleEvent(pid) => match pid { + PidLifecycleEvent::Spawn(who) => { + if who.supports_remoting() { + let msg = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::Spawn( + control_protocol::Spawn { + actors: vec![control_protocol::Actor { + pid: who.get_id().get_pid(), + name: who.get_name(), + }], + }, + )), + }; + state.tcp_send_control(msg); + } + } + PidLifecycleEvent::Terminate(who) => { + if who.supports_remoting() { + let msg = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::Terminate( + control_protocol::Terminate { + ids: vec![who.get_id().get_pid()], + }, + )), + }; + state.tcp_send_control(msg); + } + } + }, } } } diff --git a/ractor-cluster/src/protocol/control.proto b/ractor-cluster/src/protocol/control.proto index 1d585904..9a26c5ef 100644 --- a/ractor-cluster/src/protocol/control.proto +++ b/ractor-cluster/src/protocol/control.proto @@ -16,6 +16,14 @@ package control; import "google/protobuf/timestamp.proto"; +// Represents a single actor +message Actor { + // The local PID of the actor + uint64 pid = 1; + // The optional name of the actor + optional string name = 2; +} + // A heartbeat between actors message Ping { // The original time of the ping send, returned in the `Pong` message @@ -23,27 +31,38 @@ message Ping { google.protobuf.Timestamp timestamp = 1; } +// Reply to a ping message Pong { // The original time of the ping send, set by the original sender google.protobuf.Timestamp timestamp = 1; } -// Spawn an actor +// Actor(s) spawn notification message Spawn { - // The actor's Id - uint64 id = 1; - // The actor's name (to be inserted in the global registry) - optional string name = 2; + // The actors to spawn + repeated Actor actors = 1; } -// An actor termination event +// Actor(s) termination event message Terminate { - // The remote actor's PID - uint64 id = 1; - // Flag denoting if the termination was due to panic - bool is_panic = 2; - // The exit reason or panic message - string panic_reason = 3; + // The remote actors' PIDs + repeated uint64 ids = 1; +} + +// Process group join occurred +message PgJoin { + // The group + string group = 1; + // The actors + repeated Actor actors = 2; +} + +// Process group leave occurred +message PgLeave { + // The group + string group = 1; + // The actors + repeated Actor actors = 2; } // Control messages between authenticated `node()`s which are dist-connected @@ -58,5 +77,9 @@ message ControlMessage { Ping ping = 3; // A pong Pong pong = 4; + // A PG group join event + PgJoin pg_join = 5; + // A PG group leave event + PgLeave pg_leave = 6; } } \ No newline at end of file diff --git a/ractor/benches/actor.rs b/ractor/benches/actor.rs index f663c3e8..94167aea 100644 --- a/ractor/benches/actor.rs +++ b/ractor/benches/actor.rs @@ -7,7 +7,9 @@ extern crate criterion; use criterion::{BatchSize, Criterion}; -use ractor::{Actor, ActorRef, Message}; +#[cfg(feature = "cluster")] +use ractor::Message; +use ractor::{Actor, ActorRef}; struct BenchActor; @@ -34,7 +36,7 @@ fn create_actors(c: &mut Criterion) { let small = 100; let large = 10000; - let id = format!("Creation of {} actors", small); + let id = format!("Creation of {small} actors"); let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); c.bench_function(&id, move |b| { b.iter_batched( @@ -55,7 +57,7 @@ fn create_actors(c: &mut Criterion) { ); }); - let id = format!("Creation of {} actors", large); + let id = format!("Creation of {large} actors"); let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); c.bench_function(&id, move |b| { b.iter_batched( @@ -81,7 +83,7 @@ fn schedule_work(c: &mut Criterion) { let small = 100; let large = 1000; - let id = format!("Waiting on {} actors to process first message", small); + let id = format!("Waiting on {small} actors to process first message"); let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); c.bench_function(&id, move |b| { b.iter_batched( @@ -105,7 +107,7 @@ fn schedule_work(c: &mut Criterion) { ); }); - let id = format!("Waiting on {} actors to process first message", large); + let id = format!("Waiting on {large} actors to process first message"); let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); c.bench_function(&id, move |b| { b.iter_batched( @@ -163,7 +165,7 @@ fn process_messages(c: &mut Criterion) { } } - let id = format!("Waiting on {} messages to be processed", NUM_MSGS); + let id = format!("Waiting on {NUM_MSGS} messages to be processed"); let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); c.bench_function(&id, move |b| { b.iter_batched( diff --git a/ractor/examples/monte_carlo.rs b/ractor/examples/monte_carlo.rs index b4c09c73..f0cf52f2 100644 --- a/ractor/examples/monte_carlo.rs +++ b/ractor/examples/monte_carlo.rs @@ -172,7 +172,7 @@ impl Actor for GameManager { / state.total_games as i64; println!("Simulations ran: {}", state.results.len()); - println!("Final average funds: ${}", average_funds); + println!("Final average funds: ${average_funds}"); myself.stop(None); } diff --git a/ractor/examples/output_port.rs b/ractor/examples/output_port.rs index bbbbb140..3f759292 100644 --- a/ractor/examples/output_port.rs +++ b/ractor/examples/output_port.rs @@ -44,8 +44,8 @@ impl Actor for Publisher { async fn handle(&self, _myself: ActorRef, message: Self::Msg, _state: &mut Self::State) { match message { Self::Msg::Publish(msg) => { - println!("Publishing {}", msg); - self.output.send(Output(format!("Published: {}", msg))); + println!("Publishing {msg}"); + self.output.send(Output(format!("Published: {msg}"))); } } } @@ -70,10 +70,7 @@ impl Actor for Subscriber { async fn handle(&self, myself: ActorRef, message: Self::Msg, _state: &mut Self::State) { match message { Self::Msg::Published(msg) => { - println!( - "Subscriber ({:?}) received published message '{}'", - myself, msg - ); + println!("Subscriber ({myself:?}) received published message '{msg}'"); } } } @@ -113,7 +110,7 @@ async fn main() { // send some messages (we should see the subscribers printout) for i in 0..3 { publisher_ref - .cast(PublisherMessage::Publish(format!("Something {}", i))) + .cast(PublisherMessage::Publish(format!("Something {i}"))) .expect("Send failed"); tokio::time::sleep(Duration::from_millis(500)).await; } diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs index 3cc6b46a..bb0be46c 100644 --- a/ractor/examples/philosophers.rs +++ b/ractor/examples/philosophers.rs @@ -85,10 +85,7 @@ impl Fork { ); } None => { - println!( - "ERROR Received `UsingFork` from {:?}. Real owner is `None`", - who - ); + println!("ERROR Received `UsingFork` from {who:?}. Real owner is `None`"); } }, ForkMessage::PutForkDown(who) => match &state.owned_by { @@ -104,10 +101,7 @@ impl Fork { ); } None => { - println!( - "ERROR Received `PutForkDown` from {:?}. Real owner is `None`", - who - ); + println!("ERROR Received `PutForkDown` from {who:?}. Real owner is `None`"); } }, } @@ -253,6 +247,7 @@ impl Philosopher { // schedule become hungry after the thinking time has elapsed let metrics_count = state.metrics.state_change_count; + #[allow(clippy::let_underscore_future)] let _ = myself.send_after(self.time_slice, move || { PhilosopherMessage::BecomeHungry(metrics_count) }); @@ -278,6 +273,7 @@ impl Philosopher { // schedule stop eating after the eating time has elapsed let metrics_count = state.metrics.state_change_count; + #[allow(clippy::let_underscore_future)] let _ = myself.send_after(self.time_slice, move || { PhilosopherMessage::StopEating(metrics_count) }); @@ -503,6 +499,6 @@ async fn main() { // print metrics println!("Simulation results"); for (who, metric) in results { - println!("{}: {:?}", who, metric); + println!("{who}: {metric:?}"); } } diff --git a/ractor/examples/supervisor.rs b/ractor/examples/supervisor.rs index add0efda..4f290a8c 100644 --- a/ractor/examples/supervisor.rs +++ b/ractor/examples/supervisor.rs @@ -163,10 +163,7 @@ impl Actor for MidLevelActor { SupervisionEvent::ActorPanicked(dead_actor, panic_msg) if dead_actor.get_id() == state.leaf_actor.get_id() => { - println!( - "MidLevelActor: {:?} panicked with '{}'", - dead_actor, panic_msg - ); + println!("MidLevelActor: {dead_actor:?} panicked with '{panic_msg}'"); panic!( "MidLevelActor: Mid-level actor panicking because Leaf actor panicked with '{}'", @@ -174,7 +171,7 @@ impl Actor for MidLevelActor { ); } other => { - println!("MidLevelActor: recieved supervisor event '{}'", other); + println!("MidLevelActor: recieved supervisor event '{other}'"); } } } @@ -201,7 +198,7 @@ impl Actor for RootActor { type State = RootActorState; async fn pre_start(&self, myself: ActorRef) -> Self::State { - println!("RootActor: Started {:?}", myself); + println!("RootActor: Started {myself:?}"); let (mid_level_actor, _) = Actor::spawn_linked(Some("mid-level".to_string()), MidLevelActor, myself.into()) .await @@ -239,13 +236,13 @@ impl Actor for RootActor { SupervisionEvent::ActorPanicked(dead_actor, panic_msg) if dead_actor.get_id() == state.mid_level_actor.get_id() => { - println!("RootActor: {:?} panicked with '{}'", dead_actor, panic_msg); + println!("RootActor: {dead_actor:?} panicked with '{panic_msg}'"); println!("RootActor: Terminating root actor, all my kids are dead!"); myself.stop(Some("Everyone died :(".to_string())); } other => { - println!("RootActor: recieved supervisor event '{}'", other); + println!("RootActor: recieved supervisor event '{other}'"); } } } diff --git a/ractor/src/actor/actor_cell/actor_properties.rs b/ractor/src/actor/actor_cell/actor_properties.rs index 9471f860..25e645d2 100644 --- a/ractor/src/actor/actor_cell/actor_properties.rs +++ b/ractor/src/actor/actor_cell/actor_properties.rs @@ -30,6 +30,8 @@ pub(crate) struct ActorProperties { pub(crate) message: InputPort, pub(crate) tree: SupervisionTree, pub(crate) type_id: std::any::TypeId, + #[cfg(feature = "cluster")] + pub(crate) supports_remoting: bool, } impl ActorProperties { @@ -76,6 +78,8 @@ impl ActorProperties { message: tx_message, tree: SupervisionTree::default(), type_id: std::any::TypeId::of::(), + #[cfg(feature = "cluster")] + supports_remoting: TActor::Msg::serializable(), }, rx_signal, rx_stop, diff --git a/ractor/src/actor/actor_cell/mod.rs b/ractor/src/actor/actor_cell/mod.rs index 96aac839..39fbd688 100644 --- a/ractor/src/actor/actor_cell/mod.rs +++ b/ractor/src/actor/actor_cell/mod.rs @@ -164,8 +164,13 @@ impl ActorCell { let cell = Self { inner: Arc::new(props), }; + #[cfg(feature = "cluster")] - crate::registry::register_pid(cell.get_id(), cell.clone()); + { + // registry to the PID registry + crate::registry::pid_registry::register_pid(cell.get_id(), cell.clone())?; + } + if let Some(r_name) = name { crate::registry::register(r_name, cell.clone())?; } @@ -191,16 +196,17 @@ impl ActorCell { TActor: Actor, { if id.is_local() { - panic!("Cannot create a new remote actor handler without the actor id being marked as a remote actor!"); + return Err(SpawnErr::StartupPanic("Cannot create a new remote actor handler without the actor id being marked as a remote actor!".to_string())); } - let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::(name.clone(), id); + let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::(name, id); let cell = Self { inner: Arc::new(props), }; - if let Some(r_name) = name { - crate::registry::register(r_name, cell.clone())?; - } + // TODO: remote actors don't appear in the name registry + // if let Some(r_name) = name { + // crate::registry::register(r_name, cell.clone())?; + // } Ok(( cell, ActorPortSet { @@ -229,6 +235,14 @@ impl ActorCell { self.inner.get_status() } + /// Identifies if this actor supports remote (dist) communication + /// + /// Returns [true] if the actor's messaging protocols support remote calls, [false] otherwise + #[cfg(feature = "cluster")] + pub fn supports_remoting(&self) -> bool { + self.inner.supports_remoting + } + /// Set the status of the [super::Actor]. If the status is set to /// [ActorStatus::Stopping] or [ActorStatus::Stopped] the actor /// will also be unenrolled from both the named registry ([crate::registry]) @@ -239,7 +253,12 @@ impl ActorCell { // The actor is shut down if status == ActorStatus::Stopped || status == ActorStatus::Stopping { #[cfg(feature = "cluster")] - crate::registry::unregister_pid(self.get_id()); + { + // stop monitoring for updates + crate::registry::pid_registry::demonitor(self.get_id()); + // unregistry from the PID registry + crate::registry::pid_registry::unregister_pid(self.get_id()); + } // If it's enrolled in the registry, remove it if let Some(name) = self.get_name() { crate::registry::unregister(name); diff --git a/ractor/src/actor/errors.rs b/ractor/src/actor/errors.rs index 676bf085..da053df4 100644 --- a/ractor/src/actor/errors.rs +++ b/ractor/src/actor/errors.rs @@ -26,7 +26,7 @@ impl Display for SpawnErr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::StartupPanic(panic_msg) => { - write!(f, "Actor panicked during startup '{}'", panic_msg) + write!(f, "Actor panicked during startup '{panic_msg}'") } Self::StartupCancelled => { write!( @@ -40,8 +40,7 @@ impl Display for SpawnErr { Self::ActorAlreadyRegistered(actor_name) => { write!( f, - "Actor '{}' is already registered in the actor registry", - actor_name + "Actor '{actor_name}' is already registered in the actor registry" ) } } @@ -71,7 +70,7 @@ impl Display for ActorErr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Panic(panic_msg) => { - write!(f, "Actor panicked '{}'", panic_msg) + write!(f, "Actor panicked '{panic_msg}'") } Self::Cancelled => { write!(f, "Actor operation cancelled") diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs index 0d23b9a5..993fd909 100644 --- a/ractor/src/actor/messages.rs +++ b/ractor/src/actor/messages.rs @@ -63,7 +63,7 @@ pub enum StopMessage { impl Debug for StopMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Stop message: {}", self) + write!(f, "Stop message: {self}") } } @@ -71,7 +71,7 @@ impl std::fmt::Display for StopMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Stop => write!(f, "Stop"), - Self::Reason(reason) => write!(f, "Stop (reason = {})", reason), + Self::Reason(reason) => write!(f, "Stop (reason = {reason})"), } } } @@ -94,11 +94,15 @@ pub enum SupervisionEvent { /// A subscribed process group changed ProcessGroupChanged(crate::pg::GroupChangeMessage), + + /// A process lifecycle event occurred + #[cfg(feature = "cluster")] + PidLifecycleEvent(crate::registry::PidLifecycleEvent), } impl Debug for SupervisionEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Supervision event: {}", self) + write!(f, "Supervision event: {self}") } } @@ -106,21 +110,25 @@ impl std::fmt::Display for SupervisionEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { SupervisionEvent::ActorStarted(actor) => { - write!(f, "Started actor {:?}", actor) + write!(f, "Started actor {actor:?}") } SupervisionEvent::ActorTerminated(actor, _, reason) => { if let Some(r) = reason { - write!(f, "Stopped actor {:?} (reason = {})", actor, r) + write!(f, "Stopped actor {actor:?} (reason = {r})") } else { - write!(f, "Stopped actor {:?}", actor) + write!(f, "Stopped actor {actor:?}") } } SupervisionEvent::ActorPanicked(actor, panic_msg) => { - write!(f, "Actor panicked {:?} - {}", actor, panic_msg) + write!(f, "Actor panicked {actor:?} - {panic_msg}") } SupervisionEvent::ProcessGroupChanged(change) => { write!(f, "Process group {} changed", change.get_group()) } + #[cfg(feature = "cluster")] + SupervisionEvent::PidLifecycleEvent(change) => { + write!(f, "PID lifecycle event {change:?}") + } } } } @@ -134,7 +142,7 @@ pub enum Signal { impl Debug for Signal { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Signal: {}", self) + write!(f, "Signal: {self}") } } diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index 4ff023f9..b2859800 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -75,6 +75,9 @@ async fn test_stop_higher_priority_over_messages() { .await .expect("Actor failed to start"); + #[cfg(feature = "cluster")] + assert!(!actor.supports_remoting()); + // pump 10 messages on the queue for _i in 0..10 { actor @@ -322,6 +325,8 @@ async fn test_serialized_cast() { .await .expect("Failed to spawn test actor"); + assert!(actor.supports_remoting()); + let serialized = (TestMessage).serialize(); actor .send_serialized(serialized) @@ -444,6 +449,8 @@ async fn test_serialized_rpc() { .send_serialized(msg) .expect("Serialized message send failed!"); + assert!(actor.supports_remoting()); + let data = rx .await .expect("Faield to get reply from actor (within 100ms)"); diff --git a/ractor/src/actor/tests/supervisor.rs b/ractor/src/actor/tests/supervisor.rs index ff9cae92..0b4fec0e 100644 --- a/ractor/src/actor/tests/supervisor.rs +++ b/ractor/src/actor/tests/supervisor.rs @@ -53,7 +53,7 @@ async fn test_supervision_panic_in_post_startup() { message: SupervisionEvent, _state: &mut Self::State, ) { - println!("Supervisor event received {:?}", message); + println!("Supervisor event received {message:?}"); // check that the panic was captured if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message { @@ -125,7 +125,7 @@ async fn test_supervision_panic_in_handle() { message: SupervisionEvent, _state: &mut Self::State, ) { - println!("Supervisor event received {:?}", message); + println!("Supervisor event received {message:?}"); // check that the panic was captured if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message { @@ -195,7 +195,7 @@ async fn test_supervision_panic_in_post_stop() { message: SupervisionEvent, _state: &mut Self::State, ) { - println!("Supervisor event received {:?}", message); + println!("Supervisor event received {message:?}"); // check that the panic was captured if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message { @@ -286,7 +286,7 @@ async fn test_supervision_panic_in_supervisor_handle() { message: SupervisionEvent, _state: &mut Self::State, ) { - println!("Supervisor event received {:?}", message); + println!("Supervisor event received {message:?}"); // check that the panic was captured if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message { diff --git a/ractor/src/actor_id.rs b/ractor/src/actor_id.rs index 67b28bcd..185d6edb 100644 --- a/ractor/src/actor_id.rs +++ b/ractor/src/actor_id.rs @@ -43,8 +43,8 @@ impl ActorId { impl Display for ActorId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ActorId::Local(id) => write!(f, "0.{}", id), - ActorId::Remote { node_id, pid } => write!(f, "{}.{}", node_id, pid), + ActorId::Local(id) => write!(f, "0.{id}"), + ActorId::Remote { node_id, pid } => write!(f, "{node_id}.{pid}"), } } } diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index 377ebb77..947a45ab 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -221,13 +221,13 @@ impl std::fmt::Display for RactorErr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Actor(actor_err) => { - write!(f, "{}", actor_err) + write!(f, "{actor_err}") } Self::Messaging(messaging_err) => { - write!(f, "{}", messaging_err) + write!(f, "{messaging_err}") } Self::Spawn(spawn_err) => { - write!(f, "{}", spawn_err) + write!(f, "{spawn_err}") } Self::Timeout => { write!(f, "timeout") diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 5519b3cf..a0e0e44d 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -28,6 +28,9 @@ use once_cell::sync::OnceCell; use crate::{ActorCell, ActorId, GroupName, SupervisionEvent}; +/// Key to monitor all of the groups +pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; + #[cfg(test)] mod tests; @@ -94,6 +97,14 @@ pub fn join(group: GroupName, actors: Vec) { )); } } + // notify the world monitors + if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Join(group.clone(), actors.clone()), + )); + } + } } /// Leaves the specified [crate::Actor]s from the PG group @@ -120,6 +131,14 @@ pub fn leave(group: GroupName, actors: Vec) { )); } } + // notify the world monitors + if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Leave(group.clone(), actors.clone()), + )); + } + } } } } @@ -152,6 +171,14 @@ pub(crate) fn leave_all(actor: ActorId) { )); }); } + // notify the world monitors + if let Some(listeners) = all_listeners.get(ALL_GROUPS_NOTIFICATION) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Leave(group.clone(), vec![cell.clone()]), + )); + } + } } // Cleanup empty groups diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 99fe3446..259d237d 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -268,5 +268,61 @@ async fn test_pg_monitoring() { monitor_handle.await.expect("Actor cleanup failed"); } -// TODO: Tests to add -// 1. Local vs remote members (can't test until we have proper remoting) +#[named] +#[cfg(feature = "cluster")] +#[crate::concurrency::test] +async fn local_vs_remote_pg_members() { + use crate::ActorRuntime; + + let group = function_name!().to_string(); + + struct TestRemoteActor; + struct TestRemoteActorMessage; + impl crate::Message for TestRemoteActorMessage {} + #[async_trait::async_trait] + impl Actor for TestRemoteActor { + type Msg = TestRemoteActorMessage; + type State = (); + async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} + } + + let remote_pid = crate::ActorId::Remote { node_id: 1, pid: 1 }; + + let mut actors: Vec = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor) + .await + .expect("Failed to spawn test actor"); + actors.push(actor.into()); + handles.push(handle); + } + let (actor, handle) = ActorRuntime::spawn_linked_remote( + None, + TestRemoteActor, + remote_pid, + actors.first().unwrap().clone(), + ) + .await + .expect("Failed to spawn remote actor"); + actors.push(actor.into()); + handles.push(handle); + + // join the group + pg::join(group.clone(), actors.to_vec()); + + // assert + let members = pg::get_local_members(&group); + assert_eq!(10, members.len()); + + let members = pg::get_members(&group); + assert_eq!(11, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } +} diff --git a/ractor/src/registry/mod.rs b/ractor/src/registry/mod.rs index cb847218..b43d426e 100644 --- a/ractor/src/registry/mod.rs +++ b/ractor/src/registry/mod.rs @@ -38,10 +38,13 @@ use dashmap::mapref::entry::Entry::{Occupied, Vacant}; use dashmap::DashMap; use once_cell::sync::OnceCell; -#[cfg(feature = "cluster")] -use crate::ActorId; use crate::{ActorCell, ActorName}; +#[cfg(feature = "cluster")] +pub mod pid_registry; +#[cfg(feature = "cluster")] +pub use pid_registry::{get_all_pids, where_is_pid, PidLifecycleEvent}; + #[cfg(test)] mod tests; @@ -53,17 +56,11 @@ pub enum ActorRegistryErr { /// The name'd actor registry static ACTOR_REGISTRY: OnceCell>> = OnceCell::new(); -#[cfg(feature = "cluster")] -static PID_REGISTRY: OnceCell>> = OnceCell::new(); /// Retrieve the named actor registry handle fn get_actor_registry<'a>() -> &'a Arc> { ACTOR_REGISTRY.get_or_init(|| Arc::new(DashMap::new())) } -#[cfg(feature = "cluster")] -fn get_pid_registry<'a>() -> &'a Arc> { - PID_REGISTRY.get_or_init(|| Arc::new(DashMap::new())) -} /// Put an actor into the registry pub(crate) fn register(name: ActorName, actor: ActorCell) -> Result<(), ActorRegistryErr> { @@ -75,12 +72,6 @@ pub(crate) fn register(name: ActorName, actor: ActorCell) -> Result<(), ActorReg } } } -#[cfg(feature = "cluster")] -pub(crate) fn register_pid(id: ActorId, actor: ActorCell) { - if id.is_local() { - get_pid_registry().insert(id, actor); - } -} /// Remove an actor from the registry given it's actor name pub(crate) fn unregister(name: ActorName) { @@ -88,12 +79,6 @@ pub(crate) fn unregister(name: ActorName) { let _ = reg.remove(&name); } } -#[cfg(feature = "cluster")] -pub(crate) fn unregister_pid(id: ActorId) { - if id.is_local() { - let _ = get_pid_registry().remove(&id); - } -} /// Try and retrieve an actor from the registry /// @@ -114,17 +99,3 @@ pub fn registered() -> Vec { let reg = get_actor_registry(); reg.iter().map(|kvp| kvp.key().clone()).collect::>() } - -/// Retrieve an actor from the global registery of all local actors -/// -/// * `id` - The **local** id of the actor to retrieve -/// -/// Returns [Some(_)] if the actor exists locally, [None] otherwise -#[cfg(feature = "cluster")] -pub fn get_pid(id: ActorId) -> Option { - if id.is_local() { - get_pid_registry().get(&id).map(|v| v.value().clone()) - } else { - None - } -} diff --git a/ractor/src/registry/pid_registry.rs b/ractor/src/registry/pid_registry.rs new file mode 100644 index 00000000..ba8b8afe --- /dev/null +++ b/ractor/src/registry/pid_registry.rs @@ -0,0 +1,129 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Represents a PID-based registration. Includes all LOCAL actors and their associated pids. It's kept in +//! sync via actor spawn + death management. + +use std::fmt::Debug; +use std::sync::Arc; + +use dashmap::mapref::entry::Entry::{Occupied, Vacant}; +use dashmap::DashMap; +use once_cell::sync::OnceCell; + +use crate::{ActorCell, ActorId, SupervisionEvent}; + +/// Represents a change in group or scope membership +#[derive(Clone)] +pub enum PidLifecycleEvent { + /// Some actors joined a group + Spawn(ActorCell), + /// Some actors left a group + Terminate(ActorCell), +} + +impl Debug for PidLifecycleEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Spawn(who) => { + write!(f, "Spawn {}", who.get_id()) + } + Self::Terminate(who) => { + write!(f, "Terminate {}", who.get_id()) + } + } + } +} + +static PID_REGISTRY: OnceCell>> = OnceCell::new(); +static PID_REGISTRY_LISTENERS: OnceCell>> = OnceCell::new(); + +fn get_pid_registry<'a>() -> &'a Arc> { + PID_REGISTRY.get_or_init(|| Arc::new(DashMap::new())) +} + +fn get_pid_listeners<'a>() -> &'a Arc> { + PID_REGISTRY_LISTENERS.get_or_init(|| Arc::new(DashMap::new())) +} + +pub(crate) fn register_pid(id: ActorId, actor: ActorCell) -> Result<(), super::ActorRegistryErr> { + if id.is_local() { + match get_pid_registry().entry(id) { + Occupied(_o) => Err(super::ActorRegistryErr::AlreadyRegistered(format!( + "PID {id} already alive" + ))), + Vacant(v) => { + v.insert(actor.clone()); + // notify lifecycle listeners + for listener in get_pid_listeners().iter() { + let _ = + listener + .value() + .send_supervisor_evt(SupervisionEvent::PidLifecycleEvent( + PidLifecycleEvent::Spawn(actor.clone()), + )); + } + Ok(()) + } + } + } else { + Ok(()) + } +} + +pub(crate) fn unregister_pid(id: ActorId) { + if id.is_local() { + if let Some((_, cell)) = get_pid_registry().remove(&id) { + // notify lifecycle listeners + for listener in get_pid_listeners().iter() { + let _ = listener + .value() + .send_supervisor_evt(SupervisionEvent::PidLifecycleEvent( + PidLifecycleEvent::Terminate(cell.clone()), + )); + } + } + } +} + +/// Retrieve all currently registered [crate::Actor]s from the registry +/// +/// Returns [Vec<_>] of [crate::ActorCell]s representing the current actors +/// registered +pub fn get_all_pids() -> Vec { + get_pid_registry() + .iter() + .map(|v| v.value().clone()) + .collect::>() +} + +/// Retrieve an actor from the global registery of all local actors +/// +/// * `id` - The **local** id of the actor to retrieve +/// +/// Returns [Some(_)] if the actor exists locally, [None] otherwise +pub fn where_is_pid(id: ActorId) -> Option { + if id.is_local() { + get_pid_registry().get(&id).map(|v| v.value().clone()) + } else { + None + } +} + +/// Subscribes the provided [crate::Actor] to the PID registry lifecycle +/// events +/// +/// * `actor` - The [ActorCell] representing who will receive updates +pub fn monitor(actor: ActorCell) { + get_pid_listeners().insert(actor.get_id(), actor); +} + +/// Unsubscribes the provided [crate::Actor] from the PID registry lifecycle +/// events +/// +/// * `actor` - The [ActorCell] representing who was receiving updates +pub fn demonitor(actor: ActorId) { + let _ = get_pid_listeners().remove(&actor); +} diff --git a/ractor/src/registry/tests.rs b/ractor/src/registry/tests.rs index 5d01401f..a3000f98 100644 --- a/ractor/src/registry/tests.rs +++ b/ractor/src/registry/tests.rs @@ -97,3 +97,203 @@ async fn test_actor_registry_unenrollment() { // the actor was automatically removed assert!(crate::registry::where_is("unenrollment".to_string()).is_none()); } + +#[cfg(feature = "cluster")] +mod pid_registry_tests { + use std::sync::Arc; + + use dashmap::DashMap; + + use super::super::pid_registry::*; + use crate::{concurrency::Duration, Actor, ActorId, SupervisionEvent}; + + struct RemoteActor; + struct RemoteActorMessage; + impl crate::Message for RemoteActorMessage {} + #[async_trait::async_trait] + impl Actor for RemoteActor { + type Msg = RemoteActorMessage; + type State = (); + async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} + } + + #[crate::concurrency::test] + async fn try_enroll_remote_actor() { + struct EmptyActor; + #[async_trait::async_trait] + impl Actor for EmptyActor { + type Msg = (); + type State = (); + async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} + } + let remote_pid = ActorId::Remote { node_id: 1, pid: 1 }; + + let (actor, handle) = Actor::spawn(None, EmptyActor) + .await + .expect("Actor failed to start"); + + let (remote_actor, remote_handle) = crate::ActorRuntime::spawn_linked_remote( + None, + RemoteActor, + remote_pid, + actor.get_cell(), + ) + .await + .expect("Failed to start remote actor"); + + assert!(crate::registry::where_is_pid(remote_actor.get_id()).is_none()); + assert!(crate::registry::where_is_pid(actor.get_id()).is_some()); + + remote_actor.stop(None); + actor.stop(None); + handle.await.expect("Failed to clean stop the actor"); + remote_handle.await.expect("Failed to stop remote actor"); + } + + #[crate::concurrency::test] + async fn test_basic_registation() { + struct EmptyActor; + + #[async_trait::async_trait] + impl Actor for EmptyActor { + type Msg = (); + + type State = (); + + async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} + } + + let (actor, handle) = Actor::spawn(None, EmptyActor) + .await + .expect("Actor failed to start"); + + assert!(crate::registry::where_is_pid(actor.get_id()).is_some()); + + actor.stop(None); + handle.await.expect("Failed to clean stop the actor"); + } + + #[crate::concurrency::test] + async fn test_actor_registry_unenrollment() { + struct EmptyActor; + + #[async_trait::async_trait] + impl Actor for EmptyActor { + type Msg = (); + + type State = (); + + async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} + } + + let (actor, handle) = Actor::spawn(None, EmptyActor) + .await + .expect("Actor failed to start"); + + assert!(crate::registry::where_is_pid(actor.get_id()).is_some()); + + // stop the actor and wait for its death + actor.stop(None); + handle.await.expect("Failed to wait for agent stop"); + + let id = actor.get_id(); + + // drop the actor ref's + drop(actor); + + // unenrollment is a cast operation, so it's not immediate. wait for cleanup + crate::concurrency::sleep(Duration::from_millis(100)).await; + + // the actor was automatically removed + assert!(crate::registry::where_is_pid(id).is_none()); + } + + #[crate::concurrency::test] + async fn test_pid_lifecycle_monitoring() { + let counter = Arc::new(DashMap::new()); + + struct AutoJoinActor; + + #[async_trait::async_trait] + impl Actor for AutoJoinActor { + type Msg = (); + + type State = (); + + async fn pre_start(&self, _myself: crate::ActorRef) -> Self::State {} + } + + struct NotificationMonitor { + counter: Arc>, + } + + #[async_trait::async_trait] + impl Actor for NotificationMonitor { + type Msg = (); + + type State = (); + + async fn pre_start(&self, myself: crate::ActorRef) -> Self::State { + monitor(myself.into()); + } + + async fn handle_supervisor_evt( + &self, + _myself: crate::ActorRef, + message: SupervisionEvent, + _state: &mut Self::State, + ) { + if let SupervisionEvent::PidLifecycleEvent(change) = message { + match change { + PidLifecycleEvent::Spawn(who) => { + self.counter.insert(who.get_id(), 1); + // self.counter.get_mut(&who.get_id()) + // self.counter.fetch_add(1, Ordering::Relaxed); + } + PidLifecycleEvent::Terminate(who) => { + // self.counter.fetch_sub(1, Ordering::Relaxed); + self.counter.insert(who.get_id(), 0); + } + } + } + } + } + let (monitor_actor, monitor_handle) = Actor::spawn( + None, + NotificationMonitor { + counter: counter.clone(), + }, + ) + .await + .expect("Failed to start monitor actor"); + + // this actor's startup should "monitor" for PG changes + let (test_actor, test_handle) = Actor::spawn(None, AutoJoinActor) + .await + .expect("Failed to start test actor"); + + // the monitor is notified async, so we need to wait a tiny bit + crate::concurrency::sleep(Duration::from_millis(100)).await; + // DUE to the static nature of the PID monitors, we're creating a LOT of actors + // across the tests and there's a counting race here. So we use a map to check + // this specific test actor + assert!(matches!( + counter.get(&test_actor.get_id()).map(|v| *v), + Some(1) + )); + + // kill the pg member + test_actor.stop(None); + test_handle.await.expect("Actor cleanup failed"); + + // should have decremented + assert!(matches!( + counter.get(&test_actor.get_id()).map(|v| *v), + Some(0) + )); + + // cleanup + monitor_actor.stop(None); + monitor_handle.await.expect("Actor cleanup failed"); + } +} diff --git a/ractor/src/rpc/tests.rs b/ractor/src/rpc/tests.rs index ac3e2084..5c2356b9 100644 --- a/ractor/src/rpc/tests.rs +++ b/ractor/src/rpc/tests.rs @@ -100,7 +100,7 @@ async fn test_rpc_call() { let _ = reply.send("howdy".to_string()); } Self::Msg::MultiArg(message, count, reply) => { - let _ = reply.send(format!("{}-{}", message, count)); + let _ = reply.send(format!("{message}-{count}")); } } } @@ -125,7 +125,7 @@ async fn test_rpc_call() { let rpc_timeout = call_t!(actor_ref, MessageFormat::Timeout, 10); assert!(rpc_timeout.is_err()); - println!("RPC Error {:?}", rpc_timeout); + println!("RPC Error {rpc_timeout:?}"); let rpc_value = call!(actor_ref, MessageFormat::MultiArg, "Msg".to_string(), 32).unwrap(); assert_eq!("Msg-32".to_string(), rpc_value);