diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 1d9a321b..b4a8eae9 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.14.4" +version = "0.14.5" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" @@ -14,7 +14,7 @@ categories = ["asynchronous"] rust-version = "1.64" [lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)', 'cfg(rust_analyzer)'] } [features] ### Other features diff --git a/ractor/src/actor/derived_actor.rs b/ractor/src/actor/derived_actor.rs index 058a64df..cedaf05a 100644 --- a/ractor/src/actor/derived_actor.rs +++ b/ractor/src/actor/derived_actor.rs @@ -140,10 +140,18 @@ use std::sync::Arc; /// kitchen_actor_handle.await.unwrap(); /// } /// ``` -#[derive(Clone)] pub struct DerivedActorRef { - converter: Arc Result<(), MessagingErr>>, - inner: ActorCell, + converter: Arc Result<(), MessagingErr> + Send + Sync + 'static>, + pub(crate) inner: ActorCell, +} + +impl Clone for DerivedActorRef { + fn clone(&self) -> Self { + Self { + converter: self.converter.clone(), + inner: self.inner.clone(), + } + } } impl std::fmt::Debug for DerivedActorRef { diff --git a/ractor/src/factory/factoryimpl.rs b/ractor/src/factory/factoryimpl.rs index 34ff9034..602983d3 100644 --- a/ractor/src/factory/factoryimpl.rs +++ b/ractor/src/factory/factoryimpl.rs @@ -11,8 +11,6 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; -use bon::Builder; - use self::routing::RouteResult; use crate::concurrency::Duration; use crate::concurrency::Instant; @@ -99,7 +97,7 @@ where } /// Arguments for configuring and starting a [Factory] actor instance. -#[derive(Builder)] +#[derive(bon::Builder)] #[builder(on(String, into))] pub struct FactoryArguments where diff --git a/ractor/src/rpc/mod.rs b/ractor/src/rpc/mod.rs index ef495c44..17a25adc 100644 --- a/ractor/src/rpc/mod.rs +++ b/ractor/src/rpc/mod.rs @@ -89,13 +89,53 @@ use crate::concurrency::{self, Duration, JoinHandle}; -use crate::{ActorCell, ActorRef, Message, MessagingErr, RpcReplyPort}; +use crate::{ActorCell, ActorRef, DerivedActorRef, Message, MessagingErr, RpcReplyPort}; pub mod call_result; pub use call_result::CallResult; #[cfg(test)] mod tests; +fn internal_cast(sender: F, msg: TMessage) -> Result<(), MessagingErr> +where + F: Fn(TMessage) -> Result<(), MessagingErr>, + TMessage: Message, +{ + sender(msg) +} + +async fn internal_call( + sender: F, + msg_builder: TMsgBuilder, + timeout_option: Option, +) -> Result, MessagingErr> +where + F: Fn(TMessage) -> Result<(), MessagingErr>, + TMessage: Message, + TMsgBuilder: FnOnce(RpcReplyPort) -> TMessage, +{ + let (tx, rx) = concurrency::oneshot(); + let port: RpcReplyPort = match timeout_option { + Some(duration) => (tx, duration).into(), + None => tx.into(), + }; + sender(msg_builder(port))?; + + // wait for the reply + Ok(if let Some(duration) = timeout_option { + match crate::concurrency::timeout(duration, rx).await { + Ok(Ok(result)) => CallResult::Success(result), + Ok(Err(_send_err)) => CallResult::SenderError, + Err(_timeout_err) => CallResult::Timeout, + } + } else { + match rx.await { + Ok(result) => CallResult::Success(result), + Err(_send_err) => CallResult::SenderError, + } + }) +} + /// Sends an asynchronous request to the specified actor, ignoring if the /// actor is alive or healthy and simply returns immediately /// @@ -107,7 +147,7 @@ pub fn cast(actor: &ActorCell, msg: TMessage) -> Result<(), MessagingE where TMessage: Message, { - actor.send_message::(msg) + internal_cast(|m| actor.send_message::(m), msg) } /// Sends an asynchronous request to the specified actor, building a one-time @@ -129,26 +169,7 @@ where TMessage: Message, TMsgBuilder: FnOnce(RpcReplyPort) -> TMessage, { - let (tx, rx) = concurrency::oneshot(); - let port: RpcReplyPort = match timeout_option { - Some(duration) => (tx, duration).into(), - None => tx.into(), - }; - actor.send_message::(msg_builder(port))?; - - // wait for the reply - Ok(if let Some(duration) = timeout_option { - match crate::concurrency::timeout(duration, rx).await { - Ok(Ok(result)) => CallResult::Success(result), - Ok(Err(_send_err)) => CallResult::SenderError, - Err(_timeout_err) => CallResult::Timeout, - } - } else { - match rx.await { - Ok(result) => CallResult::Success(result), - Err(_send_err) => CallResult::SenderError, - } - }) + internal_call(|m| actor.send_message(m), msg_builder, timeout_option).await } /// Sends an asynchronous request to the specified actors, building a one-time @@ -327,3 +348,25 @@ where ) } } + +impl DerivedActorRef +where + TMessage: Message, +{ + /// Alias of [cast] + pub fn cast(&self, msg: TMessage) -> Result<(), MessagingErr> { + internal_cast(|m| self.send_message(m), msg) + } + + /// Alias of [call] + pub async fn call( + &self, + msg_builder: TMsgBuilder, + timeout_option: Option, + ) -> Result, MessagingErr> + where + TMsgBuilder: FnOnce(RpcReplyPort) -> TMessage, + { + internal_call(|m| self.send_message(m), msg_builder, timeout_option).await + } +} diff --git a/ractor/src/time/mod.rs b/ractor/src/time/mod.rs index 2f3a916b..538d3bde 100644 --- a/ractor/src/time/mod.rs +++ b/ractor/src/time/mod.rs @@ -216,3 +216,39 @@ where kill_after(period, self.get_cell()) } } + +/// Add the timing functionality on top of the [crate::ActorRef] +impl crate::DerivedActorRef +where + TMessage: crate::Message, +{ + /// Alias of [send_interval] + pub fn send_interval(&self, period: Duration, msg: F) -> JoinHandle<()> + where + F: Fn() -> TMessage + Send + 'static, + { + send_interval::(period, self.get_cell(), msg) + } + + /// Alias of [send_after] + pub fn send_after( + &self, + period: Duration, + msg: F, + ) -> JoinHandle>> + where + F: FnOnce() -> TMessage + Send + 'static, + { + send_after::(period, self.get_cell(), msg) + } + + /// Alias of [exit_after] + pub fn exit_after(&self, period: Duration) -> JoinHandle<()> { + exit_after(period, self.get_cell()) + } + + /// Alias of [kill_after] + pub fn kill_after(&self, period: Duration) -> JoinHandle<()> { + kill_after(period, self.get_cell()) + } +} diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index 9ccdfdc9..5cc75f13 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.14.4" +version = "0.14.5" authors = ["Sean Lawlor "] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index 7ac675fe..0c7e83a2 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.14.4" +version = "0.14.5" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT"