PG Groups lifecycle + PID lifecycle synchronization
slawlor committed Jan 26, 2023
1 parent 0664c73 commit 2940034
Showing 23 changed files with 780 additions and 161 deletions.
4 changes: 2 additions & 2 deletions ractor-cluster/src/build.rs
@@ -17,8 +17,8 @@ fn build_protobufs() {
let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());

for file in PROTOBUF_FILES.iter() {
let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file);
println!("cargo:rerun-if-changed={}", proto_file);
let proto_file = format!("{PROTOBUF_BASE_DIRECTORY}/{file}.proto");
println!("cargo:rerun-if-changed={proto_file}");
protobuf_files.push(proto_file);
}

2 changes: 1 addition & 1 deletion ractor-cluster/src/node/client.rs
@@ -26,7 +26,7 @@ pub enum ClientConnectError {

impl Display for ClientConnectError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}

282 changes: 233 additions & 49 deletions ractor-cluster/src/node/node_session.rs

Large diffs are not rendered by default.

47 changes: 35 additions & 12 deletions ractor-cluster/src/protocol/control.proto
@@ -16,34 +16,53 @@ package control;

import "google/protobuf/timestamp.proto";

// Represents a single actor
message Actor {
// The local PID of the actor
uint64 pid = 1;
// The optional name of the actor
optional string name = 2;
}

// A heartbeat between actors
message Ping {
// The original time of the ping send, returned in the `Pong` message
// to measure latency
google.protobuf.Timestamp timestamp = 1;
}

// Reply to a ping
message Pong {
// The original time of the ping send, set by the original sender
google.protobuf.Timestamp timestamp = 1;
}

// Spawn an actor
// Actor(s) spawn notification
message Spawn {
// The actor's Id
uint64 id = 1;
// The actor's name (to be inserted in the global registry)
optional string name = 2;
// The actors to spawn
repeated Actor actors = 1;
}

// An actor termination event
// Actor(s) termination event
message Terminate {
// The remote actor's PID
uint64 id = 1;
// Flag denoting if the termination was due to panic
bool is_panic = 2;
// The exit reason or panic message
string panic_reason = 3;
// The remote actors' PIDs
repeated uint64 ids = 1;
}

// Process group join occurred
message PgJoin {
// The group
string group = 1;
// The actors
repeated Actor actors = 2;
}

// Process group leave occurred
message PgLeave {
// The group
string group = 1;
// The actors
repeated Actor actors = 2;
}

// Control messages between authenticated `node()`s which are dist-connected
@@ -58,5 +77,9 @@ message ControlMessage {
Ping ping = 3;
// A pong
Pong pong = 4;
// A PG group join event
PgJoin pg_join = 5;
// A PG group leave event
PgLeave pg_leave = 6;
}
}
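
The control-plane changes above replace the single-actor `Spawn`/`Terminate` notifications with batched `Actor` lists and add `PgJoin`/`PgLeave` events for process-group membership. Below is a minimal sketch of how a node session might assemble one of these notifications. The `Actor` and `PgJoin` structs are hand-written stand-ins for what prost would generate from the proto above, and `pg_join_event` is a hypothetical helper rather than ractor's actual API; `PgLeave` would be built the same way.

```rust
// Hand-written stand-ins mirroring the structs prost would generate from
// control.proto above; the real build uses the generated `control` module.
#[derive(Debug, Clone)]
struct Actor {
    pid: u64,
    name: Option<String>,
}

#[derive(Debug, Clone)]
struct PgJoin {
    group: String,
    actors: Vec<Actor>,
}

// Builds the PG-join notification a node session might broadcast to its
// peers when local actors join a process group. The function name and the
// (pid, name) inputs are illustrative, not ractor's actual API.
fn pg_join_event(group: &str, members: &[(u64, Option<String>)]) -> PgJoin {
    PgJoin {
        group: group.to_string(),
        actors: members
            .iter()
            .map(|(pid, name)| Actor {
                pid: *pid,
                name: name.clone(),
            })
            .collect(),
    }
}

fn main() {
    let join = pg_join_event("workers", &[(42, Some("worker-42".into())), (43, None)]);
    println!("{join:?}");
}
```

Batching a `repeated Actor` list keeps a bulk join or a node-wide termination to a single control message instead of one message per actor.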
14 changes: 8 additions & 6 deletions ractor/benches/actor.rs
@@ -7,7 +7,9 @@
extern crate criterion;

use criterion::{BatchSize, Criterion};
use ractor::{Actor, ActorRef, Message};
#[cfg(feature = "cluster")]
use ractor::Message;
use ractor::{Actor, ActorRef};

struct BenchActor;

@@ -34,7 +36,7 @@ fn create_actors(c: &mut Criterion) {
let small = 100;
let large = 10000;

let id = format!("Creation of {} actors", small);
let id = format!("Creation of {small} actors");
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function(&id, move |b| {
b.iter_batched(
@@ -55,7 +57,7 @@ fn create_actors(c: &mut Criterion) {
);
});

let id = format!("Creation of {} actors", large);
let id = format!("Creation of {large} actors");
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function(&id, move |b| {
b.iter_batched(
@@ -81,7 +83,7 @@ fn schedule_work(c: &mut Criterion) {
let small = 100;
let large = 1000;

let id = format!("Waiting on {} actors to process first message", small);
let id = format!("Waiting on {small} actors to process first message");
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function(&id, move |b| {
b.iter_batched(
@@ -105,7 +107,7 @@ fn schedule_work(c: &mut Criterion) {
);
});

let id = format!("Waiting on {} actors to process first message", large);
let id = format!("Waiting on {large} actors to process first message");
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function(&id, move |b| {
b.iter_batched(
@@ -163,7 +165,7 @@ fn process_messages(c: &mut Criterion) {
}
}

let id = format!("Waiting on {} messages to be processed", NUM_MSGS);
let id = format!("Waiting on {NUM_MSGS} messages to be processed");
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function(&id, move |b| {
b.iter_batched(
2 changes: 1 addition & 1 deletion ractor/examples/monte_carlo.rs
@@ -172,7 +172,7 @@ impl Actor for GameManager {
/ state.total_games as i64;

println!("Simulations ran: {}", state.results.len());
println!("Final average funds: ${}", average_funds);
println!("Final average funds: ${average_funds}");

myself.stop(None);
}
11 changes: 4 additions & 7 deletions ractor/examples/output_port.rs
@@ -44,8 +44,8 @@ impl Actor for Publisher {
async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, _state: &mut Self::State) {
match message {
Self::Msg::Publish(msg) => {
println!("Publishing {}", msg);
self.output.send(Output(format!("Published: {}", msg)));
println!("Publishing {msg}");
self.output.send(Output(format!("Published: {msg}")));
}
}
}
@@ -70,10 +70,7 @@ impl Actor for Subscriber {
async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, _state: &mut Self::State) {
match message {
Self::Msg::Published(msg) => {
println!(
"Subscriber ({:?}) received published message '{}'",
myself, msg
);
println!("Subscriber ({myself:?}) received published message '{msg}'");
}
}
}
@@ -113,7 +110,7 @@ async fn main() {
// send some messages (we should see the subscribers printout)
for i in 0..3 {
publisher_ref
.cast(PublisherMessage::Publish(format!("Something {}", i)))
.cast(PublisherMessage::Publish(format!("Something {i}")))
.expect("Send failed");
tokio::time::sleep(Duration::from_millis(500)).await;
}
14 changes: 5 additions & 9 deletions ractor/examples/philosophers.rs
@@ -85,10 +85,7 @@ impl Fork {
);
}
None => {
println!(
"ERROR Received `UsingFork` from {:?}. Real owner is `None`",
who
);
println!("ERROR Received `UsingFork` from {who:?}. Real owner is `None`");
}
},
ForkMessage::PutForkDown(who) => match &state.owned_by {
@@ -104,10 +101,7 @@
);
}
None => {
println!(
"ERROR Received `PutForkDown` from {:?}. Real owner is `None`",
who
);
println!("ERROR Received `PutForkDown` from {who:?}. Real owner is `None`");
}
},
}
@@ -253,6 +247,7 @@ impl Philosopher {

// schedule become hungry after the thinking time has elapsed
let metrics_count = state.metrics.state_change_count;
#[allow(clippy::let_underscore_future)]
let _ = myself.send_after(self.time_slice, move || {
PhilosopherMessage::BecomeHungry(metrics_count)
});
@@ -278,6 +273,7 @@ impl Philosopher {

// schedule stop eating after the eating time has elapsed
let metrics_count = state.metrics.state_change_count;
#[allow(clippy::let_underscore_future)]
let _ = myself.send_after(self.time_slice, move || {
PhilosopherMessage::StopEating(metrics_count)
});
@@ -503,6 +499,6 @@ async fn main() {
// print metrics
println!("Simulation results");
for (who, metric) in results {
println!("{}: {:?}", who, metric);
println!("{who}: {metric:?}");
}
}
13 changes: 5 additions & 8 deletions ractor/examples/supervisor.rs
@@ -163,18 +163,15 @@ impl Actor for MidLevelActor {
SupervisionEvent::ActorPanicked(dead_actor, panic_msg)
if dead_actor.get_id() == state.leaf_actor.get_id() =>
{
println!(
"MidLevelActor: {:?} panicked with '{}'",
dead_actor, panic_msg
);
println!("MidLevelActor: {dead_actor:?} panicked with '{panic_msg}'");

panic!(
"MidLevelActor: Mid-level actor panicking because Leaf actor panicked with '{}'",
panic_msg
);
}
other => {
println!("MidLevelActor: recieved supervisor event '{}'", other);
println!("MidLevelActor: recieved supervisor event '{other}'");
}
}
}
@@ -201,7 +198,7 @@ impl Actor for RootActor {
type State = RootActorState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
println!("RootActor: Started {:?}", myself);
println!("RootActor: Started {myself:?}");
let (mid_level_actor, _) =
Actor::spawn_linked(Some("mid-level".to_string()), MidLevelActor, myself.into())
.await
@@ -239,13 +236,13 @@ impl Actor for RootActor {
SupervisionEvent::ActorPanicked(dead_actor, panic_msg)
if dead_actor.get_id() == state.mid_level_actor.get_id() =>
{
println!("RootActor: {:?} panicked with '{}'", dead_actor, panic_msg);
println!("RootActor: {dead_actor:?} panicked with '{panic_msg}'");

println!("RootActor: Terminating root actor, all my kids are dead!");
myself.stop(Some("Everyone died :(".to_string()));
}
other => {
println!("RootActor: recieved supervisor event '{}'", other);
println!("RootActor: recieved supervisor event '{other}'");
}
}
}
4 changes: 4 additions & 0 deletions ractor/src/actor/actor_cell/actor_properties.rs
@@ -30,6 +30,8 @@ pub(crate) struct ActorProperties {
pub(crate) message: InputPort<BoxedMessage>,
pub(crate) tree: SupervisionTree,
pub(crate) type_id: std::any::TypeId,
#[cfg(feature = "cluster")]
pub(crate) supports_remoting: bool,
}

impl ActorProperties {
@@ -76,6 +78,8 @@ impl ActorProperties {
message: tx_message,
tree: SupervisionTree::default(),
type_id: std::any::TypeId::of::<TActor>(),
#[cfg(feature = "cluster")]
supports_remoting: TActor::Msg::serializable(),
},
rx_signal,
rx_stop,
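
With the `cluster` feature enabled, `ActorProperties` now records at construction time whether the actor's message type can cross the wire (`TActor::Msg::serializable()`), and the next file exposes that flag as `ActorCell::supports_remoting`. Here is a rough, self-contained sketch of the pattern; the trait and message types are stand-ins, not ractor's real definitions.

```rust
// A self-contained model of the new `supports_remoting` flag, assuming (as the
// diff above suggests) that the cluster message trait exposes a static
// `serializable()` check. The trait and message types here are illustrative
// stand-ins, not ractor's real definitions.
trait ClusterMessage {
    fn serializable() -> bool {
        false // local-only by default
    }
}

struct LocalOnlyMsg;
impl ClusterMessage for LocalOnlyMsg {}

struct WireMsg;
impl ClusterMessage for WireMsg {
    fn serializable() -> bool {
        true // this message type can be encoded and sent across node boundaries
    }
}

// Mirrors ActorProperties capturing the flag once, at spawn time.
struct Properties {
    supports_remoting: bool,
}

fn new_properties<M: ClusterMessage>() -> Properties {
    Properties {
        supports_remoting: M::serializable(),
    }
}

fn main() {
    assert!(!new_properties::<LocalOnlyMsg>().supports_remoting);
    assert!(new_properties::<WireMsg>().supports_remoting);
    println!("only actors with serializable message types are advertised to peers");
}
```

Capturing the flag once at spawn time presumably lets the node session cheaply filter which local actors to advertise in `Spawn` control messages, without re-running a type-level check per event.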
33 changes: 26 additions & 7 deletions ractor/src/actor/actor_cell/mod.rs
@@ -164,8 +164,13 @@ impl ActorCell {
let cell = Self {
inner: Arc::new(props),
};

#[cfg(feature = "cluster")]
crate::registry::register_pid(cell.get_id(), cell.clone());
{
// registry to the PID registry
crate::registry::pid_registry::register_pid(cell.get_id(), cell.clone())?;
}

if let Some(r_name) = name {
crate::registry::register(r_name, cell.clone())?;
}
@@ -191,16 +196,17 @@
TActor: Actor,
{
if id.is_local() {
panic!("Cannot create a new remote actor handler without the actor id being marked as a remote actor!");
return Err(SpawnErr::StartupPanic("Cannot create a new remote actor handler without the actor id being marked as a remote actor!".to_string()));
}

let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::<TActor>(name.clone(), id);
let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::<TActor>(name, id);
let cell = Self {
inner: Arc::new(props),
};
if let Some(r_name) = name {
crate::registry::register(r_name, cell.clone())?;
}
// TODO: remote actors don't appear in the name registry
// if let Some(r_name) = name {
// crate::registry::register(r_name, cell.clone())?;
// }
Ok((
cell,
ActorPortSet {
@@ -229,6 +235,14 @@ impl ActorCell {
self.inner.get_status()
}

/// Identifies if this actor supports remote (dist) communication
///
/// Returns [true] if the actor's messaging protocols support remote calls, [false] otherwise
#[cfg(feature = "cluster")]
pub fn supports_remoting(&self) -> bool {
self.inner.supports_remoting
}

/// Set the status of the [super::Actor]. If the status is set to
/// [ActorStatus::Stopping] or [ActorStatus::Stopped] the actor
/// will also be unenrolled from both the named registry ([crate::registry])
@@ -239,7 +253,12 @@
// The actor is shut down
if status == ActorStatus::Stopped || status == ActorStatus::Stopping {
#[cfg(feature = "cluster")]
crate::registry::unregister_pid(self.get_id());
{
// stop monitoring for updates
crate::registry::pid_registry::demonitor(self.get_id());
// unregistry from the PID registry
crate::registry::pid_registry::unregister_pid(self.get_id());
}
// If it's enrolled in the registry, remove it
if let Some(name) = self.get_name() {
crate::registry::unregister(name);
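
The status hook above is the other half of the commit's PID-lifecycle synchronization: a cell registers its PID when it is created, and is demonitored and unregistered once it reaches `Stopping`/`Stopped`. Below is a deliberately simplified, single-threaded model of that register/monitor/unregister flow; the names echo the diff, but the types (`&'static str` standing in for `ActorCell`, `std::sync::mpsc` standing in for the real monitor port) are illustrative only, and ractor's actual registry in `ractor::registry::pid_registry` is concurrent.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Lifecycle events a node session could translate into `Spawn` / `Terminate`
// control messages for its peers.
enum PidLifecycleEvent {
    Spawned(u64),
    Terminated(u64),
}

struct PidRegistry {
    pids: HashMap<u64, &'static str>, // pid -> actor description (stand-in for ActorCell)
    monitors: Vec<mpsc::Sender<PidLifecycleEvent>>,
}

impl PidRegistry {
    fn new() -> Self {
        Self { pids: HashMap::new(), monitors: Vec::new() }
    }

    // Subscribe to lifecycle events (what a node session would do).
    fn monitor(&mut self) -> mpsc::Receiver<PidLifecycleEvent> {
        let (tx, rx) = mpsc::channel();
        self.monitors.push(tx);
        rx
    }

    // Called when a cell is constructed.
    fn register_pid(&mut self, pid: u64, cell: &'static str) {
        self.pids.insert(pid, cell);
        for m in &self.monitors {
            let _ = m.send(PidLifecycleEvent::Spawned(pid));
        }
    }

    // Called when the actor's status becomes Stopping/Stopped.
    fn unregister_pid(&mut self, pid: u64) {
        if self.pids.remove(&pid).is_some() {
            for m in &self.monitors {
                let _ = m.send(PidLifecycleEvent::Terminated(pid));
            }
        }
    }
}

fn main() {
    let mut registry = PidRegistry::new();
    let events = registry.monitor();
    registry.register_pid(1, "actor-1");
    registry.unregister_pid(1);
    while let Ok(event) = events.try_recv() {
        match event {
            PidLifecycleEvent::Spawned(pid) => println!("would emit Spawn for pid {pid}"),
            PidLifecycleEvent::Terminated(pid) => println!("would emit Terminate for pid {pid}"),
        }
    }
}
```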