Skip to content

Commit

Permalink
Adding handling if the actor fails during pre_start that it doesn't p…
Browse files Browse the repository at this point in the history
…oison the named and pid registries.

Resolves #240
  • Loading branch information
slawlor committed May 24, 2024
1 parent f43bd25 commit ce61425
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 8 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.10.2"
version = "0.10.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
35 changes: 30 additions & 5 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,12 @@ where
startup_args: TActor::Arguments,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
let (actor, ports) = Self::new(name, handler)?;
actor.start(ports, startup_args, None).await
let aref = actor.actor_ref.clone();
let result = actor.start(ports, startup_args, None).await;
if result.is_err() {
aref.set_status(ActorStatus::Stopped);
}
result
}

/// Spawn an actor with a supervisor, automatically starting the actor
Expand All @@ -517,7 +522,12 @@ where
supervisor: ActorCell,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
let (actor, ports) = Self::new(name, handler)?;
actor.start(ports, startup_args, Some(supervisor)).await
let aref = actor.actor_ref.clone();
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
aref.set_status(ActorStatus::Stopped);
}
result
}

/// Spawn an actor instantly, not waiting on the actor's `pre_start` routine. This is helpful
Expand Down Expand Up @@ -550,8 +560,13 @@ where
> {
let (actor, ports) = Self::new(name.clone(), handler)?;
let actor_ref = actor.actor_ref.clone();
let actor_ref2 = actor_ref.clone();
let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
let (_, handle) = actor.start(ports, startup_args, None).await?;
let result = actor.start(ports, startup_args, None).await;
if result.is_err() {
actor_ref2.set_status(ActorStatus::Stopped);
}
let (_, handle) = result?;
Ok(handle)
});
Ok((actor_ref, join_op))
Expand Down Expand Up @@ -592,8 +607,13 @@ where
> {
let (actor, ports) = Self::new(name.clone(), handler)?;
let actor_ref = actor.actor_ref.clone();
let actor_ref2 = actor_ref.clone();
let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
let (_, handle) = actor.start(ports, startup_args, Some(supervisor)).await?;
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
actor_ref2.set_status(ActorStatus::Stopped);
}
let (_, handle) = result?;
Ok(handle)
});
Ok((actor_ref, join_op))
Expand Down Expand Up @@ -627,6 +647,7 @@ where
let (actor_cell, ports) = actor_cell::ActorCell::new_remote::<TActor>(name, id)?;
let id = actor_cell.get_id();
let name = actor_cell.get_name();
let actor_cell2 = actor_cell.clone();
let (actor, ports) = (
Self {
actor_ref: actor_cell.into(),
Expand All @@ -636,7 +657,11 @@ where
},
ports,
);
actor.start(ports, startup_args, Some(supervisor)).await
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
actor_cell2.set_status(ActorStatus::Stopped);
}
result
}
}

Expand Down
44 changes: 44 additions & 0 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,3 +956,47 @@ fn returns_actor_references() {
assert_eq!(event.actor_id().is_some(), want);
}
}

/// https://github.com/slawlor/ractor/issues/240
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
struct Test;

#[crate::async_trait]
impl Actor for Test {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(&self, _: ActorRef<Self::Msg>, _: ()) -> Result<(), ActorProcessingErr> {
Err("something".into())
}
}

struct Test2;

#[crate::async_trait]
impl Actor for Test2 {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(&self, _: ActorRef<Self::Msg>, _: ()) -> Result<(), ActorProcessingErr> {
Ok(())
}
}

let a = Actor::spawn(Some("test".to_owned()), Test, ()).await;
assert!(a.is_err());
drop(a);

let (a, h) = Actor::spawn(Some("test".to_owned()), Test2, ())
.await
.expect("Failed to spawn second actor with name clash");

// startup ok, we were able to reuse the name

a.stop(None);
h.await.unwrap();
}
2 changes: 1 addition & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.10.2"
version = "0.10.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.10.2"
version = "0.10.3"
authors = ["Sean Lawlor <[email protected]>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit ce61425

Please sign in to comment.