diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 8de404e9..dbb49c21 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.12.4" +version = "0.13.0" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" @@ -26,6 +26,7 @@ default = ["tokio_runtime", "async-trait"] [dependencies] ## Required dependencies +bon = "2" dashmap = "6" futures = "0.3" once_cell = "1" diff --git a/ractor/src/actor/actor_cell.rs b/ractor/src/actor/actor_cell.rs index a1a4c1f1..848cdc16 100644 --- a/ractor/src/actor/actor_cell.rs +++ b/ractor/src/actor/actor_cell.rs @@ -201,11 +201,24 @@ pub struct ActorCell { impl std::fmt::Debug for ActorCell { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(name) = self.get_name() { - write!(f, "Actor '{}' (id: {})", name, self.get_id()) - } else { - write!(f, "Actor with id: {}", self.get_id()) - } + f.debug_struct("Actor") + .field("name", &self.get_name()) + .field("id", &self.get_id()) + .finish() + } +} + +impl PartialEq for ActorCell { + fn eq(&self, other: &Self) -> bool { + other.get_id() == self.get_id() + } +} + +impl Eq for ActorCell {} + +impl std::hash::Hash for ActorCell { + fn hash(&self, state: &mut H) { + self.get_id().hash(state) } } diff --git a/ractor/src/actor/actor_id.rs b/ractor/src/actor/actor_id.rs index de56dbf0..7a717117 100644 --- a/ractor/src/actor/actor_id.rs +++ b/ractor/src/actor/actor_id.rs @@ -31,19 +31,28 @@ impl ActorId { /// Determine if this actor id is a local or remote actor /// /// Returns [true] if it is a local actor, [false] otherwise - pub fn is_local(&self) -> bool { + pub const fn is_local(&self) -> bool { matches!(self, ActorId::Local(_)) } /// Retrieve the actor's PID /// /// Returns the actor's [u64] instance identifier (process id). - pub fn pid(&self) -> u64 { + pub const fn pid(&self) -> u64 { match self { ActorId::Local(pid) => *pid, ActorId::Remote { pid, .. } => *pid, } } + + /// Retrieve the node id of this PID. 0 = a local actor, while + /// any non-zero value is the ide of the remote node running this actor + pub const fn node(&self) -> u64 { + match self { + ActorId::Local(_) => 0, + ActorId::Remote { node_id, .. } => *node_id, + } + } } impl Display for ActorId { diff --git a/ractor/src/actor/actor_ref.rs b/ractor/src/actor/actor_ref.rs index e73ad65b..a60b6f61 100644 --- a/ractor/src/actor/actor_ref.rs +++ b/ractor/src/actor/actor_ref.rs @@ -5,7 +5,6 @@ //! [ActorRef] is a strongly-typed wrapper over an [ActorCell] -use std::any::TypeId; use std::marker::PhantomData; use crate::{ActorName, Message, MessagingErr, SupervisionEvent}; @@ -57,7 +56,7 @@ impl From> for ActorCell { impl std::fmt::Debug for ActorRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.inner) + self.inner.fmt(f) } } @@ -101,13 +100,11 @@ where pub fn where_is(name: ActorName) -> Option> { if let Some(actor) = crate::registry::where_is(name) { // check the type id when pulling from the registry - if actor.get_type_id() == TypeId::of::() { - Some(actor.into()) - } else { - None + let check = actor.is_message_type_of::(); + if check.is_none() || matches!(check, Some(true)) { + return Some(actor.into()); } - } else { - None } + None } } diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs index cd7077ec..fac638d1 100644 --- a/ractor/src/actor/messages.rs +++ b/ractor/src/actor/messages.rs @@ -26,7 +26,7 @@ pub struct BoxedState { impl Debug for BoxedState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BoxedState") + f.debug_struct("BoxedState").finish() } } @@ -61,6 +61,7 @@ impl BoxedState { } /// Messages to stop an actor +#[derive(Debug)] pub enum StopMessage { /// Normal stop Stop, @@ -68,12 +69,6 @@ pub enum StopMessage { Reason(String), } -impl Debug for StopMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Stop message: {self}") - } -} - impl std::fmt::Display for StopMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -172,18 +167,12 @@ impl std::fmt::Display for SupervisionEvent { } /// A signal message which takes priority above all else -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum Signal { /// Terminate the agent, cancelling all async work immediately Kill, } -impl Debug for Signal { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Signal: {self}") - } -} - impl std::fmt::Display for Signal { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/ractor/src/actor/mod.rs b/ractor/src/actor/mod.rs index 4d7c5cf9..8fdd6bd1 100644 --- a/ractor/src/actor/mod.rs +++ b/ractor/src/actor/mod.rs @@ -102,7 +102,7 @@ pub(crate) fn get_panic_string(e: Box) -> ActorProcess /// * `post_stop` /// * `handle` /// * `handle_serialized` (Available with `cluster` feature only) -/// * `handle_supervision_evt` +/// * `handle_supervisor_evt` /// /// return a [Result<_, ActorProcessingError>] where the error type is an /// alias of [Box]. This is treated @@ -481,11 +481,10 @@ where impl Debug for ActorRuntime { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(name) = self.name.as_ref() { - write!(f, "ActorRuntime('{}' - {})", name, self.id) - } else { - write!(f, "ActorRuntime({})", self.id) - } + f.debug_struct("ActorRuntime") + .field("name", &self.name) + .field("id", &self.id) + .finish() } } diff --git a/ractor/src/actor/supervision.rs b/ractor/src/actor/supervision.rs index 0b9e21a4..62c5e197 100644 --- a/ractor/src/actor/supervision.rs +++ b/ractor/src/actor/supervision.rs @@ -52,11 +52,7 @@ impl SupervisionTree { /// from the supervision tree since the supervisor is shutting down /// and can't deal with superivison events anyways pub(crate) fn terminate_all_children(&self) { - let mut guard = self.children.lock().unwrap(); - let cells = guard.iter().map(|(_, a)| a.clone()).collect::>(); - guard.clear(); - // drop the guard to not deadlock on double-link - drop(guard); + let cells = self.get_children(); for cell in cells { cell.terminate(); cell.clear_supervisor(); @@ -65,11 +61,7 @@ impl SupervisionTree { /// Stop all the linked children, but does NOT unlink them (stop flow will do that) pub(crate) fn stop_all_children(&self, reason: Option) { - let mut guard = self.children.lock().unwrap(); - let cells = guard.iter().map(|(_, a)| a.clone()).collect::>(); - guard.clear(); - // drop the guard to not deadlock on double-link - drop(guard); + let cells = self.get_children(); for cell in cells { cell.stop(reason.clone()); } @@ -77,11 +69,7 @@ impl SupervisionTree { /// Drain all the linked children, but does NOT unlink them pub(crate) fn drain_all_children(&self) { - let mut guard = self.children.lock().unwrap(); - let cells = guard.iter().map(|(_, a)| a.clone()).collect::>(); - guard.clear(); - // drop the guard to not deadlock on double-link - drop(guard); + let cells = self.get_children(); for cell in cells { _ = cell.drain(); } @@ -94,14 +82,7 @@ impl SupervisionTree { reason: Option, timeout: Option, ) { - let cells; - { - let mut guard = self.children.lock().unwrap(); - cells = guard.iter().map(|(_, a)| a.clone()).collect::>(); - guard.clear(); - // drop the guard to not deadlock on double-link - drop(guard); - } + let cells = self.get_children(); let mut js = crate::concurrency::JoinSet::new(); for cell in cells { let lreason = reason.clone(); @@ -116,14 +97,7 @@ impl SupervisionTree { &self, timeout: Option, ) { - let cells; - { - let mut guard = self.children.lock().unwrap(); - cells = guard.iter().map(|(_, a)| a.clone()).collect::>(); - guard.clear(); - // drop the guard to not deadlock on double-link - drop(guard); - } + let cells = self.get_children(); let mut js = crate::concurrency::JoinSet::new(); for cell in cells { let ltimeout = timeout; diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index 53a3da41..5be425c8 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -956,6 +956,8 @@ fn returns_actor_references() { ]; for (want, event) in tests { + // Cloned cells are "equal" since they point to the same actor id + assert_eq!(event.actor_cell(), event.actor_cell().clone()); assert_eq!(event.actor_cell().is_some(), want); assert_eq!(event.actor_id().is_some(), want); } diff --git a/ractor/src/factory/factoryimpl.rs b/ractor/src/factory/factoryimpl.rs index 99344180..26555b04 100644 --- a/ractor/src/factory/factoryimpl.rs +++ b/ractor/src/factory/factoryimpl.rs @@ -11,6 +11,8 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; +use bon::Builder; + use self::routing::RouteResult; use crate::concurrency::Duration; use crate::concurrency::Instant; @@ -97,6 +99,8 @@ where } /// Arguments for configuring and starting a [Factory] actor instance. +#[derive(Builder)] +#[builder(on(String, into))] pub struct FactoryArguments where TKey: JobKey, @@ -115,6 +119,9 @@ where /// construct new workers when needed. pub worker_builder: Box>, /// Number of (initial) workers in the factory + /// + /// Default = `1` worker + #[builder(default = 1)] pub num_initial_workers: usize, /// Message routing handler pub router: TRouter, @@ -128,25 +135,36 @@ where /// will cause a job at the head or tail of the queue to be dropped (which is /// controlled by `discard_mode`). /// - /// * For factories using [routing::QueuerRouting], [routing::StickyQueuerRouting] routing, these - /// are applied to the factory's internal queue. - /// * For all other routing protocols, this applies to the worker's message queue + /// * For factories using routing protocols like [routing::QueuerRouting], + /// [routing::StickyQueuerRouting] routing, these are applied to the factory's internal queue. + /// * For all other routing non-factory-queueing protocols, + /// this applies to the worker's message queue /// /// Default is [DiscardSettings::None] + #[builder(default = DiscardSettings::None)] pub discard_settings: DiscardSettings, /// Controls the "dead man's" switching logic on the factory. Periodically /// the factory will scan for stuck workers. If detected, the worker information /// will be logged along with the current job key information. Optionally the worker /// can be killed and replaced by the factory + /// + /// Default is [None] pub dead_mans_switch: Option, /// Controls the parallel capacity of the worker pool by dynamically growing/shrinking the pool + /// + /// Default is [None] pub capacity_controller: Option>, - - /// Lifecycle hooks which provide access to points in the factory's lifecycle - /// for shutdown/startup/draining + /// Lifecycle hooks provide access to points in the factory's lifecycle + /// for shutdown/startup/draining where user-defined logic can execute (and + /// block factory lifecycle at critical points). For example, this means + /// the factory won't start accepting requests until the complete startup routine + /// is completed. + /// + /// Default is [None] pub lifecycle_hooks: Option>>, - /// Defines the statistics collection layer for the factory. Useful for tracking factory properties. + /// + /// Default is [None] pub stats: Option>, } @@ -170,231 +188,17 @@ where .field("queue", &std::any::type_name::()) .field("discard_settings", &self.discard_settings) .field("dead_mans_switch", &self.dead_mans_switch) + .field( + "has_capacity_controller", + &self.capacity_controller.is_some(), + ) + .field("has_lifecycle_hooks", &self.lifecycle_hooks.is_some()) + .field("has_stats", &self.stats.is_some()) + .field("has_discard_handler", &self.discard_handler.is_some()) .finish() } } -/// Builder for [FactoryArguments] which can be used to build the -/// [Factory]'s startup arguments. -pub struct FactoryArgumentsBuilder -where - TKey: JobKey, - TMsg: Message, - TWorkerStart: Message, - TWorker: Actor< - Msg = WorkerMessage, - Arguments = WorkerStartContext, - >, - TRouter: Router, - TQueue: Queue, -{ - // Required - worker_builder: Box>, - num_initial_workers: usize, - router: TRouter, - queue: TQueue, - // Optional - discard_handler: Option>>, - discard_settings: DiscardSettings, - dead_mans_switch: Option, - capacity_controller: Option>, - lifecycle_hooks: Option>>, - stats: Option>, -} - -impl Debug - for FactoryArgumentsBuilder -where - TKey: JobKey, - TMsg: Message, - TWorkerStart: Message, - TWorker: Actor< - Msg = WorkerMessage, - Arguments = WorkerStartContext, - >, - TRouter: Router, - TQueue: Queue, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FactoryArgumentsBuilder") - .field("num_initial_workers", &self.num_initial_workers) - .field("router", &std::any::type_name::()) - .field("queue", &std::any::type_name::()) - .field("discard_settings", &self.discard_settings) - .field("dead_mans_switch", &self.dead_mans_switch) - .finish() - } -} - -impl - FactoryArgumentsBuilder -where - TKey: JobKey, - TMsg: Message, - TWorkerStart: Message, - TWorker: Actor< - Msg = WorkerMessage, - Arguments = WorkerStartContext, - >, - TRouter: Router, - TQueue: Queue, -{ - /// Construct a new [FactoryArguments] with the required arguments - /// - /// * `worker_builder`: The implementation of the [WorkerBuilder] trait which is - /// used to construct worker instances as needed - /// * `router`: The message routing implementation the factory should use. Implements - /// the [Router] trait. - /// * `queue`: The message queueing implementation the factory should use. Implements - /// the [Queue] trait. - pub fn new + 'static>( - worker_builder: TBuilder, - router: TRouter, - queue: TQueue, - ) -> Self { - Self { - worker_builder: Box::new(worker_builder), - num_initial_workers: 1, - router, - queue, - discard_handler: None, - discard_settings: DiscardSettings::None, - dead_mans_switch: None, - capacity_controller: None, - lifecycle_hooks: None, - stats: None, - } - } - - /// Build the [FactoryArguments] required to start the [Factory] - pub fn build(self) -> FactoryArguments { - let Self { - worker_builder, - num_initial_workers, - router, - queue, - discard_handler, - discard_settings, - dead_mans_switch, - capacity_controller, - lifecycle_hooks, - stats, - } = self; - FactoryArguments { - worker_builder, - num_initial_workers, - router, - queue, - discard_handler, - discard_settings, - dead_mans_switch, - capacity_controller, - lifecycle_hooks, - stats, - } - } - - /// Sets the the number initial workers in the factory. - /// - /// This controls the factory's initial parallelism for handling - /// concurrent messages. - pub fn with_number_of_initial_workers(self, worker_count: usize) -> Self { - Self { - num_initial_workers: worker_count, - ..self - } - } - - /// Sets the factory's discard handler. This is a callback which - /// is used when a job is discarded (e.g. loadshed, timeout, shutdown, etc). - /// - /// Default is [None] - pub fn with_discard_handler>( - self, - discard_handler: TDiscard, - ) -> Self { - Self { - discard_handler: Some(Arc::new(discard_handler)), - ..self - } - } - - /// Sets the factory's discard settings. - /// - /// This controls the maximum queue length. Any job arriving when the queue is at - /// its max length will cause a job at the head or tail of the queue to be dropped - /// (which is controlled by `discard_mode`). - /// - /// * For factories using [routing::QueuerRouting], [routing::StickyQueuerRouting] routing, these - /// are applied to the factory's internal queue. - /// * For all other routing protocols, this applies to the worker's message queue - /// - /// Default is [DiscardSettings::None] - pub fn with_discard_settings(self, discard_settings: DiscardSettings) -> Self { - Self { - discard_settings, - ..self - } - } - - /// Controls the "dead man's" switching logic on the factory. - /// - /// Periodically the factory can scan for stuck workers. If detected, the worker information - /// will be logged along with the current job key information. - /// - /// Optionally the worker can be killed and replaced by the factory. - /// - /// This can be used to detect and kill stuck jobs that will never compelte in a reasonable - /// time (e.g. haven't configured internally a job execution timeout or something). - pub fn with_dead_mans_switch(self, dmd: DeadMansSwitchConfiguration) -> Self { - Self { - dead_mans_switch: Some(dmd), - ..self - } - } - - /// Set the factory's dynamic worker capacity controller. - /// - /// This, at runtime, controls the factory's capacity (i.e. number of - /// workers) and can adjust it up and down (bounded in `[1,1_000_000]`). - pub fn with_capacity_controller( - self, - capacity_controller: TCapacity, - ) -> Self { - Self { - capacity_controller: Some(Box::new(capacity_controller)), - ..self - } - } - - /// Sets the factory's lifecycle hooks implementation - /// - /// Lifecycle hooks provide access to points in the factory's lifecycle - /// for shutdown/startup/draining where user-defined logic can execute (and - /// block factory lifecycle at critical points). This means - /// the factory won't start accepting requests until the complete startup routine - /// is completed. - pub fn with_lifecycle_hooks>( - self, - lifecycle_hooks: TLifecycle, - ) -> Self { - Self { - lifecycle_hooks: Some(Box::new(lifecycle_hooks)), - ..self - } - } - - /// Sets the factory's statistics collection implementation - /// - /// This can be used to aggregate various statistics about the factory's processing. - pub fn with_stats_collector(self, stats: TStats) -> Self { - Self { - stats: Some(Arc::new(stats)), - ..self - } - } -} - /// State of a factory (backlogged jobs, handler, etc) pub struct FactoryState where @@ -445,6 +249,13 @@ where .field("discard_settings", &self.discard_settings) .field("dead_mans_switch", &self.dead_mans_switch) .field("drain_state", &self.drain_state) + .field( + "has_capacity_controller", + &self.capacity_controller.is_some(), + ) + .field("has_lifecycle_hooks", &self.lifecycle_hooks.is_some()) + .field("has_stats", &self.stats.is_some()) + .field("has_discard_handler", &self.discard_handler.is_some()) .finish() } } @@ -757,7 +568,7 @@ where if let Some(capacity_controller) = &mut self.capacity_controller { let new_capacity = capacity_controller.get_pool_size(self.pool_size).await; if self.pool_size != new_capacity { - tracing::info!("Factory worker count {}", new_capacity); + tracing::info!(factory = ?myself, "Factory worker count {}", new_capacity); self.resize_pool(myself, new_capacity).await?; } } @@ -893,7 +704,7 @@ where &self, myself: ActorRef>, FactoryArguments { - worker_builder, + mut worker_builder, num_initial_workers, router, queue, diff --git a/ractor/src/factory/job.rs b/ractor/src/factory/job.rs index bd627ea2..0e3b76bf 100644 --- a/ractor/src/factory/job.rs +++ b/ractor/src/factory/job.rs @@ -10,6 +10,8 @@ use std::panic::RefUnwindSafe; use std::sync::Arc; use std::{hash::Hash, time::SystemTime}; +use bon::Builder; + use crate::{concurrency::Duration, Message}; use crate::{ActorRef, RpcReplyPort}; @@ -109,6 +111,7 @@ impl BytesConvertable for JobOptions { /// Depending on the [super::Factory]'s routing scheme the /// [Job]'s `key` is utilized to dispatch the job to specific /// workers. +#[derive(Builder)] pub struct Job where TKey: JobKey, @@ -120,6 +123,9 @@ where pub msg: TMsg, /// The job's options, mainly related to timing /// information of the job + /// + /// Default = [JobOptions::default()] + #[builder(default = JobOptions::default())] pub options: JobOptions, /// If provided, this channel can be used to block pushes /// into the factory until the factory can "accept" the message @@ -130,6 +136,8 @@ where /// The reply channel return [None] if the job was accepted, or /// [Some(`Job`)] if it was rejected & loadshed, and then the /// job may be retried by the caller at a later time (if desired). + /// + /// Default = [None] pub accepted: Option>>, } @@ -141,6 +149,7 @@ where fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Job") .field("options", &self.options) + .field("has_accepted", &self.accepted.is_some()) .finish() } } @@ -268,7 +277,10 @@ where TKey: JobKey, TMsg: Message, { - /// Determine if this job is expired + /// Determine if this job's TTL is expired + /// + /// Expiration only takes effect prior to the job being + /// started execution on a worker. pub fn is_expired(&self) -> bool { if let Some(ttl) = self.options.ttl { self.options.submit_time.elapsed().unwrap() > ttl @@ -277,7 +289,7 @@ where } } - /// Set the time the factor received the job + /// Set the time the factory received the job pub(crate) fn set_factory_time(&mut self) { self.options.factory_time = SystemTime::now(); } @@ -287,14 +299,15 @@ where self.options.worker_time = SystemTime::now(); } - /// Accept the job (telling the submitter that the job was accepted and enqueued to the factory) + /// Accept the job (if needed, telling the submitter that the job + /// was accepted and enqueued to the factory) pub(crate) fn accept(&mut self) { if let Some(port) = self.accepted.take() { let _ = port.send(None); } } - /// Reject the job. Consumes the job and returns it to the caller under backpressure scenarios. + /// Reject the job. Consumes the job and returns it to the caller (if needed). pub(crate) fn reject(mut self) { if let Some(port) = self.accepted.take() { let _ = port.send(Some(self)); diff --git a/ractor/src/factory/mod.rs b/ractor/src/factory/mod.rs index cd86f619..fbb98e16 100644 --- a/ractor/src/factory/mod.rs +++ b/ractor/src/factory/mod.rs @@ -118,7 +118,7 @@ //! /// Used by the factory to build new [ExampleWorker]s. //! struct ExampleWorkerBuilder; //! impl WorkerBuilder for ExampleWorkerBuilder { -//! fn build(&self, _wid: usize) -> (ExampleWorker, ()) { +//! fn build(&mut self, _wid: usize) -> (ExampleWorker, ()) { //! (ExampleWorker, ()) //! } //! } @@ -132,8 +132,11 @@ //! routing::QueuerRouting<(), ExampleMessage>, //! queues::DefaultQueue<(), ExampleMessage> //! >::default(); -//! let factory_args = FactoryArgumentsBuilder::new(ExampleWorkerBuilder, Default::default(), Default::default()) -//! .with_number_of_initial_workers(5) +//! let factory_args = FactoryArguments::builder() +//! .worker_builder(Box::new(ExampleWorkerBuilder)) +//! .queue(Default::default()) +//! .router(Default::default()) +//! .num_initial_workers(5) //! .build(); //! //! let (factory, handle) = Actor::spawn(None, factory_def, factory_args) diff --git a/ractor/src/factory/queues.rs b/ractor/src/factory/queues.rs index 253b0c1e..11a6a575 100644 --- a/ractor/src/factory/queues.rs +++ b/ractor/src/factory/queues.rs @@ -47,7 +47,7 @@ where /// Remove expired items from the queue /// /// * `discard_handler` - The handler to call for each discarded job. Will be called - /// with [DiscardReason::Loadshed]. + /// with [DiscardReason::TtlExpired]. /// /// Returns the number of elements removed from the queue fn remove_expired_items( diff --git a/ractor/src/factory/tests/basic.rs b/ractor/src/factory/tests/basic.rs index db0f6c92..bdbd1e18 100644 --- a/ractor/src/factory/tests/basic.rs +++ b/ractor/src/factory/tests/basic.rs @@ -104,7 +104,7 @@ struct FastTestWorkerBuilder { } impl WorkerBuilder for FastTestWorkerBuilder { - fn build(&self, wid: usize) -> (TestWorker, ()) { + fn build(&mut self, wid: usize) -> (TestWorker, ()) { ( TestWorker { counter: self.counters[wid].clone(), @@ -120,7 +120,7 @@ struct SlowTestWorkerBuilder { } impl WorkerBuilder for SlowTestWorkerBuilder { - fn build(&self, wid: usize) -> (TestWorker, ()) { + fn build(&mut self, wid: usize) -> (TestWorker, ()) { ( TestWorker { counter: self.counters[wid].clone(), @@ -136,7 +136,7 @@ struct InsanelySlowWorkerBuilder { } impl WorkerBuilder for InsanelySlowWorkerBuilder { - fn build(&self, wid: usize) -> (TestWorker, ()) { + fn build(&mut self, wid: usize) -> (TestWorker, ()) { ( TestWorker { counter: self.counters[wid].clone(), @@ -682,7 +682,7 @@ async fn test_stuck_workers() { } impl WorkerBuilder for StuckWorkerBuilder { - fn build(&self, wid: usize) -> (TestWorker, ()) { + fn build(&mut self, wid: usize) -> (TestWorker, ()) { ( TestWorker { counter: self.counters[wid].clone(), @@ -704,36 +704,37 @@ async fn test_stuck_workers() { routing::RoundRobinRouting, DefaultQueue, >::default(); - let (factory, factory_handle) = Actor::spawn( - None, - factory_definition, - FactoryArguments { - num_initial_workers: NUM_TEST_WORKERS, - queue: DefaultQueue::default(), - router: Default::default(), - capacity_controller: None, - dead_mans_switch: Some(DeadMansSwitchConfiguration { - detection_timeout: Duration::from_millis(50), - kill_worker: true, - }), - discard_handler: None, - discard_settings: DiscardSettings::None, - lifecycle_hooks: None, - worker_builder: Box::new(worker_builder), - stats: None, - }, - ) - .await - .expect("Failed to spawn factory"); + let dms = DeadMansSwitchConfiguration::builder() + .detection_timeout(Duration::from_millis(50)) + .kill_worker(true) + .build(); + tracing::debug!("DMS settings: {dms:?}"); + let args = FactoryArguments::builder() + .num_initial_workers(NUM_TEST_WORKERS) + .queue(Default::default()) + .router(Default::default()) + .worker_builder(Box::new(worker_builder)) + .dead_mans_switch(dms) + .build(); + tracing::debug!("Factory args {args:?}"); + let (factory, factory_handle) = Actor::spawn(None, factory_definition, args) + .await + .expect("Failed to spawn factory"); + + tracing::debug!( + "Actor node {}, pid {}", + factory.get_id().node(), + factory.get_id().pid() + ); for _ in 0..9 { factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - accepted: None, - })) + .cast(FactoryMessage::Dispatch( + Job::builder() + .key(TestKey { id: 1 }) + .msg(TestMessage::Ok) + .build(), + )) .expect("Failed to send to factory"); } diff --git a/ractor/src/factory/tests/draining_requests.rs b/ractor/src/factory/tests/draining_requests.rs index f7df3459..e0d90b61 100644 --- a/ractor/src/factory/tests/draining_requests.rs +++ b/ractor/src/factory/tests/draining_requests.rs @@ -94,7 +94,7 @@ struct SlowWorkerBuilder { } impl WorkerBuilder for SlowWorkerBuilder { - fn build(&self, _wid: usize) -> (TestWorker, ()) { + fn build(&mut self, _wid: usize) -> (TestWorker, ()) { ( TestWorker { counter: self.counter.clone(), @@ -120,24 +120,15 @@ async fn test_request_draining() { routing::QueuerRouting, queues::DefaultQueue, >::default(); - let (factory, factory_handle) = Actor::spawn( - None, - factory_definition, - FactoryArguments { - num_initial_workers: 2, - queue: queues::DefaultQueue::default(), - router: Default::default(), - capacity_controller: None, - dead_mans_switch: None, - discard_handler: None, - discard_settings: DiscardSettings::None, - lifecycle_hooks: None, - worker_builder: Box::new(worker_builder), - stats: None, - }, - ) - .await - .expect("Failed to spawn factory"); + let args = FactoryArguments::builder() + .num_initial_workers(2) + .queue(Default::default()) + .router(Default::default()) + .worker_builder(Box::new(worker_builder)) + .build(); + let (factory, factory_handle) = Actor::spawn(None, factory_definition, args) + .await + .expect("Failed to spawn factory"); for id in 0..999 { factory diff --git a/ractor/src/factory/tests/dynamic_discarding.rs b/ractor/src/factory/tests/dynamic_discarding.rs index 97dc4877..26661eca 100644 --- a/ractor/src/factory/tests/dynamic_discarding.rs +++ b/ractor/src/factory/tests/dynamic_discarding.rs @@ -97,7 +97,7 @@ struct SlowTestWorkerBuilder { } impl WorkerBuilder for SlowTestWorkerBuilder { - fn build(&self, wid: usize) -> (TestWorker, ()) { + fn build(&mut self, wid: usize) -> (TestWorker, ()) { ( TestWorker { counter: self.counters[wid].clone(), diff --git a/ractor/src/factory/tests/dynamic_pool.rs b/ractor/src/factory/tests/dynamic_pool.rs index 9c58ceb9..d8cd68c5 100644 --- a/ractor/src/factory/tests/dynamic_pool.rs +++ b/ractor/src/factory/tests/dynamic_pool.rs @@ -93,7 +93,7 @@ struct TestWorkerBuilder { } impl WorkerBuilder for TestWorkerBuilder { - fn build(&self, _wid: usize) -> (TestWorker, ()) { + fn build(&mut self, _wid: usize) -> (TestWorker, ()) { ( TestWorker { id_map: self.id_map.clone(), diff --git a/ractor/src/factory/tests/lifecycle.rs b/ractor/src/factory/tests/lifecycle.rs index 9a13f6cd..61fb31b7 100644 --- a/ractor/src/factory/tests/lifecycle.rs +++ b/ractor/src/factory/tests/lifecycle.rs @@ -141,7 +141,7 @@ impl Actor for TestWorker { struct TestWorkerBuilder; impl WorkerBuilder for TestWorkerBuilder { - fn build(&self, _wid: crate::factory::WorkerId) -> (TestWorker, ()) { + fn build(&mut self, _wid: crate::factory::WorkerId) -> (TestWorker, ()) { (TestWorker, ()) } } diff --git a/ractor/src/factory/tests/priority_queueing.rs b/ractor/src/factory/tests/priority_queueing.rs index 6b2a4291..5abe90b1 100644 --- a/ractor/src/factory/tests/priority_queueing.rs +++ b/ractor/src/factory/tests/priority_queueing.rs @@ -104,7 +104,7 @@ struct TestWorkerBuilder { } impl WorkerBuilder for TestWorkerBuilder { - fn build(&self, _wid: usize) -> (TestWorker, ()) { + fn build(&mut self, _wid: usize) -> (TestWorker, ()) { ( TestWorker { counters: self.counters.clone(), @@ -143,27 +143,17 @@ async fn test_basic_priority_queueing() { { StandardPriority::size() }, >, >::default(); - let (factory, factory_handle) = Actor::spawn( - None, - factory_definition, - FactoryArguments { - num_initial_workers: 1, - queue: queues::PriorityQueue::new(TestPriorityManager), - router: Default::default(), - capacity_controller: None, - dead_mans_switch: None, - discard_handler: None, - discard_settings: DiscardSettings::None, - lifecycle_hooks: None, - worker_builder: Box::new(TestWorkerBuilder { - counters: counters.clone(), - signal: signal.clone(), - }), - stats: None, - }, - ) - .await - .expect("Failed to spawn factory"); + let args = FactoryArguments::builder() + .queue(queues::PriorityQueue::new(TestPriorityManager)) + .router(Default::default()) + .worker_builder(Box::new(TestWorkerBuilder { + counters: counters.clone(), + signal: signal.clone(), + })) + .build(); + let (factory, factory_handle) = Actor::spawn(None, factory_definition, args) + .await + .expect("Failed to spawn factory"); // Act // Send 5 high pri and 5 low pri messages to the factory. Only the high pri should diff --git a/ractor/src/factory/tests/worker_lifecycle.rs b/ractor/src/factory/tests/worker_lifecycle.rs index d04c51d0..4c59c81c 100644 --- a/ractor/src/factory/tests/worker_lifecycle.rs +++ b/ractor/src/factory/tests/worker_lifecycle.rs @@ -87,7 +87,7 @@ struct MyWorkerBuilder { } impl WorkerBuilder for MyWorkerBuilder { - fn build(&self, _wid: crate::factory::WorkerId) -> (MyWorker, ()) { + fn build(&mut self, _wid: crate::factory::WorkerId) -> (MyWorker, ()) { ( MyWorker { counter: self.counter.clone(), diff --git a/ractor/src/factory/worker.rs b/ractor/src/factory/worker.rs index a1124e11..92dc3a92 100644 --- a/ractor/src/factory/worker.rs +++ b/ractor/src/factory/worker.rs @@ -9,6 +9,8 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::sync::Arc; +use bon::Builder; + use crate::concurrency::{Duration, Instant, JoinHandle}; use crate::{Actor, ActorId, ActorProcessingErr}; use crate::{ActorRef, Message, MessagingErr}; @@ -22,12 +24,15 @@ use super::WorkerId; use super::{DiscardHandler, DiscardReason, JobOptions}; /// The configuration for the dead-man's switch functionality -#[derive(Debug)] +#[derive(Builder, Debug)] pub struct DeadMansSwitchConfiguration { /// Duration before determining worker is stuck pub detection_timeout: Duration, /// Flag denoting if the stuck worker should be killed /// and restarted + /// + /// Default = [true] + #[builder(default = true)] pub kill_worker: bool, } @@ -46,7 +51,7 @@ where /// /// Returns a tuple of the worker and a custom startup definition giving the worker /// owned control of some structs that it may need to work. - fn build(&self, wid: WorkerId) -> (TWorker, TWorkerStart); + fn build(&mut self, wid: WorkerId) -> (TWorker, TWorkerStart); } /// Controls the size of the worker pool by dynamically growing/shrinking the pool diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index 3b966faa..f9e593ae 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.12.4" +version = "0.13.0" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" @@ -23,8 +23,8 @@ prost-build = { version = "0.13" } bytes = { version = "1" } prost = { version = "0.13" } prost-types = { version = "0.13" } -ractor = { version = "0.12.0", features = ["cluster"], path = "../ractor" } -ractor_cluster_derive = { version = "0.12.0", path = "../ractor_cluster_derive" } +ractor = { version = "0.13.0", features = ["cluster"], path = "../ractor" } +ractor_cluster_derive = { version = "0.13.0", path = "../ractor_cluster_derive" } rand = "0.8" sha2 = "0.10" tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util", "tracing"]} diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index 10004496..e9a4cbcf 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.12.4" +version = "0.13.0" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT"