diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 61221a75..a71a41d6 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.9.8" +version = "0.10.0" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" @@ -27,7 +27,7 @@ default = ["tokio_runtime", "async-trait"] dashmap = "5" futures = "0.3" once_cell = "1" -rand = "0.8" +strum = { version = "0.26", features = ["derive"] } ## Configurable dependencies # Tracing feature requires --cfg=tokio_unstable diff --git a/ractor/examples/counter.rs b/ractor/examples/counter.rs index 6a537932..39e286cd 100644 --- a/ractor/examples/counter.rs +++ b/ractor/examples/counter.rs @@ -12,6 +12,8 @@ //! cargo run --example counter //! ``` +#![allow(clippy::incompatible_msrv)] + extern crate ractor; use ractor::{call_t, Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; diff --git a/ractor/examples/monte_carlo.rs b/ractor/examples/monte_carlo.rs index afaa1fcb..713309f8 100644 --- a/ractor/examples/monte_carlo.rs +++ b/ractor/examples/monte_carlo.rs @@ -15,6 +15,8 @@ //! cargo run --example monte_carlo //! ``` +#![allow(clippy::incompatible_msrv)] + use std::collections::HashMap; use ractor::{cast, Actor, ActorId, ActorProcessingErr, ActorRef}; diff --git a/ractor/examples/output_port.rs b/ractor/examples/output_port.rs index 38c2bd76..d4b28bbd 100644 --- a/ractor/examples/output_port.rs +++ b/ractor/examples/output_port.rs @@ -11,6 +11,8 @@ //! cargo run --example output_port //! ``` +#![allow(clippy::incompatible_msrv)] + extern crate ractor; use std::sync::Arc; diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs index 6e0f7df9..f17fe84d 100644 --- a/ractor/examples/philosophers.rs +++ b/ractor/examples/philosophers.rs @@ -18,6 +18,8 @@ //! cargo run --example philosophers //! ``` +#![allow(clippy::incompatible_msrv)] + use std::collections::{HashMap, VecDeque}; use ractor::{cast, Actor, ActorId, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort}; diff --git a/ractor/examples/ping_pong.rs b/ractor/examples/ping_pong.rs index 338b7040..e2e5f3f3 100644 --- a/ractor/examples/ping_pong.rs +++ b/ractor/examples/ping_pong.rs @@ -12,6 +12,8 @@ //! cargo run --example ping_pong //! ``` +#![allow(clippy::incompatible_msrv)] + extern crate ractor; use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; diff --git a/ractor/examples/supervisor.rs b/ractor/examples/supervisor.rs index c6671d8f..32649f8b 100644 --- a/ractor/examples/supervisor.rs +++ b/ractor/examples/supervisor.rs @@ -11,6 +11,8 @@ //! cargo run --example supervisor //! ``` +#![allow(clippy::incompatible_msrv)] + use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent}; use tokio::time::Duration; diff --git a/ractor/src/actor/actor_ref.rs b/ractor/src/actor/actor_ref.rs index 3cc5b5a7..0dbe678a 100644 --- a/ractor/src/actor/actor_ref.rs +++ b/ractor/src/actor/actor_ref.rs @@ -18,25 +18,16 @@ use super::ActorCell; /// /// An [ActorRef] is the primary means of communication typically used /// when interfacing with [super::Actor]s -/// -/// The [ActorRef] is SPECIFICALLY marked [Sync], regardless of the message type -/// because all usages of the message type are to send an owned instance of a message -/// and in no case is that message instance shared across threads. This is guaranteed -/// by the underlying Tokio channel usages. 
Without this manual marking of [Sync] on -/// [ActorRef], we would need to constrain the message type [Message] to be [Sync] which -/// is overly restrictive. pub struct ActorRef { pub(crate) inner: ActorCell, - _tactor: PhantomData, + _tactor: PhantomData TMessage>, } -unsafe impl Sync for ActorRef {} - impl Clone for ActorRef { fn clone(&self) -> Self { ActorRef { inner: self.inner.clone(), - _tactor: PhantomData::, + _tactor: PhantomData, } } } @@ -53,7 +44,7 @@ impl From for ActorRef { fn from(value: ActorCell) -> Self { Self { inner: value, - _tactor: PhantomData::, + _tactor: PhantomData, } } } diff --git a/ractor/src/factory/discard.rs b/ractor/src/factory/discard.rs new file mode 100644 index 00000000..33411c6a --- /dev/null +++ b/ractor/src/factory/discard.rs @@ -0,0 +1,151 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Discard handler managing when jobs are discarded +use super::Job; +use super::JobKey; +use crate::Message; + +/// The discard mode of a factory +#[derive(Eq, PartialEq, Clone, Copy)] +pub enum DiscardMode { + /// Discard oldest incoming jobs under backpressure + Oldest, + /// Discard newest incoming jobs under backpressure + Newest, +} + +/// A worker's copy of the discard settings. +/// +/// Originally we passed a cloned box of the dynamic calculator to the workers, +/// but what that would do is have every worker re-compute the discard limit +/// which if you have many thousands of workers is kind of useless. +/// +/// Instead now we have the workers treat the limit as static, but have the +/// factory compute the limit on it's interval and propagate the limit to the +/// workers. The workers "think" it's static, but the factory handles the dynamics. +/// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned +/// instance. It also moves NUM_WORKER calculations to 1. +pub(crate) enum WorkerDiscardSettings { + None, + Static { limit: usize, mode: DiscardMode }, +} + +impl WorkerDiscardSettings { + pub(crate) fn update_worker_limit(&mut self, new_limit: usize) { + if let Self::Static { limit, .. } = self { + *limit = new_limit; + } + } + + pub(crate) fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> { + match self { + Self::None => None, + Self::Static { limit, mode, .. } => Some((*limit, *mode)), + } + } +} +/// If a factory supports job discarding (loadshedding) it can have a few configurations +/// which are defined in this enum. There is +/// +/// 1. No discarding +/// 2. A static queueing limit discarding, with a specific discarding mode +/// 3. A dynamic queueing limit for discards, with a specified discarding mode and init discard limit. +pub enum DiscardSettings { + /// Don't discard jobs + None, + /// A static, immutable limit + Static { + /// The current limit. If 0, means jobs will never queue and immediately be discarded + /// once all workers are busy + limit: usize, + /// Define the factory messaging discard mode denoting if the oldest or newest messages + /// should be discarded in back-pressure scenarios + /// + /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue + mode: DiscardMode, + }, + /// Dynamic discarding is where the discard limit can change over time, controlled + /// by the `updater` which is an implementation of the [DynamicDiscardController] + Dynamic { + /// The current limit. 
If 0, means jobs will never queue and immediately be discarded + /// once all workers are busy + limit: usize, + /// Define the factory messaging discard mode denoting if the oldest or newest messages + /// should be discarded in back-pressure scenarios + /// + /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue + mode: DiscardMode, + /// The [DynamicDiscardController] implementation, which computes new limits dynamically + /// based on whatever metrics it wants + updater: Box, + }, +} + +impl DiscardSettings { + pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings { + match &self { + Self::None => WorkerDiscardSettings::None, + Self::Static { limit, mode } => WorkerDiscardSettings::Static { + limit: *limit, + mode: *mode, + }, + Self::Dynamic { limit, mode, .. } => WorkerDiscardSettings::Static { + limit: *limit, + mode: *mode, + }, + } + } + + /// Retrieve the discarding limit and [DiscardMode], if configured + pub fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> { + match self { + Self::None => None, + Self::Static { limit, mode, .. } => Some((*limit, *mode)), + Self::Dynamic { limit, mode, .. } => Some((*limit, *mode)), + } + } +} + +/// Controls the dynamic concurrency level by receiving periodic snapshots of job statistics +/// and emitting a new concurrency limit +#[cfg_attr(feature = "async-trait", crate::async_trait)] +pub trait DynamicDiscardController: Send + Sync + 'static { + /// Compute the new threshold for discarding + /// + /// If you want to utilize metrics exposed in [crate::modular_factory::stats] you can gather them + /// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a + /// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or + /// counters) + /// + /// The namespace of stats collected on the base controller factory are + /// `base_controller.factory.{FACTORY_NAME}.{STAT}` + /// + /// If no factory name is set, then "all" will be inserted + async fn compute(&mut self, current_threshold: usize) -> usize; +} + +/// Reason for discarding a job +pub enum DiscardReason { + /// The job TTLd + TtlExpired, + /// The job was rejected or dropped due to loadshedding + Loadshed, + /// The job was dropped due to factory shutting down + Shutdown, +} + +/// Trait defining the discard handler for a factory. +pub trait DiscardHandler: Send + Sync + 'static +where + TKey: JobKey, + TMsg: Message, +{ + /// Called on a job prior to being dropped from the factory. + /// + /// Useful scenarios are (a) collecting metrics, (b) logging, (c) tearing + /// down resources in the job, etc. + fn discard(&self, reason: DiscardReason, job: &mut Job); +} diff --git a/ractor/src/factory/factoryimpl.rs b/ractor/src/factory/factoryimpl.rs new file mode 100644 index 00000000..b106e605 --- /dev/null +++ b/ractor/src/factory/factoryimpl.rs @@ -0,0 +1,1015 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! 
Factory definition + +use std::cmp::Ordering; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; + +use self::routing::RouteResult; +use crate::concurrency::Duration; +use crate::concurrency::Instant; +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; +use crate::Message; +use crate::MessagingErr; +use crate::SpawnErr; +use crate::SupervisionEvent; + +use super::queues::Queue; +use super::routing::Router; +use super::*; + +/// The global execution limit, any more than 1M and realistically +/// we'll get into scheduling problems unless the requests have an +/// incredibly low reception rate and high processing latency. At +/// which point, a factory probably doesn't make great sense for +/// load-shedding customization +const GLOBAL_WORKER_POOL_MAXIMUM: usize = 1_000_000; + +#[cfg(test)] +const PING_FREQUENCY: Duration = Duration::from_millis(150); +#[cfg(not(test))] +const PING_FREQUENCY: Duration = Duration::from_millis(10_000); +const CALCULATE_FREQUENCY: Duration = Duration::from_millis(100); + +#[derive(Eq, PartialEq)] +enum DrainState { + NotDraining, + Draining, + Drained, +} + +/// Factory definition. +/// +/// This is a placeholder instance which contains all of the type specifications +/// for the factories properties +pub struct Factory +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + _key: PhantomData TKey>, + _msg: PhantomData TMsg>, + _worker_start: PhantomData TWorkerStart>, + _worker: PhantomData TWorker>, + _router: PhantomData TRouter>, + _queue: PhantomData TQueue>, +} + +impl Default + for Factory +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + fn default() -> Self { + Self { + _key: PhantomData, + _msg: PhantomData, + _worker_start: PhantomData, + _worker: PhantomData, + _router: PhantomData, + _queue: PhantomData, + } + } +} + +/// Arguments for configuring and starting a [Factory] actor instance. +pub struct FactoryArguments +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + /// The factory is responsible for spawning workers and re-spawning workers + /// under failure scenarios. This means that it needs to understand how to + /// build workers. The WorkerBuilder trait is used by the factory to + /// construct new workers when needed. + pub worker_builder: Box>, + /// Number of (initial) workers in the factory + pub num_initial_workers: usize, + /// Message routing handler + pub router: TRouter, + /// Message queue implementation for the factory + pub queue: TQueue, + /// Discard callback when a job is discarded. + /// + /// Default is [None] + pub discard_handler: Option>>, + /// Maximum queue length. Any job arriving when the queue is at its max length + /// will cause a job at the head or tail of the queue to be dropped (which is + /// controlled by `discard_mode`). + /// + /// * For factories using [routing::QueuerRouting], [routing::StickyQueuerRouting] routing, these + /// are applied to the factory's internal queue. 
+ /// * For all other routing protocols, this applies to the worker's message queue + /// + /// Default is [DiscardSettings::None] + pub discard_settings: DiscardSettings, + /// Controls the "dead man's" switching logic on the factory. Periodically + /// the factory will scan for stuck workers. If detected, the worker information + /// will be logged along with the current job key information. Optionally the worker + /// can be killed and replaced by the factory + pub dead_mans_switch: Option, + /// Controls the parallel capacity of the worker pool by dynamically growing/shrinking the pool + pub capacity_controller: Option>, + + /// Lifecycle hooks which provide access to points in the thrift factory's lifecycle + /// for shutdown/startup/draining + pub lifecycle_hooks: Option>>, + + /// Identifies if the factory should collect statstics around each worker + pub collect_worker_stats: bool, +} + +/// Builder for [FactoryArguments] which can be used to build the +/// [Factory]'s startup arguments. +pub struct FactoryArgumentsBuilder +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + // Required + worker_builder: Box>, + num_initial_workers: usize, + router: TRouter, + queue: TQueue, + // Optional + discard_handler: Option>>, + discard_settings: DiscardSettings, + dead_mans_switch: Option, + capacity_controller: Option>, + lifecycle_hooks: Option>>, + collect_worker_stats: bool, +} + +impl + FactoryArgumentsBuilder +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + /// Construct a new [FactoryArguments] with the required arguments + /// + /// * `worker_builder`: The implementation of the [WorkerBuilder] trait which is + /// used to construct worker instances as needed + /// * `router`: The message routing implementation the factory should use. Implements + /// the [Router] trait. + /// * `queue`: The message queueing implementation the factory should use. Implements + /// the [FactoryQueue] trait. + pub fn new + 'static>( + worker_builder: TBuilder, + router: TRouter, + queue: TQueue, + ) -> Self { + Self { + worker_builder: Box::new(worker_builder), + num_initial_workers: 1, + router, + queue, + discard_handler: None, + discard_settings: DiscardSettings::None, + dead_mans_switch: None, + capacity_controller: None, + lifecycle_hooks: None, + collect_worker_stats: false, + } + } + + /// Build the [FactoryArguments] required to start the [Factory] + pub fn build(self) -> FactoryArguments { + let Self { + worker_builder, + num_initial_workers, + router, + queue, + discard_handler, + discard_settings, + dead_mans_switch, + capacity_controller, + lifecycle_hooks, + collect_worker_stats, + } = self; + FactoryArguments { + worker_builder, + num_initial_workers, + router, + queue, + discard_handler, + discard_settings, + dead_mans_switch, + capacity_controller, + lifecycle_hooks, + collect_worker_stats, + } + } + + /// Sets the the number initial workers in the factory. + /// + /// This controls the factory's initial parallelism for handling + /// concurrent messages. + pub fn with_number_of_initial_workers(self, worker_count: usize) -> Self { + Self { + num_initial_workers: worker_count, + ..self + } + } + + /// Sets the factory's discard handler. This is a callback which + /// is used when a job is discarded (e.g. 
loadshed, timeout, shutdown, etc). + /// + /// Default is [None] + pub fn with_discard_handler>( + self, + discard_handler: TDiscard, + ) -> Self { + Self { + discard_handler: Some(Arc::new(discard_handler)), + ..self + } + } + + /// Sets the factory's discard settings. + /// + /// This controls the maximum queue length. Any job arriving when the queue is at + /// its max length will cause a job at the head or tail of the queue to be dropped + /// (which is controlled by `discard_mode`). + /// + /// * For factories using [routing::QueuerRouting], [routing::StickyQueuerRouting] routing, these + /// are applied to the factory's internal queue. + /// * For all other routing protocols, this applies to the worker's message queue + /// + /// Default is [DiscardSettings::None] + pub fn with_discard_settings(self, discard_settings: DiscardSettings) -> Self { + Self { + discard_settings, + ..self + } + } + + /// Controls the "dead man's" switching logic on the factory. + /// + /// Periodically the factory can scan for stuck workers. If detected, the worker information + /// will be logged along with the current job key information. + /// + /// Optionally the worker can be killed and replaced by the factory. + /// + /// This can be used to detect and kill stuck jobs that will never compelte in a reasonable + /// time (e.g. haven't configured internally a job execution timeout or something). + pub fn with_dead_mans_switch(self, dmd: DeadMansSwitchConfiguration) -> Self { + Self { + dead_mans_switch: Some(dmd), + ..self + } + } + + /// Set the factory's dynamic worker capacity controller. + /// + /// This, at runtime, controls the factory's capacity (i.e. number of + /// workers) and can adjust it up and down (bounded in `[1,1_000_000]`). + pub fn with_capacity_controller( + self, + capacity_controller: TCapacity, + ) -> Self { + Self { + capacity_controller: Some(Box::new(capacity_controller)), + ..self + } + } + + /// Sets the factory's lifecycle hooks implementation + /// + /// Lifecycle hooks provide access to points in the thrift factory's lifecycle + /// for shutdown/startup/draining where user-defined logic can execute (and + /// block factory lifecycle at critical points). For example, in thrift, this means + /// the factory won't start accepting requests until the complete startup routine + /// is completed. + pub fn with_lifecycle_hooks>( + self, + lifecycle_hooks: TLifecycle, + ) -> Self { + Self { + lifecycle_hooks: Some(Box::new(lifecycle_hooks)), + ..self + } + } +} + +/// State of a factory (backlogged jobs, handler, etc) +pub struct FactoryState +where + TKey: JobKey, + TMsg: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TWorkerStart: Message, + TRouter: Router, + TQueue: Queue, +{ + worker_builder: Box>, + pool_size: usize, + pool: HashMap>, + stats: MessageProcessingStats, + collect_worker_stats: bool, + router: TRouter, + queue: TQueue, + discard_handler: Option>>, + discard_settings: DiscardSettings, + drain_state: DrainState, + dead_mans_switch: Option, + capacity_controller: Option>, + lifecycle_hooks: Option>>, +} + +impl + FactoryState +where + TKey: JobKey, + TMsg: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TWorkerStart: Message, + TRouter: Router, + TQueue: Queue, +{ + /// This method tries to + /// + /// 1. Cleanup expired jobs at the head of the queue, discarding them + /// 2. 
Route the next non-expired job (if any) + /// - If a worker-hint was provided, and the worker is available, route it there (this is used + /// for when workers have just completed work, and should immediately receive a new job) + /// - If no hint provided, route to the next worker by routing protocol. + fn try_route_next_active_job( + &mut self, + worker_hint: WorkerId, + ) -> Result<(), MessagingErr>> { + // cleanup expired messages at the head of the queue + while let Some(true) = self.queue.peek().map(|m| m.is_expired()) { + // remove the job from the queue + if let Some(mut job) = self.queue.pop_front() { + self.stats.job_ttl_expired(); + if let Some(handler) = &self.discard_handler { + handler.discard(DiscardReason::TtlExpired, &mut job); + } + } else { + break; + } + } + + if let Some(worker) = self.pool.get_mut(&worker_hint).filter(|f| f.is_available()) { + if let Some(job) = self.queue.pop_front() { + worker.enqueue_job(job)?; + } + } else { + // target the next available worker + let target_worker = self + .queue + .peek() + .and_then(|job| { + self.router + .choose_target_worker(job, self.pool_size, &self.pool) + }) + .and_then(|wid| self.pool.get_mut(&wid)); + if let (Some(job), Some(worker)) = (self.queue.pop_front(), target_worker) { + worker.enqueue_job(job)?; + } + } + Ok(()) + } + + fn maybe_enqueue(&mut self, mut job: Job) { + let is_discardable = self.queue.is_job_discardable(&job.key); + let limit_and_mode = self.discard_settings.get_limit_and_mode(); + + match limit_and_mode { + Some((limit, DiscardMode::Newest)) => { + if is_discardable && self.queue.len() >= limit { + // load-shed the job + if let Some(handler) = &self.discard_handler { + handler.discard(DiscardReason::Loadshed, &mut job); + } + self.stats.job_discarded(); + } else { + self.queue.push_back(job); + } + } + Some((limit, DiscardMode::Oldest)) => { + self.queue.push_back(job); + while self.queue.len() > limit { + // try and shed a job, of the lowest priority working up + if let Some(mut msg) = self.queue.discard_oldest() { + self.stats.job_discarded(); + if let Some(handler) = &self.discard_handler { + handler.discard(DiscardReason::Loadshed, &mut msg); + } + } + } + } + None => { + // no load-shedding + self.queue.push_back(job); + } + } + } + + async fn grow_pool( + &mut self, + myself: &ActorRef>, + to_add: usize, + ) -> Result<(), SpawnErr> { + let curr_size = self.pool_size; + for wid in curr_size..(curr_size + to_add) { + tracing::trace!("Adding worker {}", wid); + if let Some(existing_worker) = self.pool.get_mut(&wid) { + // mark the worker as healthy again + existing_worker.set_draining(false); + } else { + // worker doesn't exist, add it + let (handler, custom_start) = self.worker_builder.build(wid); + let context = WorkerStartContext { + wid, + factory: myself.clone(), + custom_start, + }; + let (worker, handle) = + Actor::spawn_linked(None, handler, context, myself.get_cell()).await?; + let discard_settings = if self.router.is_factory_queueing() { + discard::WorkerDiscardSettings::None + } else { + self.discard_settings.get_worker_settings() + }; + self.pool.insert( + wid, + WorkerProperties::new( + wid, + worker, + discard_settings, + self.discard_handler.clone(), + self.collect_worker_stats, + handle, + ), + ); + } + } + Ok(()) + } + + fn shrink_pool(&mut self, to_remove: usize) { + let curr_size = self.pool_size; + for wid in (curr_size - to_remove)..curr_size { + match self.pool.entry(wid) { + std::collections::hash_map::Entry::Occupied(mut existing_worker) => { + let mut_worker = 
existing_worker.get_mut(); + if mut_worker.is_working() { + // mark the worker as draining + mut_worker.set_draining(true); + } else { + // drained, stop and drop + tracing::trace!("Stopping worker {wid}"); + mut_worker.actor.stop(None); + existing_worker.remove(); + } + } + std::collections::hash_map::Entry::Vacant(_) => { + // worker doesn't exist, ignore + } + } + } + } + + async fn resize_pool( + &mut self, + myself: &ActorRef>, + requested_pool_size: usize, + ) -> Result<(), SpawnErr> { + if requested_pool_size == 0 { + return Ok(()); + } + + let curr_size = self.pool_size; + let new_pool_size = std::cmp::min(GLOBAL_WORKER_POOL_MAXIMUM, requested_pool_size); + + match new_pool_size.cmp(&curr_size) { + Ordering::Greater => { + tracing::debug!( + factory = ?myself, "Resizing factory worker pool from {} -> {}", + curr_size, + new_pool_size + ); + // grow pool + let to_add = new_pool_size - curr_size; + self.grow_pool(myself, to_add).await?; + } + Ordering::Less => { + tracing::debug!( + factory = ?myself, "Resizing factory worker pool from {} -> {}", + curr_size, + new_pool_size + ); + // shrink pool + let to_remove = curr_size - new_pool_size; + self.shrink_pool(to_remove); + } + Ordering::Equal => { + // no-op + } + } + + self.pool_size = new_pool_size; + Ok(()) + } + + fn is_drained(&mut self) -> bool { + match &self.drain_state { + DrainState::NotDraining => false, + DrainState::Drained => true, + DrainState::Draining => { + let are_all_workers_free = self.pool.values().all(|worker| worker.is_available()); + if are_all_workers_free && self.queue.len() == 0 { + tracing::debug!("Worker pool is free and queue is empty."); + // everyone is free, all requests are drainined + self.drain_state = DrainState::Drained; + true + } else { + false + } + } + } + } + + fn dispatch(&mut self, mut job: Job) -> Result<(), ActorProcessingErr> { + // set the time the factory received the message + job.set_factory_time(); + self.stats.job_submitted(); + + if self.drain_state == DrainState::NotDraining { + if let RouteResult::Backlog(busy_job) = + self.router + .route_message(job, self.pool_size, &mut self.pool)? + { + // workers are busy, we need to queue a job + self.maybe_enqueue(busy_job); + } + } else { + tracing::debug!("Factory is draining but a job was received"); + if let Some(discard_handler) = &self.discard_handler { + discard_handler.discard(DiscardReason::Shutdown, &mut job); + } + } + Ok(()) + } + + fn worker_finished_job(&mut self, who: WorkerId, key: TKey) -> Result<(), ActorProcessingErr> { + let (is_worker_draining, should_drop_worker) = if let Some(worker) = self.pool.get_mut(&who) + { + if let Some(job_options) = worker.worker_complete(key)? 
{ + self.stats.factory_job_done(&job_options); + } + + if worker.is_draining { + // don't schedule more work + (true, !worker.is_working()) + } else { + (false, false) + } + } else { + (false, false) + }; + + if should_drop_worker { + let worker = self.pool.remove(&who); + if let Some(w) = worker { + tracing::trace!("Stopping worker {}", w.wid); + w.actor.stop(None); + } + } else if !is_worker_draining { + self.try_route_next_active_job(who)?; + } + Ok(()) + } + + fn worker_pong(&mut self, wid: usize, time: Duration) { + let discard_limit = self + .discard_settings + .get_limit_and_mode() + .map_or(0, |(l, _)| l); + if let Some(worker) = self.pool.get_mut(&wid) { + worker.ping_received(time, discard_limit); + } + } + + async fn calculate_metrics( + &mut self, + myself: &ActorRef>, + ) -> Result<(), ActorProcessingErr> { + if let Some(capacity_controller) = &mut self.capacity_controller { + let new_capacity = capacity_controller.get_pool_size(self.pool_size).await; + if self.pool_size != new_capacity { + tracing::info!("Factory worker count {}", new_capacity); + self.resize_pool(myself, new_capacity).await?; + } + } + + // TTL expired on these items, remove them before even trying to dequeue & distribute them + if self.router.is_factory_queueing() { + let num_removed = self.queue.remove_expired_items(&self.discard_handler); + self.stats.jobs_ttls_expired(num_removed); + } + + self.stats.reset_global_counters(); + + // schedule next calculation + myself.send_after(CALCULATE_FREQUENCY, || FactoryMessage::Calculate); + Ok(()) + } + + async fn send_pings( + &mut self, + myself: &ActorRef>, + when: Instant, + ) -> Result<(), ActorProcessingErr> { + self.stats.ping_received(when.elapsed()); + + // if we have dyanmic discarding, we update the discard threshold + if let DiscardSettings::Dynamic { limit, updater, .. 
} = &mut self.discard_settings { + *limit = updater.compute(*limit).await; + } + + for worker in self.pool.values_mut() { + worker.send_factory_ping()?; + } + + // schedule next ping + myself.send_after(PING_FREQUENCY, || FactoryMessage::DoPings(Instant::now())); + + Ok(()) + } + + async fn identify_stuck_workers(&mut self, myself: &ActorRef>) { + if let Some(dmd) = &self.dead_mans_switch { + let mut dead_workers = vec![]; + for worker in self.pool.values_mut() { + if worker.is_stuck(dmd.detection_timeout) && dmd.kill_worker { + tracing::warn!( + factory = ?myself, "Factory killing stuck worker {}", + worker.wid + ); + worker.actor.kill(); + if let Some(h) = worker.get_join_handle() { + dead_workers.push(h); + } + } + } + + for w in dead_workers.into_iter() { + let _ = w.await; + } + + // schedule next check + myself.send_after(dmd.detection_timeout, || { + FactoryMessage::IdentifyStuckWorkers + }); + } + } + + async fn drain_requests( + &mut self, + myself: &ActorRef>, + ) -> Result<(), ActorProcessingErr> { + // put us into a draining state + tracing::debug!("Factory is moving to draining state"); + self.drain_state = DrainState::Draining; + if let Some(hooks) = &mut self.lifecycle_hooks { + hooks.on_factory_draining(myself.clone()).await?; + } + Ok(()) + } + + fn reply_with_available_capacity(&self, reply: RpcReplyPort) { + // calculate the worker's free capacity + let worker_availability = self + .pool + .values() + .filter(|worker| worker.is_available()) + .count(); + match self.discard_settings.get_limit_and_mode() { + Some((limit, _)) => { + // get the queue space and add it to the worker availability + let count = (limit - self.queue.len()) + worker_availability; + let _ = reply.send(count); + } + None => { + // there's no queueing limit, so we just report worker + // availability + let _ = reply.send(worker_availability); + } + } + } +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor + for Factory +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + type Msg = FactoryMessage; + type State = FactoryState; + type Arguments = FactoryArguments; + + async fn pre_start( + &self, + myself: ActorRef>, + FactoryArguments { + worker_builder, + num_initial_workers, + router, + queue, + discard_handler, + discard_settings, + dead_mans_switch, + capacity_controller, + lifecycle_hooks, + collect_worker_stats, + }: Self::Arguments, + ) -> Result { + tracing::debug!(factory = ?myself, "Factory starting"); + + // build the pool + let mut pool = HashMap::new(); + for wid in 0..num_initial_workers { + let (handler, custom_start) = worker_builder.build(wid); + let context = WorkerStartContext { + wid, + factory: myself.clone(), + custom_start, + }; + let (worker, worker_handle) = + Actor::spawn_linked(None, handler, context, myself.get_cell()).await?; + let worker_discard_settings = if router.is_factory_queueing() { + discard::WorkerDiscardSettings::None + } else { + discard_settings.get_worker_settings() + }; + + pool.insert( + wid, + WorkerProperties::new( + wid, + worker, + worker_discard_settings, + discard_handler.clone(), + collect_worker_stats, + worker_handle, + ), + ); + } + + // Startup worker pinging + myself.send_after(PING_FREQUENCY, || FactoryMessage::DoPings(Instant::now())); + + // startup calculations + myself.send_after(CALCULATE_FREQUENCY, || FactoryMessage::Calculate); + + // startup stuck worker detection + if let Some(dmd) = 
&dead_mans_switch { + myself.send_after(dmd.detection_timeout, || { + FactoryMessage::IdentifyStuckWorkers + }); + } + + // initial state + Ok(FactoryState { + worker_builder, + pool_size: num_initial_workers, + pool, + drain_state: DrainState::NotDraining, + capacity_controller, + dead_mans_switch, + discard_handler, + discard_settings, + lifecycle_hooks, + queue, + router, + stats: { + let mut s = MessageProcessingStats::default(); + s.enable(); + s + }, + collect_worker_stats, + }) + } + + async fn post_start( + &self, + myself: ActorRef>, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + tracing::debug!(factory = ?myself, "Factory started"); + if let Some(hooks) = &mut state.lifecycle_hooks { + hooks.on_factory_started(myself).await?; + } + Ok(()) + } + + async fn post_stop( + &self, + myself: ActorRef, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + tracing::debug!(factory = ?myself, "Factory stopped"); + + if let Some(handler) = &state.discard_handler { + while let Some(mut msg) = state.queue.pop_front() { + handler.discard(DiscardReason::Shutdown, &mut msg); + } + } + + // cleanup the pool and wait for it to exit + for worker_props in state.pool.values() { + worker_props.actor.stop(None); + } + // now wait on the handles until the workers finish + for worker_props in state.pool.values_mut() { + if let Some(handle) = worker_props.get_join_handle() { + let _ = handle.await; + } + } + + if let Some(hooks) = &mut state.lifecycle_hooks { + hooks.on_factory_stopped().await?; + } + + Ok(()) + } + + async fn handle_supervisor_evt( + &self, + myself: ActorRef>, + message: SupervisionEvent, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + SupervisionEvent::ActorTerminated(who, _, reason) => { + let wid = if let Some(worker) = state + .pool + .values_mut() + .find(|actor| actor.is_pid(who.get_id())) + { + tracing::warn!( + factory = ?myself, "Factory's worker {} terminated with {:?}", + worker.wid, + reason + ); + let (new_worker, custom_start) = state.worker_builder.build(worker.wid); + let spec = WorkerStartContext { + wid: worker.wid, + factory: myself.clone(), + custom_start, + }; + let (replacement, replacement_handle) = + Actor::spawn_linked(None, new_worker, spec, myself.get_cell()).await?; + + worker.replace_worker(replacement, replacement_handle)?; + Some(worker.wid) + } else { + None + }; + if let Some(wid) = wid { + state.try_route_next_active_job(wid)?; + } + } + SupervisionEvent::ActorPanicked(who, reason) => { + let wid = if let Some(worker) = state + .pool + .values_mut() + .find(|actor| actor.is_pid(who.get_id())) + { + tracing::warn!( + factory = ?myself, "Factory's worker {} panicked with {}", + worker.wid, + reason + ); + let (new_worker, custom_start) = state.worker_builder.build(worker.wid); + let spec = WorkerStartContext { + wid: worker.wid, + factory: myself.clone(), + custom_start, + }; + let (replacement, replacement_handle) = + Actor::spawn_linked(None, new_worker, spec, myself.get_cell()).await?; + + worker.replace_worker(replacement, replacement_handle)?; + Some(worker.wid) + } else { + None + }; + if let Some(wid) = wid { + state.try_route_next_active_job(wid)?; + } + } + _ => {} + } + Ok(()) + } + + async fn handle( + &self, + myself: ActorRef>, + message: FactoryMessage, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + FactoryMessage::Dispatch(job) => { + state.dispatch(job)?; + } + FactoryMessage::Finished(who, key) => { + 
state.worker_finished_job(who, key)?; + } + FactoryMessage::WorkerPong(wid, time) => { + state.worker_pong(wid, time); + } + FactoryMessage::Calculate => { + state.calculate_metrics(&myself).await?; + } + FactoryMessage::DoPings(when) => { + state.send_pings(&myself, when).await?; + } + FactoryMessage::IdentifyStuckWorkers => { + state.identify_stuck_workers(&myself).await; + } + FactoryMessage::GetQueueDepth(reply) => { + let _ = reply.send(state.queue.len()); + } + FactoryMessage::AdjustWorkerPool(requested_pool_size) => { + tracing::info!("Adjusting pool size to {}", requested_pool_size); + state.resize_pool(&myself, requested_pool_size).await?; + } + FactoryMessage::GetAvailableCapacity(reply) => { + state.reply_with_available_capacity(reply); + } + FactoryMessage::DrainRequests => { + state.drain_requests(&myself).await?; + } + } + + if state.is_drained() { + // If we're in a draining state, and all requests are now drained + // stop the factory + myself.stop(None); + } + Ok(()) + } +} diff --git a/ractor/src/factory/lifecycle.rs b/ractor/src/factory/lifecycle.rs new file mode 100644 index 00000000..36a49ace --- /dev/null +++ b/ractor/src/factory/lifecycle.rs @@ -0,0 +1,67 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Lifecycle hooks support interjecting external logic into the factory's +//! lifecycle (startup/shutdown/etc) such that users can intercept and +//! adjust factory functionality at key interjection points. + +use super::FactoryMessage; +use super::JobKey; +use crate::ActorProcessingErr; +use crate::ActorRef; +use crate::Message; +use crate::State; + +/// Hooks for [crate::factory::Factory] lifecycle events based on the +/// underlying actor's lifecycle. +#[cfg_attr(feature = "async-trait", crate::async_trait)] +pub trait FactoryLifecycleHooks: State + Sync +where + TKey: JobKey, + TMsg: Message, +{ + /// Called when the factory has completed it's startup routine but + /// PRIOR to processing any messages. Just before this point, the factory + /// is ready to accept and process requests and all workers are started. + /// + /// This hook is there to provide custom startup logic you want to make sure has run + /// prior to processing messages on workers + /// + /// WARNING: An error or panic returned here WILL shutdown the factory and notify supervisors + #[allow(unused_variables)] + async fn on_factory_started( + &self, + factory_ref: ActorRef>, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } + + /// Called when the factory has completed it's shutdown routine but + /// PRIOR to fully exiting and notifying any relevant supervisors. Just prior + /// to this call the factory has processed its last message and will process + /// no more messages. + /// + /// This hook is there to provide custom shutdown logic you want to make sure has run + /// prior to the factory fully exiting + async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> { + Ok(()) + } + + /// Called when the factory has received a signal to drain requests and exit after + /// draining has completed. + /// + /// This hook is to provide the ability to notify external services that the factory + /// is in the process of shutting down. If the factory is never "drained" formally, + /// this hook won't be called. 
+ /// + /// WARNING: An error or panic returned here WILL shutdown the factory and notify supervisors + #[allow(unused_variables)] + async fn on_factory_draining( + &self, + factory_ref: ActorRef>, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } +} diff --git a/ractor/src/factory/mod.rs b/ractor/src/factory/mod.rs index 980437c9..b1788ed2 100644 --- a/ractor/src/factory/mod.rs +++ b/ractor/src/factory/mod.rs @@ -42,10 +42,7 @@ ## Example Factory ```rust use ractor::concurrency::Duration; -use ractor::factory::{ - Factory, FactoryMessage, Job, JobOptions, RoutingMode, WorkerBuilder, WorkerMessage, - WorkerStartContext, -}; +use ractor::factory::*; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; #[derive(Debug)] enum ExampleMessage { @@ -55,11 +52,11 @@ enum ExampleMessage { /// The worker's specification for the factory. This defines /// the business logic for each message that will be done in parallel. struct ExampleWorker; -#[ractor::async_trait] +#[cfg_attr(feature = \"async-trait\", ractor::async_trait)] impl Actor for ExampleWorker { type Msg = WorkerMessage<(), ExampleMessage>; - type State = WorkerStartContext<(), ExampleMessage>; - type Arguments = WorkerStartContext<(), ExampleMessage>; + type State = WorkerStartContext<(), ExampleMessage, ()>; + type Arguments = WorkerStartContext<(), ExampleMessage, ()>; async fn pre_start( &self, _myself: ActorRef, @@ -107,19 +104,26 @@ impl Actor for ExampleWorker { } /// Used by the factory to build new [ExampleWorker]s. struct ExampleWorkerBuilder; -impl WorkerBuilder for ExampleWorkerBuilder { - fn build(&self, _wid: usize) -> ExampleWorker { - ExampleWorker +impl WorkerBuilder for ExampleWorkerBuilder { + fn build(&self, _wid: usize) -> (ExampleWorker, ()) { + (ExampleWorker, ()) } } #[tokio::main] async fn main() { - let factory_def = Factory::<(), ExampleMessage, ExampleWorker> { - worker_count: 5, - routing_mode: RoutingMode::<()>::Queuer, - ..Default::default() - }; - let (factory, handle) = Actor::spawn(None, factory_def, Box::new(ExampleWorkerBuilder)) + let factory_def = Factory::< + (), + ExampleMessage, + (), + ExampleWorker, + routing::QueuerRouting<(), ExampleMessage>, + queues::DefaultQueue<(), ExampleMessage> + >::default(); + let factory_args = FactoryArgumentsBuilder::new(ExampleWorkerBuilder, Default::default(), Default::default()) + .with_number_of_initial_workers(5) + .build(); + + let (factory, handle) = Actor::spawn(None, factory_def, factory_args) .await .expect(\"Failed to startup factory\"); for i in 0..99 { @@ -153,239 +157,39 @@ async fn main() { " )] -use std::collections::{HashMap, VecDeque}; -use std::marker::PhantomData; - -use rand::Rng; - use crate::concurrency::{Duration, Instant}; #[cfg(feature = "cluster")] use crate::message::BoxedDowncastErr; -use crate::{Actor, ActorProcessingErr, ActorRef, Message, RpcReplyPort, SupervisionEvent}; +use crate::{Message, RpcReplyPort}; +pub mod discard; +pub mod factoryimpl; pub mod hash; pub mod job; -pub mod routing_mode; +pub mod lifecycle; +pub mod queues; +pub mod routing; pub mod stats; pub mod worker; -use stats::MessageProcessingStats; - -pub use job::{Job, JobKey, JobOptions}; -pub use routing_mode::{CustomHashFunction, RoutingMode}; -pub use worker::{WorkerMessage, WorkerProperties, WorkerStartContext}; - -/// Identifier for a worker in a factory -pub type WorkerId = usize; - -#[cfg(not(test))] -const PING_FREQUENCY_MS: u64 = 1000; -#[cfg(test)] -const PING_FREQUENCY_MS: u64 = 100; - #[cfg(test)] mod tests; -/// Trait defining the discard 
handler for a factory. -pub trait DiscardHandler: Send + Sync + 'static -where - TKey: JobKey, - TMsg: Message, -{ - /// Called on a job - fn discard(&self, job: Job); - - /// clone yourself into a box - fn clone_box(&self) -> Box>; -} - -/// Trait defining a builder of workers for a factory -pub trait WorkerBuilder: Send + Sync -where - TWorker: Actor, -{ - /// Build a new worker - /// - /// * `wid`: The worker's "id" or index in the worker pool - fn build(&self, wid: WorkerId) -> TWorker; -} - -/// The configuration for the dead-man's switch functionality -pub struct DeadMansSwitchConfiguration { - /// Duration before determining worker is stuck - pub detection_timeout: Duration, - /// Flag denoting if the stuck worker should be killed - /// and restarted - pub kill_worker: bool, -} - -/// A factory is a manager to a pool of worker actors used for job dispatching -pub struct Factory -where - TKey: JobKey, - TMsg: Message, - TWorker: Actor>, -{ - /// Number of workers in the factory - pub worker_count: usize, - - /// If [true], tells the factory to collect statistics on the workers. - /// - /// Default is [false] - pub collect_worker_stats: bool, - - /// Message routing mode - /// - /// Default is [RoutingMode::KeyPersistent] - pub routing_mode: RoutingMode, - - /// Maximum queue length. Any job arriving when the queue is at its max length - /// will cause an oldest job at the head of the queue will be dropped. - /// - /// * For [RoutingMode::Queuer] factories, applied to the factory's internal queue. - /// * For [RoutingMode::KeyPersistent] and [RoutingMode::CustomHashFunction], this applies to the worker's - /// message queue - /// - /// Default is [None] - pub discard_threshold: Option, - - /// Discard callback when a job is discarded. - /// - /// Default is [None] - pub discard_handler: Option>>, - - /// A worker's ability for parallelized work - /// - /// Default is 1 (worker is strictly sequential in dispatched work) - pub worker_parallel_capacity: usize, - - /// Controls the "dead man's" switching logic on the factory. Periodically - /// the factory will scan for stuck workers. If detected, the worker information - /// will be logged along with the current job key information. 
Optionally the worker - /// can be killed and replaced by the factory - pub dead_mans_switch: Option, - - /// The worker type - pub _worker: PhantomData, -} - -impl Factory -where - TKey: JobKey, - TMsg: Message, - TWorker: Actor>, -{ - fn maybe_dequeue( - &self, - state: &mut FactoryState, - who: WorkerId, - ) -> Result<(), ActorProcessingErr> { - match &self.routing_mode { - RoutingMode::Queuer | RoutingMode::StickyQueuer => { - // pop + discard expired jobs - let mut next_job = None; - while let Some(job) = state.messages.pop_front() { - if !job.is_expired() { - next_job = Some(job); - break; - } else { - state.stats.job_ttl_expired(); - } - } - // de-queue another job - if let Some(job) = next_job { - if let Some(worker) = state.pool.get_mut(&who).filter(|f| f.is_available()) { - // Check if this worker is now free (should be the case except potentially in a sticky queuer which may automatically - // move to a sticky message) - worker.enqueue_job(job)?; - } else if let Some(worker) = - state.pool.values_mut().find(|worker| worker.is_available()) - { - // If that worker is busy, try and scan the workers to find a free worker - worker.enqueue_job(job)?; - } else { - // no free worker, put the message back into the queue where it was (at the front of the queue) - state.messages.push_front(job); - } - } - } - _ => {} - } - Ok(()) - } - - fn maybe_enqueue(&self, state: &mut FactoryState, job: Job) { - if let Some(limit) = self.discard_threshold { - state.messages.push_back(job); - while state.messages.len() > limit { - // try and shed a job - if let Some(msg) = state.messages.pop_front() { - if let Some(handler) = &self.discard_handler { - handler.discard(msg); - } - state.stats.job_discarded(); - } - } - } else { - // no load-shedding - state.messages.push_back(job); - } - } -} - -impl Default for Factory -where - TKey: JobKey, - TMsg: Message, - TWorker: Actor>, -{ - fn default() -> Self { - Self { - worker_count: 1, - routing_mode: RoutingMode::default(), - discard_threshold: None, - discard_handler: None, - _worker: PhantomData, - worker_parallel_capacity: 1, - collect_worker_stats: false, - dead_mans_switch: None, - } - } -} - -/// State of a factory (backlogged jobs, handler, etc) -pub struct FactoryState -where - TKey: JobKey, - TMsg: Message, - TWorker: Actor>, -{ - worker_builder: Box>, - pool: HashMap>, - - messages: VecDeque>, - last_worker: WorkerId, - stats: MessageProcessingStats, -} - -impl FactoryState -where - TKey: JobKey, - TMsg: Message, - TWorker: Actor, Arguments = WorkerStartContext>, -{ - fn log_stats(&mut self, factory: &ActorRef>) { - let factory_identifier = if let Some(factory_name) = factory.get_name() { - format!("======== Factory {factory_name} stats ========\n") - } else { - format!("======== Factory ({}) stats ========\n", factory.get_id()) - }; +use stats::MessageProcessingStats; - tracing::debug!("{factory_identifier}\n{}", self.stats); - self.stats.reset_global_counters(); - } -} +pub use discard::{ + DiscardHandler, DiscardMode, DiscardReason, DiscardSettings, DynamicDiscardController, +}; +pub use factoryimpl::{Factory, FactoryArguments, FactoryArgumentsBuilder}; +pub use job::{Job, JobKey, JobOptions}; +pub use lifecycle::FactoryLifecycleHooks; +pub use worker::{ + DeadMansSwitchConfiguration, WorkerBuilder, WorkerCapacityController, WorkerMessage, + WorkerProperties, WorkerStartContext, +}; +/// Unique identifier of a disctinct worker in the factory +pub type WorkerId = usize; /// Messages to a factory. 
/// /// **A special note about factory messages in a distributed context!** @@ -410,6 +214,9 @@ where /// tracking the factory's timing itself DoPings(Instant), + /// Calculate factory properties (loadshedding, concurrency, etc) + Calculate, + /// A reply to a factory ping supplying the worker id and the time /// of the ping start WorkerPong(WorkerId, Duration), @@ -417,9 +224,21 @@ where /// Trigger a scan for stuck worker detection IdentifyStuckWorkers, - /// Retrieve the factory's statsistics - GetStats(RpcReplyPort), + /// Retrieve the current queue depth (if in a queueing mode) + GetQueueDepth(RpcReplyPort), + + /// Resize the worker pool to the requested size + AdjustWorkerPool(usize), + + /// Retrieve the available capacity of the worker pool + queue + GetAvailableCapacity(RpcReplyPort), + + /// Notify the factory that it's being drained, and to finish jobs + /// currently in the queue, but discard new work, and once drained + /// exit + DrainRequests, } + #[cfg(feature = "cluster")] impl Message for FactoryMessage where @@ -441,286 +260,3 @@ where Ok(Self::Dispatch(Job::::deserialize(bytes)?)) } } - -#[cfg_attr(feature = "async-trait", crate::async_trait)] -impl Actor for Factory -where - TKey: JobKey, - TMsg: Message, - TWorker: Actor, Arguments = WorkerStartContext>, -{ - type Msg = FactoryMessage; - type State = FactoryState; - type Arguments = Box>; - - async fn pre_start( - &self, - myself: ActorRef, - builder: Box>, - ) -> Result { - // build the pool - let mut pool = HashMap::new(); - for wid in 0..self.worker_count { - let context = WorkerStartContext { - wid, - factory: myself.clone(), - }; - let handler = builder.build(wid); - let (worker, worker_handle) = - Actor::spawn_linked(None, handler, context, myself.get_cell()).await?; - pool.insert( - wid, - WorkerProperties::new( - wid, - worker, - self.worker_parallel_capacity, - self.discard_threshold, - self.discard_handler.as_ref().map(|a| a.clone_box()), - self.collect_worker_stats, - worker_handle, - ), - ); - } - - // Startup worker pinging - myself.send_after(Duration::from_millis(PING_FREQUENCY_MS), || { - FactoryMessage::DoPings(Instant::now()) - }); - - // startup stuck worker detection - if let Some(dmd) = &self.dead_mans_switch { - myself.send_after(dmd.detection_timeout, || { - FactoryMessage::IdentifyStuckWorkers - }); - } - let mut stats = MessageProcessingStats::default(); - stats.enable(); - - // initial state - Ok(Self::State { - messages: VecDeque::new(), - worker_builder: builder, - pool, - last_worker: 0, - stats, - }) - } - - async fn post_stop( - &self, - _: ActorRef, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - // send the stop signal to all workers - for worker in state.pool.values() { - worker.actor.stop(None); - } - // wait for all workers to exit - for worker in state.pool.values_mut() { - if let Some(handle) = worker.get_handle() { - let _ = handle.await; - } - } - Ok(()) - } - - async fn handle( - &self, - myself: ActorRef, - message: FactoryMessage, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - // TODO: based on the routing spec, dispatch the message - match message { - FactoryMessage::Dispatch(mut job) => { - // set the time the factory received the message - job.set_factory_time(); - state.stats.job_submitted(); - - match &self.routing_mode { - RoutingMode::KeyPersistent => { - let key = hash::hash_with_max(&job.key, self.worker_count); - if let Some(worker) = state.pool.get_mut(&key) { - worker.enqueue_job(job)?; - } - } - RoutingMode::Queuer => { 
- if let Some(worker) = - state.pool.values_mut().find(|worker| worker.is_available()) - { - worker.enqueue_job(job)?; - } else { - // no free worker, maybe backlog the message - self.maybe_enqueue(state, job); - } - } - RoutingMode::StickyQueuer => { - // See if there's a worker processing this given job key already, if yes route to that worker - let maybe_worker = state - .pool - .values_mut() - .find(|worker| worker.is_processing_key(&job.key)); - if let Some(worker) = maybe_worker { - worker.enqueue_job(job)?; - } else if let Some(worker) = - state.pool.values_mut().find(|worker| worker.is_available()) - { - // If no matching sticky worker, find the first free worker - worker.enqueue_job(job)?; - } else { - // no free worker, maybe backlog the message - self.maybe_enqueue(state, job); - } - } - RoutingMode::RoundRobin => { - let mut key = state.last_worker + 1; - if key >= self.worker_count { - key = 0; - } - if let Some(worker) = state.pool.get_mut(&key) { - worker.enqueue_job(job)?; - } - state.last_worker = key; - } - RoutingMode::Random => { - let key = rand::thread_rng().gen_range(0..self.worker_count); - if let Some(worker) = state.pool.get_mut(&key) { - worker.enqueue_job(job)?; - } - } - RoutingMode::CustomHashFunction(hasher) => { - let key = hasher.hash(&job.key, self.worker_count); - if let Some(worker) = state.pool.get_mut(&key) { - worker.enqueue_job(job)?; - } - } - } - } - FactoryMessage::Finished(who, job_key) => { - let wid = if let Some(worker) = state.pool.get_mut(&who) { - if let Some(job_options) = worker.worker_complete(job_key)? { - // record the job data on the factory - state.stats.factory_job_done(&job_options); - } - Some(worker.wid) - } else { - None - }; - if let Some(wid) = wid { - self.maybe_dequeue(state, wid)?; - } - } - FactoryMessage::WorkerPong(wid, time) => { - if let Some(worker) = state.pool.get_mut(&wid) { - worker.ping_received(time); - } - } - FactoryMessage::DoPings(when) => { - if state.stats.ping_received(when.elapsed()) { - state.log_stats(&myself); - } - - for worker in state.pool.values_mut() { - worker.send_factory_ping()?; - } - - // schedule next ping - myself.send_after(Duration::from_millis(PING_FREQUENCY_MS), || { - FactoryMessage::DoPings(Instant::now()) - }); - } - FactoryMessage::IdentifyStuckWorkers => { - if let Some(dmd) = &self.dead_mans_switch { - for worker in state.pool.values() { - if worker.is_stuck(dmd.detection_timeout) && dmd.kill_worker { - tracing::info!( - "Factory {:?} killing stuck worker {}", - myself.get_name(), - worker.wid - ); - worker.actor.kill(); - } - } - - // schedule next check - myself.send_after(dmd.detection_timeout, || { - FactoryMessage::IdentifyStuckWorkers - }); - } - } - FactoryMessage::GetStats(reply) => { - let _ = reply.send(state.stats.clone()); - } - } - - Ok(()) - } - - async fn handle_supervisor_evt( - &self, - myself: ActorRef, - message: SupervisionEvent, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - match message { - SupervisionEvent::ActorTerminated(who, _, reason) => { - let wid = if let Some(worker) = state - .pool - .values_mut() - .find(|actor| actor.is_pid(who.get_id())) - { - tracing::warn!( - "Factory {:?}'s worker {} terminated with {reason:?}", - myself.get_name(), - worker.wid - ); - let new_worker = state.worker_builder.build(worker.wid); - let spec = WorkerStartContext { - wid: worker.wid, - factory: myself.clone(), - }; - let (replacement, replacement_handle) = - Actor::spawn_linked(None, new_worker, spec, myself.get_cell()).await?; - - 
worker.replace_worker(replacement, replacement_handle)?; - Some(worker.wid) - } else { - None - }; - if let Some(wid) = wid { - self.maybe_dequeue(state, wid)?; - } - } - SupervisionEvent::ActorPanicked(who, reason) => { - let wid = if let Some(worker) = state - .pool - .values_mut() - .find(|actor| actor.is_pid(who.get_id())) - { - tracing::warn!( - "Factory {:?}'s worker {} panicked with {reason}", - myself.get_name(), - worker.wid - ); - let new_worker = state.worker_builder.build(worker.wid); - let spec = WorkerStartContext { - wid: worker.wid, - factory: myself.clone(), - }; - let (replacement, replacement_handle) = - Actor::spawn_linked(None, new_worker, spec, myself.get_cell()).await?; - - worker.replace_worker(replacement, replacement_handle)?; - Some(worker.wid) - } else { - None - }; - if let Some(wid) = wid { - self.maybe_dequeue(state, wid)?; - } - } - _ => {} - } - Ok(()) - } -} diff --git a/ractor/src/factory/queues.rs b/ractor/src/factory/queues.rs new file mode 100644 index 00000000..e7b0678a --- /dev/null +++ b/ractor/src/factory/queues.rs @@ -0,0 +1,335 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Queue implementations for Factories + +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::sync::Arc; + +use crate::factory::DiscardHandler; +use crate::factory::DiscardReason; +use crate::factory::Job; +use crate::factory::JobKey; +use crate::Message; + +/// Implementation of backing queue for factory messages when workers are +/// all busy +pub trait Queue: Send + 'static +where + TKey: JobKey, + TMsg: Message, +{ + /// Retrieve the size of the factory's queue + fn len(&self) -> usize; + + /// Check if the queue is empty + fn is_empty(&self) -> bool; + + /// Pop the next message from the front of the queue + fn pop_front(&mut self) -> Option>; + + /// Try and discard a message according to the queue semantics + /// in an overload scenario (e.g. lowest priority if priority + /// queueing). In a basic queueing scenario, this is equivalent + /// to `pop_front` + fn discard_oldest(&mut self) -> Option>; + + /// Peek an item from the head of the queue + fn peek(&self) -> Option<&Job>; + + /// Push an item to the back of the queue + fn push_back(&mut self, job: Job); + + /// Remove expired items from the queue + /// + /// * `discard_handler` - The handler to call for each discarded job. Will be called + /// with [DiscardReason::Loadshed]. + /// + /// Returns the number of elements removed from the queue + fn remove_expired_items( + &mut self, + discard_handler: &Option>>, + ) -> usize; + + /// Determine if a given job can be discarded. Default is [true] for all jobs. + /// + /// This can be overridden to customize discard semantics. + fn is_job_discardable(&self, _key: &TKey) -> bool { + true + } +} + +/// Priority trait which denotes the usize value of a [Priority] +pub trait Priority: Default + From + Send + 'static { + /// Retrieve the index for the Priority value. This should be + /// contiguous from 0, 0 being the highest priority. + fn get_index(&self) -> usize; +} + +/// Basic 5-category priority definition. 
This is probably flexible enough +/// for most use-cases +#[derive(strum::FromRepr, Default, Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[repr(usize)] +pub enum StandardPriority { + /// Most important + Highest = 0, + /// High + High = 1, + /// Important + Important = 2, + /// Normal + #[default] + Normal = 3, + /// Low/best-effort priority + BestEffort = 4, +} + +#[cfg(feature = "cluster")] +impl crate::BytesConvertable for StandardPriority { + fn from_bytes(bytes: Vec) -> Self { + (u64::from_bytes(bytes) as usize).into() + } + fn into_bytes(self) -> Vec { + (self as u64).into_bytes() + } +} + +impl StandardPriority { + /// Retrieve the number of variants of this enum, as a constant + pub const fn size() -> usize { + 5 + } +} + +impl Priority for StandardPriority { + fn get_index(&self) -> usize { + *self as usize + } +} + +impl From for StandardPriority { + fn from(value: usize) -> Self { + Self::from_repr(value).unwrap_or_default() + } +} + +/// The [PriorityManager] is responsible for extracting the job priority from +/// a given job's key (`TKey`). Additionally in some scenarios (e.g. thrift servers) +/// some jobs may be non-discardable, i.e. can be enqueued regardless of the backpressure +/// status of the factory. This is also responsible for determining if a job can be loadshed. +pub trait PriorityManager: Send + Sync + 'static +where + TKey: JobKey, + TPriority: Priority, +{ + /// Determine if this job can be discarded under load. + /// + /// Returns [true] if the job can be discarded, [false] otherwise. + fn is_discardable(&self, job: &TKey) -> bool; + + /// Retrieve the job's priority. + /// + /// Returns [None] if the job does not have a priority, [Some(`TPriority`)] otherwise. + fn get_priority(&self, job: &TKey) -> Option; +} + +// =============== Default Queue ================= // +/// A simple, no-priority queue +/// +/// Equivalent to a [VecDeque] +pub struct DefaultQueue +where + TKey: JobKey, + TMsg: Message, +{ + q: VecDeque>, +} + +impl Default for DefaultQueue +where + TKey: JobKey, + TMsg: Message, +{ + fn default() -> Self { + Self { q: VecDeque::new() } + } +} + +impl Queue for DefaultQueue +where + TKey: JobKey, + TMsg: Message, +{ + /// Retrieve the size of the factory's queue + fn len(&self) -> usize { + self.q.len() + } + + /// Check if the queue is empty + fn is_empty(&self) -> bool { + self.q.is_empty() + } + + /// Pop the next message from the front of the queue + fn pop_front(&mut self) -> Option> { + self.q.pop_front() + } + + fn discard_oldest(&mut self) -> Option> { + self.pop_front() + } + + fn peek(&self) -> Option<&Job> { + self.q.front() + } + + /// Push an item to the back of the queue, with the given priority + fn push_back(&mut self, job: Job) { + self.q.push_back(job) + } + + /// Remove expired items from the queue + fn remove_expired_items( + &mut self, + discard_handler: &Option>>, + ) -> usize { + let before = self.q.len(); + // scan backlog for expired jobs and pop, discard, and drop them + self.q.retain_mut(|queued_item| { + if queued_item.is_expired() { + if let Some(handler) = discard_handler { + handler.discard(DiscardReason::TtlExpired, queued_item); + } + false + } else { + true + } + }); + self.q.len() - before + } +} + +// =============== Priority Queue ================= // +/// A queue with `NUM_PRIORITIES` priorities +/// +/// It requires a [PriorityManager] implementation associated with it in order to +/// determine the priorities of given jobs and inform discard semantics. 
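///
/// # Example (editor's illustrative sketch, not part of this patch)
///
/// A minimal [PriorityManager] that buckets jobs by key, using the
/// [StandardPriority] levels and the queue type defined in this file. The key
/// and message types below are hypothetical and exist only for illustration.
///
/// ```ignore
/// #[derive(Debug, Hash, Clone, Eq, PartialEq)]
/// struct MyKey { id: u64 }
///
/// #[derive(Debug)]
/// enum MyMessage { Work }
///
/// struct MyPriorityManager;
///
/// impl PriorityManager<MyKey, StandardPriority> for MyPriorityManager {
///     // In this sketch every job may be shed under load.
///     fn is_discardable(&self, _key: &MyKey) -> bool {
///         true
///     }
///     // Even ids are treated as most important; odd ids fall back to the
///     // default priority (Normal) because `None` is returned.
///     fn get_priority(&self, key: &MyKey) -> Option<StandardPriority> {
///         if key.id % 2 == 0 {
///             Some(StandardPriority::Highest)
///         } else {
///             None
///         }
///     }
/// }
///
/// // One backing queue per StandardPriority variant.
/// let queue: PriorityQueue<MyKey, MyMessage, StandardPriority, MyPriorityManager,
///     { StandardPriority::size() }> = PriorityQueue::new(MyPriorityManager);
/// ```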
+pub struct PriorityQueue +where + TKey: JobKey, + TMsg: Message, + TPriority: Priority, + TPriorityManager: PriorityManager, +{ + queues: [VecDeque>; NUM_PRIORITIES], + priority_manager: TPriorityManager, + _p: PhantomData TPriority>, +} + +impl + PriorityQueue +where + TKey: JobKey, + TMsg: Message, + TPriority: Priority, + TPriorityManager: PriorityManager, +{ + /// Construct a new [PriorityQueue] instance with the supplied [PriorityManager] + /// implementation. + pub fn new(priority_manager: TPriorityManager) -> Self { + Self { + _p: PhantomData, + priority_manager, + queues: [(); NUM_PRIORITIES].map(|_| VecDeque::new()), + } + } +} + +impl Queue + for PriorityQueue +where + TKey: JobKey, + TMsg: Message, + TPriority: Priority, + TPriorityManager: PriorityManager, +{ + /// Retrieve the size of the factory's queue + fn len(&self) -> usize { + self.queues.iter().map(|q| q.len()).sum() + } + + /// Check if the queue is empty + fn is_empty(&self) -> bool { + self.queues.iter().all(|q| q.is_empty()) + } + + /// Pop the next message from the front of the queue + fn pop_front(&mut self) -> Option> { + for i in 0..NUM_PRIORITIES { + if let Some(r) = self.queues[i].pop_front() { + return Some(r); + } + } + None + } + + fn discard_oldest(&mut self) -> Option> { + for i in (0..NUM_PRIORITIES).rev() { + if let Some(r) = self.queues[i].pop_front() { + return Some(r); + } + } + None + } + + fn peek(&self) -> Option<&Job> { + for i in 0..NUM_PRIORITIES { + let maybe = self.queues[i].front(); + if maybe.is_some() { + return maybe; + } + } + None + } + + /// Push an item to the back of the queue + fn push_back(&mut self, job: Job) { + let priority = self + .priority_manager + .get_priority(&job.key) + .unwrap_or_else(Default::default); + let idx = priority.get_index(); + self.queues[idx].push_back(job); + } + + /// Remove expired items from the queue + fn remove_expired_items( + &mut self, + discard_handler: &Option>>, + ) -> usize { + let mut num_removed = 0; + + // scan backlog for expired jobs and pop, discard, and drop them + for i in 0..NUM_PRIORITIES { + self.queues[i].retain_mut(|queued_item| { + if queued_item.is_expired() { + if let Some(handler) = discard_handler { + handler.discard(DiscardReason::TtlExpired, queued_item); + } + num_removed += 1; + false + } else { + true + } + }); + } + num_removed + } + + fn is_job_discardable(&self, key: &TKey) -> bool { + self.priority_manager.is_discardable(key) + } +} diff --git a/ractor/src/factory/routing.rs b/ractor/src/factory/routing.rs new file mode 100644 index 00000000..c243a017 --- /dev/null +++ b/ractor/src/factory/routing.rs @@ -0,0 +1,386 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Routing protocols for Factories + +use std::collections::HashMap; +use std::marker::PhantomData; + +use crate::factory::worker::WorkerProperties; +use crate::factory::Job; +use crate::factory::JobKey; +use crate::factory::WorkerId; +use crate::ActorProcessingErr; +use crate::Message; + +/// Custom hashing behavior for factory routing to workers +pub trait CustomHashFunction: Send + Sync +where + TKey: Send + Sync + 'static, +{ + /// Hash the key into the space 0..usize + fn hash(&self, key: &TKey, worker_count: usize) -> usize; +} + +/// The possible results from a routing operation. 
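///
/// For instance (editor's illustrative sketch, not part of this patch), a
/// queueing-style [Router] typically returns [RouteResult::Handled] once it has
/// enqueued the job to a free worker, and hands the job back via
/// [RouteResult::Backlog] when every worker is busy:
///
/// ```ignore
/// if let Some(worker) = worker_pool.values_mut().find(|w| w.is_available()) {
///     worker.enqueue_job(job)?;
///     Ok(RouteResult::Handled)
/// } else {
///     Ok(RouteResult::Backlog(job))
/// }
/// ```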
+pub enum RouteResult +where + TKey: JobKey, + TMsg: Message, +{ + /// The job has been handled and routed successfully + Handled, + /// The job needs to be backlogged into the internal factory's queue (if + /// configured) + Backlog(Job), +} + +/// A routing mode controls how a request is routed from the factory to a +/// designated worker +pub trait Router: Send + Sync + 'static +where + TKey: JobKey, + TMsg: Message, +{ + /// Route a [Job] based on the specific routing methodology + /// + /// * `job` - The job to be routed + /// * `pool_size` - The size of the ACTIVE worker pool (excluding draining workers) + /// * `worker_pool` - The current worker pool, which may contain draining workers + /// + /// Returns [RouteResult::Handled] if the job was routed successfully, otherwise + /// [RouteResult::Backlog] is returned indicating that the job should be enqueued in + /// the factory's internal queue. + fn route_message( + &mut self, + job: Job, + pool_size: usize, + worker_pool: &mut HashMap>, + ) -> Result, ActorProcessingErr>; + + /// Identifies if a job CAN be routed, and to which worker, without + /// requiring dequeueing the job + /// + /// This prevents the need to support pushing jobs that have been dequeued, + /// but no worker is available to accept the job, back into the front of the + /// queue. And given the single-threaded nature of a Factory, this is safe + /// to call outside of a locked context. It is assumed that if this returns + /// [Some(WorkerId)], then the job is guaranteed to be routed, as internal state to + /// the router may be updated. + /// + /// * `job` - A reference to the job to be routed + /// * `pool_size` - The size of the ACTIVE worker pool (excluding draining workers) + /// * `worker_pool` - The current worker pool, which may contain draining workers + /// + /// Returns [None] if no worker can be identified or no worker is avaialble to accept + /// the job, otherwise [Some(WorkerId)] indicating the target worker is returned + fn choose_target_worker( + &mut self, + job: &Job, + pool_size: usize, + worker_pool: &HashMap>, + ) -> Option; + + /// Returns a flag indicating if the factory does discard/overload management ([true]) + /// or if is handled by the workers worker(s) ([false]) + fn is_factory_queueing(&self) -> bool; +} + +// ============================ Macros ======================= // +macro_rules! impl_routing_mode { + ($routing_mode: ident, $doc:expr) => { + #[doc = $doc] + pub struct $routing_mode + where + TKey: JobKey, + TMsg: Message, + { + _key: PhantomData TKey>, + _msg: PhantomData TMsg>, + } + + impl Default for $routing_mode + where + TKey: JobKey, + TMsg: Message, + { + fn default() -> Self { + Self { + _key: PhantomData, + _msg: PhantomData, + } + } + } + }; +} + +// ============================ Key Persistent routing ======================= // +impl_routing_mode! {KeyPersistentRouting, "Factory will select worker by hashing the job's key. 
+Workers will have jobs placed into their incoming message queue's"} + +impl Router for KeyPersistentRouting +where + TKey: JobKey, + TMsg: Message, +{ + fn route_message( + &mut self, + job: Job, + pool_size: usize, + worker_pool: &mut HashMap>, + ) -> Result, ActorProcessingErr> { + let key = crate::factory::hash::hash_with_max(&job.key, pool_size); + if let Some(worker) = worker_pool.get_mut(&key) { + worker.enqueue_job(job)?; + } + Ok(RouteResult::Handled) + } + + fn choose_target_worker( + &mut self, + job: &Job, + pool_size: usize, + _worker_pool: &HashMap>, + ) -> Option { + let key = crate::factory::hash::hash_with_max(&job.key, pool_size); + Some(key) + } + + fn is_factory_queueing(&self) -> bool { + false + } +} + +// ============================ Queuer routing ======================= // +impl_routing_mode! {QueuerRouting, "Factory will dispatch job to first available worker. +Factory will maintain shared internal queue of messages"} + +impl Router for QueuerRouting +where + TKey: JobKey, + TMsg: Message, +{ + fn route_message( + &mut self, + job: Job, + pool_size: usize, + worker_pool: &mut HashMap>, + ) -> Result, ActorProcessingErr> { + if let Some(worker) = self + .choose_target_worker(&job, pool_size, worker_pool) + .and_then(|wid| worker_pool.get_mut(&wid)) + { + worker.enqueue_job(job)?; + Ok(RouteResult::Handled) + } else { + Ok(RouteResult::Backlog(job)) + } + } + + fn choose_target_worker( + &mut self, + _job: &Job, + _pool_size: usize, + worker_pool: &HashMap>, + ) -> Option { + worker_pool + .iter() + .find(|(_, worker)| worker.is_available()) + .map(|(wid, _)| *wid) + } + + fn is_factory_queueing(&self) -> bool { + true + } +} + +// ============================ Sticky Queuer routing ======================= // +impl_routing_mode! {StickyQueuerRouting, "Factory will dispatch jobs to a worker that is processing the same key (if any). +Factory will maintain shared internal queue of messages. + +Note: This is helpful for sharded db access style scenarios. If a worker is +currently doing something on a given row id for example, we want subsequent updates +to land on the same worker so it can serialize updates to the same row consistently."} + +impl Router for StickyQueuerRouting +where + TKey: JobKey, + TMsg: Message, +{ + fn route_message( + &mut self, + job: Job, + pool_size: usize, + worker_pool: &mut HashMap>, + ) -> Result, ActorProcessingErr> { + if let Some(worker) = self + .choose_target_worker(&job, pool_size, worker_pool) + .and_then(|wid| worker_pool.get_mut(&wid)) + { + worker.enqueue_job(job)?; + Ok(RouteResult::Handled) + } else { + Ok(RouteResult::Backlog(job)) + } + } + + fn choose_target_worker( + &mut self, + job: &Job, + _pool_size: usize, + worker_pool: &HashMap>, + ) -> Option { + let maybe_worker = worker_pool + .iter() + .find(|(_, worker)| worker.is_processing_key(&job.key)) + .map(|(a, _)| *a); + if maybe_worker.is_some() { + return maybe_worker; + } + + // fallback to first free worker as there's no sticky worker + worker_pool + .iter() + .find(|(_, worker)| worker.is_available()) + .map(|(wid, _)| *wid) + } + + fn is_factory_queueing(&self) -> bool { + true + } +} + +// ============================ Round-robin routing ======================= // +/// Factory will dispatch to the next worker in order. 
+/// +/// Workers will have jobs placed into their incoming message queue's +pub struct RoundRobinRouting +where + TKey: JobKey, + TMsg: Message, +{ + _key: PhantomData TKey>, + _msg: PhantomData TMsg>, + last_worker: WorkerId, +} + +impl Default for RoundRobinRouting +where + TKey: JobKey, + TMsg: Message, +{ + fn default() -> Self { + Self { + _key: PhantomData, + _msg: PhantomData, + last_worker: 0, + } + } +} + +impl Router for RoundRobinRouting +where + TKey: JobKey, + TMsg: Message, +{ + fn route_message( + &mut self, + job: Job, + pool_size: usize, + worker_pool: &mut HashMap>, + ) -> Result, ActorProcessingErr> { + if let Some(worker) = self + .choose_target_worker(&job, pool_size, worker_pool) + .and_then(|wid| worker_pool.get_mut(&wid)) + { + worker.enqueue_job(job)?; + } + Ok(RouteResult::Handled) + } + + fn choose_target_worker( + &mut self, + _job: &Job, + pool_size: usize, + _worker_pool: &HashMap>, + ) -> Option { + let mut key = self.last_worker + 1; + if key >= pool_size { + key = 0; + } + self.last_worker = key; + Some(key) + } + + fn is_factory_queueing(&self) -> bool { + false + } +} + +// ============================ Custom routing ======================= // +/// Factory will dispatch to workers based on a custom hash function. +/// +/// The factory maintains no queue in this scenario, and jobs are pushed +/// to worker's queues. +pub struct CustomRouting +where + TKey: JobKey, + TMsg: Message, + THasher: CustomHashFunction, +{ + _key: PhantomData TKey>, + _msg: PhantomData TMsg>, + hasher: THasher, +} + +impl CustomRouting +where + TKey: JobKey, + TMsg: Message, + THasher: CustomHashFunction, +{ + /// Construct a new [CustomRouting] instance with the supplied hash function + pub fn new(hasher: THasher) -> Self { + Self { + _key: PhantomData, + _msg: PhantomData, + hasher, + } + } +} + +impl Router for CustomRouting +where + TKey: JobKey, + TMsg: Message, + THasher: CustomHashFunction + 'static, +{ + fn route_message( + &mut self, + job: Job, + pool_size: usize, + worker_pool: &mut HashMap>, + ) -> Result, ActorProcessingErr> { + let key = self.hasher.hash(&job.key, pool_size); + if let Some(worker) = worker_pool.get_mut(&key) { + worker.enqueue_job(job)?; + } + Ok(RouteResult::Handled) + } + + fn choose_target_worker( + &mut self, + job: &Job, + pool_size: usize, + _worker_pool: &HashMap>, + ) -> Option { + let key = self.hasher.hash(&job.key, pool_size); + Some(key) + } + + fn is_factory_queueing(&self) -> bool { + false + } +} diff --git a/ractor/src/factory/routing_mode.rs b/ractor/src/factory/routing_mode.rs deleted file mode 100644 index 66813627..00000000 --- a/ractor/src/factory/routing_mode.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) Sean Lawlor -// -// This source code is licensed under both the MIT license found in the -// LICENSE-MIT file in the root directory of this source tree. - -//! Factory routing mode - -/// Custom hashing behavior for factory routing to workers -pub trait CustomHashFunction: Send + Sync -where - TKey: Send + Sync + 'static, -{ - /// Hash the key into the space 0..usize - fn hash(&self, key: &TKey, worker_count: usize) -> usize; -} - -/// Routing mode for jobs through the factory to workers -pub enum RoutingMode -where - TKey: Send + Sync + 'static, -{ - /// Factory will select worker by hashing the job's key. - /// Workers will have jobs placed into their incoming message queue's - KeyPersistent, - - /// Factory will dispatch job to first available worker. 
- /// Factory will maintain shared internal queue of messages - Queuer, - - /// Factory will dispatch jobs to a worker that is processing the same key (if any). - /// Factory will maintain shared internal queue of messages - StickyQueuer, - - /// Factory will dispatch to the next worker in order - RoundRobin, - - /// Factory will dispatch to a worker in a random order - Random, - - /// Similar to [RoutingMode::KeyPersistent] but with a custom hash function - CustomHashFunction(Box>), -} - -impl Default for RoutingMode -where - TKey: Send + Sync + 'static, -{ - fn default() -> Self { - Self::KeyPersistent - } -} diff --git a/ractor/src/factory/stats.rs b/ractor/src/factory/stats.rs index 9e904221..d2b20e5d 100644 --- a/ractor/src/factory/stats.rs +++ b/ractor/src/factory/stats.rs @@ -166,6 +166,13 @@ impl MessageProcessingStats { self.total_num_expired_jobs += 1; } + pub(crate) fn jobs_ttls_expired(&mut self, num_jobs: usize) { + if !self.enabled { + return; + } + self.total_num_expired_jobs += num_jobs as u128; + } + pub(crate) fn job_discarded(&mut self) { if !self.enabled { return; diff --git a/ractor/src/factory/tests/basic.rs b/ractor/src/factory/tests/basic.rs new file mode 100644 index 00000000..01d56706 --- /dev/null +++ b/ractor/src/factory/tests/basic.rs @@ -0,0 +1,849 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +use std::marker::PhantomData; +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use crate::concurrency::Duration; +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; + +use crate::factory::routing::CustomHashFunction; +use crate::factory::*; +use crate::periodic_check; + +const NUM_TEST_WORKERS: usize = 3; + +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +struct TestKey { + id: u64, +} +#[cfg(feature = "cluster")] +impl crate::BytesConvertable for TestKey { + fn from_bytes(bytes: Vec) -> Self { + Self { + id: u64::from_bytes(bytes), + } + } + fn into_bytes(self) -> Vec { + self.id.into_bytes() + } +} + +#[derive(Debug)] +enum TestMessage { + /// Doh'k + #[allow(dead_code)] + Ok, + /// Doh'k + #[allow(dead_code)] + Count(u16), +} +#[cfg(feature = "cluster")] +impl crate::Message for TestMessage {} + +type DefaultQueue = crate::factory::queues::DefaultQueue; + +struct TestWorker { + counter: Arc, + slow: Option, +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for TestWorker { + type Msg = WorkerMessage; + type State = Self::Arguments; + type Arguments = WorkerStartContext; + + async fn pre_start( + &self, + _myself: ActorRef, + startup_context: Self::Arguments, + ) -> Result { + Ok(startup_context) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(job) => { + tracing::debug!("Worker received {:?}", job.msg); + + self.counter.fetch_add(1, Ordering::Relaxed); + + if let Some(timeout_ms) = self.slow { + crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await; + } + + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, job.key))?; + } + } + Ok(()) + } +} + +struct FastTestWorkerBuilder { + counters: [Arc; 
NUM_TEST_WORKERS], +} + +impl WorkerBuilder for FastTestWorkerBuilder { + fn build(&self, wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counter: self.counters[wid].clone(), + slow: None, + }, + (), + ) + } +} + +struct SlowTestWorkerBuilder { + counters: [Arc; NUM_TEST_WORKERS], +} + +impl WorkerBuilder for SlowTestWorkerBuilder { + fn build(&self, wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counter: self.counters[wid].clone(), + slow: Some(10), + }, + (), + ) + } +} + +struct InsanelySlowWorkerBuilder { + counters: [Arc; NUM_TEST_WORKERS], +} + +impl WorkerBuilder for InsanelySlowWorkerBuilder { + fn build(&self, wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counter: self.counters[wid].clone(), + slow: Some(10000), + }, + (), + ) + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_dispatch_key_persistent() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + + let worker_builder = FastTestWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::KeyPersistentRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for _ in 0..999 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + let check_counters = worker_counters[0].clone(); + periodic_check( + move || { + let all_counter = check_counters.load(Ordering::Relaxed); + all_counter == 999 + }, + Duration::from_secs(3), + ) + .await; + + factory.stop(None); + factory_handle.await.unwrap(); + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_dispatch_queuer() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + + let worker_builder = FastTestWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::QueuerRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for id in 0..100 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id }, + msg: TestMessage::Ok, + options: 
JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + let check_counters = worker_counters.clone(); + periodic_check( + || { + check_counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .sum::() + == 100 + }, + Duration::from_secs(5), + ) + .await; + + factory.stop(None); + factory_handle.await.unwrap(); + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_dispatch_round_robin() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + + let worker_builder = FastTestWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::RoundRobinRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for _ in 0..999 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + let check_counters = worker_counters.clone(); + periodic_check( + move || { + check_counters + .iter() + .all(|counter| counter.load(Ordering::Relaxed) == 333) + }, + Duration::from_secs(3), + ) + .await; + + factory.stop(None); + factory_handle.await.unwrap(); + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_dispatch_custom_hashing() { + struct MyHasher + where + TKey: crate::Message + Sync, + { + _key: PhantomData, + } + + impl CustomHashFunction for MyHasher + where + TKey: crate::Message + Sync, + { + fn hash(&self, _key: &TKey, _worker_count: usize) -> usize { + 2 + } + } + + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + + let worker_builder = FastTestWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::CustomRouting>, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: routing::CustomRouting::new(MyHasher { _key: PhantomData }), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for _ in 0..999 { + factory + 
.cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + let check_counters = worker_counters.clone(); + periodic_check( + move || check_counters[2].load(Ordering::Relaxed) == 999, + Duration::from_secs(3), + ) + .await; + + factory.stop(None); + factory_handle.await.unwrap(); + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_dispatch_sticky_queueing() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + + let worker_builder = SlowTestWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::StickyQueuerRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + // Since we're dispatching all of the same key, they should all get routed to the same worker + for _ in 0..5 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + let check_counters = worker_counters.clone(); + periodic_check( + move || { + // one worker got all 5 messages due to sticky queueing + check_counters + .iter() + .any(|counter| counter.load(Ordering::Relaxed) == 5) + }, + Duration::from_secs(3), + ) + .await; + + factory.stop(None); + factory_handle.await.unwrap(); + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_discarding_old_records_on_queuer() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + let discard_counter = Arc::new(AtomicU16::new(0)); + + struct TestDiscarder { + counter: Arc, + } + impl DiscardHandler for TestDiscarder { + fn discard(&self, _reason: DiscardReason, _job: &mut Job) { + let _ = self.counter.fetch_add(1, Ordering::Relaxed); + } + } + + let worker_builder = InsanelySlowWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::QueuerRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: Some(Arc::new(TestDiscarder { + counter: 
discard_counter.clone(), + })), + discard_settings: DiscardSettings::Static { + limit: 5, + mode: DiscardMode::Oldest, + }, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for _ in 0..108 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + crate::concurrency::sleep(Duration::from_millis(250)).await; + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); + + // assert + + // each worker only got 1 message, then "slept" + assert!(worker_counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .all(|count| count == 1)); + + // 5 messages should be left in the factory's queue, while the remaining should get "discarded" + // (3 in workers) + (5 in queue) + (100 discarded) = 108, the number of msgs we sent to the factory + assert_eq!(100, discard_counter.load(Ordering::Relaxed)); + + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); +} + +struct StuckWorker { + counter: Arc, + slow: Option, +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for StuckWorker { + type Msg = WorkerMessage; + type State = Self::Arguments; + type Arguments = WorkerStartContext; + + async fn pre_start( + &self, + _myself: ActorRef, + startup_context: Self::Arguments, + ) -> Result { + self.counter.fetch_add(1, Ordering::Relaxed); + Ok(startup_context) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(job) => { + tracing::debug!("Worker received {:?}", job.msg); + + if let Some(timeout_ms) = self.slow { + crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await; + } + + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, job.key))?; + } + } + Ok(()) + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_stuck_workers() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + + struct StuckWorkerBuilder { + counters: [Arc; NUM_TEST_WORKERS], + } + + impl WorkerBuilder for StuckWorkerBuilder { + fn build(&self, wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counter: self.counters[wid].clone(), + slow: Some(10000), + }, + (), + ) + } + } + + let worker_builder = StuckWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::RoundRobinRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: Some(DeadMansSwitchConfiguration { + detection_timeout: Duration::from_millis(50), + kill_worker: true, + }), + discard_handler: None, + discard_settings: 
DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for _ in 0..9 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + crate::concurrency::sleep(Duration::from_millis(500)).await; + + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); + + // assert + + // each worker only got 1 message, then "slept" + assert!(worker_counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .all(|count| count > 1)); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_discarding_new_records_on_queuer() { + let worker_counters: [_; NUM_TEST_WORKERS] = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + let discard_counter = Arc::new(AtomicU16::new(0)); + + struct TestDiscarder { + counter: Arc, + } + impl DiscardHandler for TestDiscarder { + fn discard(&self, _reason: DiscardReason, job: &mut Job) { + if let TestMessage::Count(count) = job.msg { + let _ = self.counter.fetch_add(count, Ordering::Relaxed); + } + } + } + + let worker_builder = InsanelySlowWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::QueuerRouting, + DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: Some(Arc::new(TestDiscarder { + counter: discard_counter.clone(), + })), + discard_settings: DiscardSettings::Static { + limit: 5, + mode: DiscardMode::Newest, + }, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for i in 0..10 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages + crate::concurrency::sleep(Duration::from_millis(250)).await; + + println!( + "Counters: [{}] [{}] [{}]", + worker_counters[0].load(Ordering::Relaxed), + worker_counters[1].load(Ordering::Relaxed), + worker_counters[2].load(Ordering::Relaxed) + ); + + // assert + + // each worker only got 1 message, then "slept" + assert!(worker_counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .all(|count| count == 1)); + + // 5 messages should be left in the factory's queue, while the remaining should get "discarded" + // + // The "newest" messages had values (8) and (9), respectively, which together should mean + // the discard counter is 17 + assert_eq!(17, discard_counter.load(Ordering::Relaxed)); + + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); +} diff --git a/ractor/src/factory/tests/draining_requests.rs b/ractor/src/factory/tests/draining_requests.rs new file mode 100644 index 
00000000..522c99fa --- /dev/null +++ b/ractor/src/factory/tests/draining_requests.rs @@ -0,0 +1,171 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Tests around draining a factory of current work, making sure all jobs execute before the factory exits + +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use crate::concurrency::sleep; +use crate::concurrency::Duration; +use crate::factory::*; +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; + +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +struct TestKey { + id: u64, +} +#[cfg(feature = "cluster")] +impl crate::BytesConvertable for TestKey { + fn from_bytes(bytes: Vec) -> Self { + Self { + id: u64::from_bytes(bytes), + } + } + fn into_bytes(self) -> Vec { + self.id.into_bytes() + } +} + +#[derive(Debug)] +enum TestMessage { + /// Doh'k + #[allow(dead_code)] + Ok, + /// Doh'k + #[allow(dead_code)] + Count(u16), +} +#[cfg(feature = "cluster")] +impl crate::Message for TestMessage {} + +struct TestWorker { + counter: Arc, +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for TestWorker { + type Msg = WorkerMessage; + type State = Self::Arguments; + type Arguments = WorkerStartContext; + + async fn pre_start( + &self, + _: ActorRef, + args: Self::Arguments, + ) -> Result { + Ok(args) + } + + async fn handle( + &self, + _: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(job) => { + self.counter.fetch_add(1, Ordering::Relaxed); + + sleep(Duration::from_millis(5)).await; + + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, job.key))?; + } + } + Ok(()) + } +} + +struct SlowWorkerBuilder { + counter: Arc, +} + +impl WorkerBuilder for SlowWorkerBuilder { + fn build(&self, _wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counter: self.counter.clone(), + }, + (), + ) + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_request_draining() { + let counter = Arc::new(AtomicU16::new(0)); + + let worker_builder = SlowWorkerBuilder { + counter: counter.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::QueuerRouting, + queues::DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: 2, + queue: queues::DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + for id in 0..999 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // start draining requests + factory + .cast(FactoryMessage::DrainRequests) + .expect("Failed to contact factory"); + + // try and push a new message, but it should be rejected since we're now draining + factory + 
.cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1000 }, + msg: TestMessage::Ok, + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + + // wait for factory to exit (it should once drained) + factory_handle.await.unwrap(); + + // check the counter + assert_eq!(999, counter.load(Ordering::Relaxed)); +} diff --git a/ractor/src/factory/tests/dynamic_discarding.rs b/ractor/src/factory/tests/dynamic_discarding.rs new file mode 100644 index 00000000..0ce9fbb2 --- /dev/null +++ b/ractor/src/factory/tests/dynamic_discarding.rs @@ -0,0 +1,223 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use crate::concurrency::Duration; +use crate::factory::*; +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; + +const NUM_TEST_WORKERS: usize = 2; + +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +struct TestKey { + id: u64, +} +#[cfg(feature = "cluster")] +impl crate::BytesConvertable for TestKey { + fn from_bytes(bytes: Vec) -> Self { + Self { + id: u64::from_bytes(bytes), + } + } + fn into_bytes(self) -> Vec { + self.id.into_bytes() + } +} + +#[derive(Debug)] +enum TestMessage { + /// Doh'k + #[allow(dead_code)] + Count(u16), +} +#[cfg(feature = "cluster")] +impl crate::Message for TestMessage {} + +struct TestWorker { + counter: Arc, + slow: Option, +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for TestWorker { + type Msg = WorkerMessage; + type State = Self::Arguments; + type Arguments = WorkerStartContext; + + async fn pre_start( + &self, + _myself: ActorRef, + startup_context: Self::Arguments, + ) -> Result { + Ok(startup_context) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(job) => { + tracing::debug!("Worker received {:?}", job.msg); + + self.counter.fetch_add(1, Ordering::Relaxed); + + if let Some(timeout_ms) = self.slow { + crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await; + } + + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, job.key))?; + } + } + Ok(()) + } +} + +struct SlowTestWorkerBuilder { + counters: [Arc; NUM_TEST_WORKERS], +} + +impl WorkerBuilder for SlowTestWorkerBuilder { + fn build(&self, wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counter: self.counters[wid].clone(), + slow: Some(10), + }, + (), + ) + } +} + +struct TestDiscarder { + counter: Arc, +} +impl DiscardHandler for TestDiscarder { + fn discard(&self, _reason: DiscardReason, job: &mut Job) { + let TestMessage::Count(count) = job.msg; + let _ = self.counter.fetch_add(count, Ordering::Relaxed); + } +} + +struct DiscardController {} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl DynamicDiscardController for DiscardController { + async fn compute(&mut self, _current_threshold: usize) -> usize { + 10 + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_dynamic_dispatch_basic() { + // let handle = tokio::runtime::Handle::current(); + // Setup + let worker_counters: [_; NUM_TEST_WORKERS] = + 
[Arc::new(AtomicU16::new(0)), Arc::new(AtomicU16::new(0))]; + let discard_counter = Arc::new(AtomicU16::new(0)); + + let worker_builder = SlowTestWorkerBuilder { + counters: worker_counters.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::QueuerRouting, + queues::DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: NUM_TEST_WORKERS, + queue: queues::DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: Some(Arc::new(TestDiscarder { + counter: discard_counter.clone(), + })), + discard_settings: DiscardSettings::Dynamic { + limit: 5, + mode: DiscardMode::Newest, + updater: Box::new(DiscardController {}), + }, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + // Act + for i in 0..10 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + // give some time to process all the messages (10ms/msg by 2 workers for 7 msgs) + crate::periodic_check( + || { + // Assert + // we should have shed the 3 newest messages, so 7, 8, 9 + discard_counter.load(Ordering::Relaxed) == 24 + }, + Duration::from_secs(1), + ) + .await; + + // now we wait for the ping to change the discard threshold to 10 + crate::concurrency::sleep(Duration::from_millis(300)).await; + + // Act again + for i in 0..14 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // give some time to process all the messages (10ms/msg by 2 workers for 7 msgs) + crate::periodic_check( + || { + // Assert + // we should have shed the 2 newest messages, so 12 and 13 + original amount of 24 + discard_counter.load(Ordering::Relaxed) == 49 + }, + Duration::from_secs(1), + ) + .await; + + // Cleanup + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); +} diff --git a/ractor/src/factory/tests/dynamic_pool.rs b/ractor/src/factory/tests/dynamic_pool.rs new file mode 100644 index 00000000..e266499e --- /dev/null +++ b/ractor/src/factory/tests/dynamic_pool.rs @@ -0,0 +1,287 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Tests around dynamic worker pool configuration. These require a more complex setup than the basic tests +//! 
and therefore are separated out + +use std::sync::Arc; + +use crate::concurrency::Duration; +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; + +use crate::factory::*; + +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +struct TestKey { + id: u64, +} + +#[derive(Debug)] +enum TestMessage { + /// Doh'k + #[allow(dead_code)] + Count(u16), +} +#[cfg(feature = "cluster")] +impl crate::BytesConvertable for TestKey { + fn from_bytes(bytes: Vec) -> Self { + Self { + id: u64::from_bytes(bytes), + } + } + fn into_bytes(self) -> Vec { + self.id.into_bytes() + } +} + +struct TestWorker { + id_map: Arc>, +} +#[cfg(feature = "cluster")] +impl crate::Message for TestMessage {} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for TestWorker { + type Msg = WorkerMessage; + type State = Self::Arguments; + type Arguments = WorkerStartContext; + + async fn pre_start( + &self, + _myself: ActorRef, + startup_context: Self::Arguments, + ) -> Result { + Ok(startup_context) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(job) => { + tracing::debug!("Worker received {:?}", job.msg); + + self.id_map.insert(state.wid); + + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, job.key))?; + } + } + Ok(()) + } +} + +struct TestWorkerBuilder { + id_map: Arc>, +} + +impl WorkerBuilder for TestWorkerBuilder { + fn build(&self, _wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + id_map: self.id_map.clone(), + }, + (), + ) + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_worker_pool_adjustment_manual() { + // Setup + + let id_map = Arc::new(dashmap::DashSet::new()); + + let worker_builder = TestWorkerBuilder { + id_map: id_map.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::RoundRobinRouting, + queues::DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: 4, + queue: queues::DefaultQueue::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + // Act + for i in 0..50 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + crate::periodic_check( + || { + // The map should only have 4 entries, the id of each worker + id_map.len() == 4 + }, + Duration::from_millis(200), + ) + .await; + + // Setup new state + id_map.clear(); + factory + .cast(FactoryMessage::AdjustWorkerPool(25)) + .expect("Failed to send to factory"); + + // Act again + for i in 0..50 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + crate::periodic_check( + || { + // The map should have 25 entries, the id of each worker + id_map.len() == 25 + }, 
+ Duration::from_millis(200), + ) + .await; + + // Cleanup + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_worker_pool_adjustment_automatic() { + // Setup + + struct DynamicWorkerController; + + #[cfg_attr(feature = "async-trait", crate::async_trait)] + impl WorkerCapacityController for DynamicWorkerController { + async fn get_pool_size(&mut self, _current: usize) -> usize { + 10 + } + } + + let id_map = Arc::new(dashmap::DashSet::new()); + + let worker_builder = TestWorkerBuilder { + id_map: id_map.clone(), + }; + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::RoundRobinRouting, + queues::DefaultQueue, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: 4, + queue: queues::DefaultQueue::default(), + router: Default::default(), + capacity_controller: Some(Box::new(DynamicWorkerController)), + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + // Act + for i in 0..50 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + crate::periodic_check( + || { + // The map should only have 4 entries, the id of each worker + id_map.len() == 4 + }, + Duration::from_millis(200), + ) + .await; + + // Setup new state + id_map.clear(); + // now we wait for the ping to change the worker pool to 10 + crate::concurrency::sleep(Duration::from_millis(300)).await; + + // Act again + for i in 0..50 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: TestKey { id: 1 }, + msg: TestMessage::Count(i), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + crate::periodic_check( + || { + // The map should have 10 entries, the id of each worker + id_map.len() == 10 + }, + Duration::from_millis(200), + ) + .await; + + // Cleanup + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); +} diff --git a/ractor/src/factory/tests/lifecycle.rs b/ractor/src/factory/tests/lifecycle.rs new file mode 100644 index 00000000..e166db59 --- /dev/null +++ b/ractor/src/factory/tests/lifecycle.rs @@ -0,0 +1,168 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! 
Lifecycle hooks tests + +use std::sync::atomic::AtomicU8; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use crate::concurrency::sleep; +use crate::concurrency::Duration; +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; + +use crate::factory::*; +use crate::periodic_check; + +#[derive(Clone)] +struct AtomicHooks { + state: Arc, +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl FactoryLifecycleHooks<(), ()> for AtomicHooks { + async fn on_factory_started( + &self, + _factory_ref: ActorRef>, + ) -> Result<(), ActorProcessingErr> { + self.state.store(1, Ordering::SeqCst); + Ok(()) + } + + async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> { + self.state.store(3, Ordering::SeqCst); + Ok(()) + } + + async fn on_factory_draining( + &self, + _factory_ref: ActorRef>, + ) -> Result<(), ActorProcessingErr> { + self.state.store(2, Ordering::SeqCst); + Ok(()) + } +} + +struct TestWorker; + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for TestWorker { + type State = Self::Arguments; + type Msg = WorkerMessage<(), ()>; + type Arguments = WorkerStartContext<(), (), ()>; + + async fn pre_start( + &self, + _: ActorRef, + args: Self::Arguments, + ) -> Result { + // slow down factory startup waiting for workers to spawn + sleep(Duration::from_millis(100)).await; + Ok(args) + } + + async fn post_stop( + &self, + _: ActorRef, + _: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + // slow down factory shutdown waiting for workers to die + sleep(Duration::from_millis(100)).await; + Ok(()) + } + + async fn handle( + &self, + _: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(_job) => { + tracing::warn!("Worker received message"); + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, ()))?; + } + } + Ok(()) + } +} + +struct TestWorkerBuilder; + +impl WorkerBuilder for TestWorkerBuilder { + fn build(&self, _wid: crate::factory::WorkerId) -> (TestWorker, ()) { + (TestWorker, ()) + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_lifecycle_hooks() { + let hooks = AtomicHooks { + state: Arc::new(AtomicU8::new(0)), + }; + + let worker_builder = TestWorkerBuilder; + + let factory_definition = Factory::< + (), + (), + (), + TestWorker, + routing::QueuerRouting<(), ()>, + queues::DefaultQueue<(), ()>, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: 1, + queue: Default::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: Some(Box::new(hooks.clone())), + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + // startup has some delay creating workers, so we shouldn't see on_started called immediately + assert_eq!(0, hooks.state.load(Ordering::SeqCst)); + periodic_check( + || hooks.state.load(Ordering::SeqCst) == 1, + Duration::from_millis(500), + ) + .await; + + assert_eq!(1, hooks.state.load(Ordering::SeqCst)); + factory + .cast(FactoryMessage::DrainRequests) + .expect("Failed to message factory"); + 
+ // give a little time to see if the factory moved to the draining state + periodic_check( + || hooks.state.load(Ordering::SeqCst) == 2, + Duration::from_millis(500), + ) + .await; + // once the factory is stopped, the shutdown handler should have been called + factory_handle.await.unwrap(); + assert_eq!(3, hooks.state.load(Ordering::SeqCst)); +} diff --git a/ractor/src/factory/tests/mod.rs b/ractor/src/factory/tests/mod.rs index d3a9dfc0..d5defdc2 100644 --- a/ractor/src/factory/tests/mod.rs +++ b/ractor/src/factory/tests/mod.rs @@ -3,727 +3,12 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. -use std::{ - marker::PhantomData, - sync::{ - atomic::{AtomicU16, Ordering}, - Arc, - }, -}; - -use crate::{ - common_test::{periodic_async_check, periodic_check}, - concurrency::Duration, - factory::DiscardHandler, - Actor, ActorProcessingErr, ActorRef, -}; - -use super::{ - CustomHashFunction, Factory, FactoryMessage, Job, JobOptions, RoutingMode, WorkerMessage, - WorkerStartContext, -}; - +//! Factory functionality tests + +mod basic; +mod draining_requests; +mod dynamic_discarding; +mod dynamic_pool; +mod lifecycle; +mod priority_queueing; mod worker_lifecycle; - -const NUM_TEST_WORKERS: usize = 3; - -#[derive(Debug, Hash, Clone, Eq, PartialEq)] -struct TestKey { - id: u64, -} -#[cfg(feature = "cluster")] -impl crate::BytesConvertable for TestKey { - fn from_bytes(bytes: Vec) -> Self { - Self { - id: u64::from_bytes(bytes), - } - } - fn into_bytes(self) -> Vec { - self.id.into_bytes() - } -} - -#[derive(Debug)] -enum TestMessage { - /// Doh'k - #[allow(dead_code)] - Ok, -} -#[cfg(feature = "cluster")] -impl crate::Message for TestMessage {} - -struct TestWorker { - counter: Arc, - slow: Option, -} - -#[cfg_attr(feature = "async-trait", crate::async_trait)] -impl Actor for TestWorker { - type Msg = super::WorkerMessage; - type State = Self::Arguments; - type Arguments = WorkerStartContext; - - async fn pre_start( - &self, - _myself: ActorRef, - startup_context: Self::Arguments, - ) -> Result { - Ok(startup_context) - } - - async fn handle( - &self, - _myself: ActorRef, - message: Self::Msg, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - match message { - WorkerMessage::FactoryPing(time) => { - state - .factory - .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; - } - WorkerMessage::Dispatch(job) => { - tracing::trace!("Worker received {:?}", job.msg); - - self.counter.fetch_add(1, Ordering::Relaxed); - - if let Some(timeout_ms) = self.slow { - crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await; - } - - // job finished, on success or err we report back to the factory - state - .factory - .cast(FactoryMessage::Finished(state.wid, job.key))?; - } - } - Ok(()) - } -} - -struct FastTestWorkerBuilder { - counters: [Arc; NUM_TEST_WORKERS], -} - -impl super::WorkerBuilder for FastTestWorkerBuilder { - fn build(&self, wid: usize) -> TestWorker { - TestWorker { - counter: self.counters[wid].clone(), - slow: None, - } - } -} - -struct SlowTestWorkerBuilder { - counters: [Arc; NUM_TEST_WORKERS], -} - -impl super::WorkerBuilder for SlowTestWorkerBuilder { - fn build(&self, wid: usize) -> TestWorker { - TestWorker { - counter: self.counters[wid].clone(), - slow: Some(10), - } - } -} - -struct InsanelySlowWorkerBuilder { - counters: [Arc; NUM_TEST_WORKERS], -} - -impl super::WorkerBuilder for InsanelySlowWorkerBuilder { - fn build(&self, wid: usize) -> TestWorker { - 
TestWorker { - counter: self.counters[wid].clone(), - slow: Some(10000), - } - } -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_dispatch_key_persistent() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = FastTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::KeyPersistent, - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - for _ in 0..999 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || worker_counters[0].load(Ordering::Relaxed) == 999, - Duration::from_secs(5), - ) - .await; - - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -// TODO: This test should probably use like a slow queuer or something to check we move to the next item, etc -#[crate::concurrency::test] -async fn test_dispatch_queuer() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = FastTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::Queuer, - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - for _ in 0..999 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || { - let all_counter: u16 = worker_counters - .iter() - .map(|w| w.load(Ordering::Relaxed)) - .sum(); - all_counter == 999 - }, - Duration::from_secs(5), - ) - .await; - - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_dispatch_round_robin() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = FastTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::RoundRobin, - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - for _ in 0..999 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || { - worker_counters - .iter() - .all(|counter| 
counter.load(Ordering::Relaxed) == 333) - }, - Duration::from_secs(5), - ) - .await; - - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_dispatch_random() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = FastTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::Random, - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - for _ in 0..999 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || { - let all_counter: u16 = worker_counters - .iter() - .map(|w| w.load(Ordering::Relaxed)) - .sum(); - all_counter == 999 - }, - Duration::from_secs(5), - ) - .await; - - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_dispatch_custom_hashing() { - struct MyHasher - where - TKey: crate::Message + Sync, - { - _key: PhantomData, - } - - impl CustomHashFunction for MyHasher - where - TKey: crate::Message + Sync, - { - fn hash(&self, _key: &TKey, _worker_count: usize) -> usize { - 2 - } - } - - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = FastTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::CustomHashFunction(Box::new(MyHasher { - _key: PhantomData, - })), - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - for _ in 0..999 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || { - let all_counter = worker_counters[2].load(Ordering::Relaxed); - all_counter == 999 - }, - Duration::from_secs(5), - ) - .await; - - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_dispatch_sticky_queueing() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = SlowTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - 
worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::StickyQueuer, - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - // Since we're dispatching all of the same key, they should all get routed to the same worker - for _ in 0..5 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || { - worker_counters - .iter() - .map(|c| c.load(Ordering::Relaxed)) - .any(|count| count == 5) - }, - Duration::from_secs(5), - ) - .await; - - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_discards_on_queuer() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - let discard_counter = Arc::new(AtomicU16::new(0)); - - struct TestDiscarder { - counter: Arc, - } - impl DiscardHandler for TestDiscarder { - fn clone_box(&self) -> Box> { - Box::new(TestDiscarder { - counter: self.counter.clone(), - }) - } - fn discard(&self, _job: Job) { - let _ = self.counter.fetch_add(1, Ordering::Relaxed); - } - } - - let worker_builder = InsanelySlowWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::Queuer, - discard_handler: Some(Box::new(TestDiscarder { - counter: discard_counter.clone(), - })), - discard_threshold: Some(5), - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - // Since we're dispatching all of the same key, they should all get routed to the same worker - for _ in 0..108 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - // each worker only got 1 message, then "slept" - periodic_check( - || { - worker_counters - .iter() - .map(|c| c.load(Ordering::Relaxed)) - .all(|count| count == 1) - }, - Duration::from_secs(5), - ) - .await; - - // wait for factory termination - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); - - // 5 messages should be left in the factory's queue, while the remaining should get "discarded" - // (3 in workers) + (5 in queue) + (100 discarded) = 108, the number of msgs we sent to the factory - assert_eq!(100, discard_counter.load(Ordering::Relaxed)); -} - -struct StuckWorker { - counter: Arc, - slow: Option, -} - -#[cfg_attr(feature = "async-trait", crate::async_trait)] -impl Actor for StuckWorker { - type Msg = super::WorkerMessage; - type State = Self::Arguments; - type Arguments = WorkerStartContext; - - async fn pre_start( - &self, - _myself: ActorRef, - startup_context: Self::Arguments, - ) -> Result { - self.counter.fetch_add(1, Ordering::Relaxed); - Ok(startup_context) 
- } - - async fn handle( - &self, - _myself: ActorRef, - message: Self::Msg, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - match message { - WorkerMessage::FactoryPing(time) => { - state - .factory - .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; - } - WorkerMessage::Dispatch(job) => { - tracing::debug!("Worker received {:?}", job.msg); - - if let Some(timeout_ms) = self.slow { - crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await; - } - - // job finished, on success or err we report back to the factory - state - .factory - .cast(FactoryMessage::Finished(state.wid, job.key))?; - } - } - Ok(()) - } -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_stuck_workers() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - struct StuckWorkerBuilder { - counters: [Arc; NUM_TEST_WORKERS], - } - - impl super::WorkerBuilder for StuckWorkerBuilder { - fn build(&self, wid: usize) -> TestWorker { - TestWorker { - counter: self.counters[wid].clone(), - slow: Some(10000), - } - } - } - - let worker_builder = StuckWorkerBuilder { - counters: worker_counters.clone(), - }; - - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::RoundRobin, - dead_mans_switch: Some(super::DeadMansSwitchConfiguration { - detection_timeout: Duration::from_millis(50), - kill_worker: true, - }), - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - // Since we're dispatching all of the same key, they should all get routed to the same worker - for _ in 0..9 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_check( - || { - worker_counters - .iter() - .map(|c| c.load(Ordering::Relaxed)) - .all(|count| count > 1) - }, - Duration::from_secs(5), - ) - .await; - - // wait for factory termination - factory.stop(None); - factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); -} - -#[crate::concurrency::test] -#[tracing_test::traced_test] -async fn test_worker_pings() { - let worker_counters: [_; NUM_TEST_WORKERS] = [ - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - Arc::new(AtomicU16::new(0)), - ]; - - let worker_builder = FastTestWorkerBuilder { - counters: worker_counters.clone(), - }; - let factory_definition = Factory:: { - worker_count: NUM_TEST_WORKERS, - routing_mode: RoutingMode::::KeyPersistent, - collect_worker_stats: true, - ..Default::default() - }; - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); - - for _ in 0..999 { - factory - .cast(FactoryMessage::Dispatch(Job { - key: TestKey { id: 1 }, - msg: TestMessage::Ok, - options: JobOptions::default(), - })) - .expect("Failed to send to factory"); - } - - periodic_async_check( - || async { - let stats = crate::call_t!(factory, FactoryMessage::GetStats, 200) - .expect("Failed to get statistics"); - stats.ping_count > 0 - }, - Duration::from_secs(10), - ) - .await; - - factory.stop(None); - 
factory_handle.await.unwrap(); -} diff --git a/ractor/src/factory/tests/priority_queueing.rs b/ractor/src/factory/tests/priority_queueing.rs new file mode 100644 index 00000000..91df92b4 --- /dev/null +++ b/ractor/src/factory/tests/priority_queueing.rs @@ -0,0 +1,217 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use crate::Actor; +use crate::ActorProcessingErr; +use crate::ActorRef; +use tokio::sync::Notify; + +use crate::factory::queues::Priority; +use crate::factory::queues::PriorityManager; +use crate::factory::queues::StandardPriority; +use crate::factory::*; + +type TestKey = StandardPriority; + +#[derive(Debug)] +enum TestMessage { + /// Doh'k + #[allow(dead_code)] + Count(u16), +} +#[cfg(feature = "cluster")] +impl crate::Message for TestMessage {} + +struct TestWorker { + counters: [Arc; 5], + signal: Arc, +} + +struct TestPriorityManager; + +impl PriorityManager for TestPriorityManager { + fn get_priority(&self, job: &StandardPriority) -> Option { + Some(*job) + } + fn is_discardable(&self, _job: &StandardPriority) -> bool { + true + } +} + +#[cfg_attr(feature = "async-trait", crate::async_trait)] +impl Actor for TestWorker { + type Msg = WorkerMessage; + type State = (Self::Arguments, u16); + type Arguments = WorkerStartContext; + + async fn pre_start( + &self, + _myself: ActorRef, + startup_context: Self::Arguments, + ) -> Result { + Ok((startup_context, 0)) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + state + .0 + .factory + .cast(FactoryMessage::WorkerPong(state.0.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(Job { + key, + msg: TestMessage::Count(count), + .. 
+ }) => { + self.counters[key.get_index()].fetch_add(count, Ordering::Relaxed); + + // job finished, on success or err we report back to the factory + state + .0 + .factory + .cast(FactoryMessage::Finished(state.0.wid, key))?; + + state.1 += 1; + if state.1 == 5 { + self.signal.notify_one(); + // wait to be notified back + self.signal.notified().await; + // reset the counter + state.1 = 0; + } + } + } + Ok(()) + } +} + +struct TestWorkerBuilder { + counters: [Arc; 5], + signal: Arc, +} + +impl WorkerBuilder for TestWorkerBuilder { + fn build(&self, _wid: usize) -> (TestWorker, ()) { + ( + TestWorker { + counters: self.counters.clone(), + signal: self.signal.clone(), + }, + (), + ) + } +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_basic_priority_queueing() { + // Setup + // a counter for each priority + let counters = [ + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + Arc::new(AtomicU16::new(0)), + ]; + let signal = Arc::new(Notify::new()); + + let factory_definition = Factory::< + TestKey, + TestMessage, + (), + TestWorker, + routing::QueuerRouting, + queues::PriorityQueue< + TestKey, + TestMessage, + StandardPriority, + TestPriorityManager, + { StandardPriority::size() }, + >, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: 1, + queue: queues::PriorityQueue::new(TestPriorityManager), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::None, + lifecycle_hooks: None, + worker_builder: Box::new(TestWorkerBuilder { + counters: counters.clone(), + signal: signal.clone(), + }), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); + + // Act + // Send 5 high pri and 5 low pri messages to the factory. 
Only the high pri should + // be serviced before the notifier is triggered + let pri = StandardPriority::Highest; + for _i in 0..5 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: pri, + msg: TestMessage::Count(1), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + let pri = StandardPriority::BestEffort; + for _i in 0..5 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: pri, + msg: TestMessage::Count(1), + options: JobOptions::default(), + })) + .expect("Failed to send to factory"); + } + + // wait for the factory to signal + signal.notified().await; + + // check the counters + let hpc = counters[0].load(Ordering::Relaxed); + let lpc = counters[4].load(Ordering::Relaxed); + assert_eq!(hpc, 5); + assert_eq!(lpc, 0); + + // tell the factory to continue + signal.notify_one(); + + // wait for the next batch to complete + signal.notified().await; + signal.notify_one(); + + let hpc = counters[0].load(Ordering::Relaxed); + let lpc = counters[4].load(Ordering::Relaxed); + assert_eq!(hpc, 5); + assert_eq!(lpc, 5); + + // Cleanup + // wait for factory termination + factory.stop(None); + factory_handle.await.unwrap(); +} diff --git a/ractor/src/factory/tests/worker_lifecycle.rs b/ractor/src/factory/tests/worker_lifecycle.rs index 415d4c76..a065a47c 100644 --- a/ractor/src/factory/tests/worker_lifecycle.rs +++ b/ractor/src/factory/tests/worker_lifecycle.rs @@ -7,7 +7,6 @@ use std::sync::atomic::AtomicU16; use std::sync::atomic::Ordering; use std::sync::Arc; -use crate::common_test::periodic_check; use crate::concurrency::sleep; use crate::concurrency::Duration; use crate::Actor; @@ -26,15 +25,14 @@ enum MyWorkerMessage { Increment, Boom, } - #[cfg(feature = "cluster")] -impl Message for MyWorkerMessage {} +impl crate::Message for MyWorkerMessage {} #[cfg_attr(feature = "async-trait", crate::async_trait)] impl Actor for MyWorker { type State = Self::Arguments; type Msg = WorkerMessage<(), MyWorkerMessage>; - type Arguments = WorkerStartContext<(), MyWorkerMessage>; + type Arguments = WorkerStartContext<(), MyWorkerMessage, ()>; async fn pre_start( &self, @@ -85,11 +83,14 @@ struct MyWorkerBuilder { counter: Arc, } -impl WorkerBuilder for MyWorkerBuilder { - fn build(&self, _wid: WorkerId) -> MyWorker { - MyWorker { - counter: self.counter.clone(), - } +impl WorkerBuilder for MyWorkerBuilder { + fn build(&self, _wid: crate::factory::WorkerId) -> (MyWorker, ()) { + ( + MyWorker { + counter: self.counter.clone(), + }, + (), + ) } } @@ -100,17 +101,35 @@ async fn test_worker_death_restarts_and_gets_next_message() { let worker_builder = MyWorkerBuilder { counter: counter.clone(), }; - let factory_definition = Factory::<(), MyWorkerMessage, MyWorker> { - worker_count: 1, - routing_mode: RoutingMode::Queuer, - discard_threshold: Some(10), - ..Default::default() - }; - - let (factory, factory_handle) = - Actor::spawn(None, factory_definition, Box::new(worker_builder)) - .await - .expect("Failed to spawn factory"); + let factory_definition = Factory::< + (), + MyWorkerMessage, + (), + MyWorker, + routing::RoundRobinRouting<(), MyWorkerMessage>, + queues::DefaultQueue<(), MyWorkerMessage>, + >::default(); + let (factory, factory_handle) = Actor::spawn( + None, + factory_definition, + FactoryArguments { + num_initial_workers: 1, + queue: Default::default(), + router: Default::default(), + capacity_controller: None, + dead_mans_switch: None, + discard_handler: None, + discard_settings: DiscardSettings::Static { + limit: 10, + mode: DiscardMode::Newest, + }, + 
lifecycle_hooks: None, + worker_builder: Box::new(worker_builder), + collect_worker_stats: false, + }, + ) + .await + .expect("Failed to spawn factory"); // Now we need to send a specific sequence of events // first make the worker busy so we can be sure the "boom" job is enqueued @@ -142,10 +161,10 @@ async fn test_worker_death_restarts_and_gets_next_message() { })) .expect("Failed to send message to factory"); } - - periodic_check( + // now wait for everything to propagate + crate::periodic_check( || counter.load(Ordering::Relaxed) == 5, - Duration::from_secs(2), + Duration::from_secs(1), ) .await;
diff --git a/ractor/src/factory/worker.rs b/ractor/src/factory/worker.rs index a370a068..b5961e43 100644 --- a/ractor/src/factory/worker.rs +++ b/ractor/src/factory/worker.rs @@ -6,17 +6,59 @@ //! Factory worker properties use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; use crate::concurrency::{Duration, Instant, JoinHandle}; -use crate::{ActorId, ActorProcessingErr}; +use crate::{Actor, ActorId, ActorProcessingErr}; use crate::{ActorRef, Message, MessagingErr}; +use super::discard::{DiscardMode, WorkerDiscardSettings}; use super::stats::MessageProcessingStats; use super::FactoryMessage; use super::Job; use super::JobKey; use super::WorkerId; -use super::{DiscardHandler, JobOptions}; +use super::{DiscardHandler, DiscardReason, JobOptions}; + +/// The configuration for the dead-man's switch functionality +pub struct DeadMansSwitchConfiguration { + /// Duration before determining the worker is stuck + pub detection_timeout: Duration, + /// Flag denoting if the stuck worker should be killed + /// and restarted + pub kill_worker: bool, +} + +/// The [super::Factory] is responsible for spawning workers +/// and re-spawning workers under failure scenarios. This means that +/// it needs to understand how to build workers. The [WorkerBuilder] +/// trait is used by the factory to construct new workers when needed. +pub trait WorkerBuilder<TWorker, TWorkerStart>: Send + Sync +where + TWorker: Actor, + TWorkerStart: Message, +{ + /// Build a new worker + /// + /// * `wid`: The worker's "id" or index in the worker pool + /// + /// Returns a tuple of the worker and a custom startup definition giving the worker + /// owned control of some structs that it may need to do its work. + fn build(&self, wid: WorkerId) -> (TWorker, TWorkerStart); +} + +/// Controls the size of the worker pool by dynamically growing/shrinking the pool +/// to the requested size +#[cfg_attr(feature = "async-trait", crate::async_trait)] +pub trait WorkerCapacityController: 'static + Send + Sync { + /// Retrieve the new pool size + /// + /// * `current` - The current pool size + /// + /// Returns the "new" pool size. 
If returns 0, the adjustment will be + /// ignored + async fn get_pool_size(&mut self, current: usize) -> usize; +} /// Message to a worker pub enum WorkerMessage<TKey, TMsg> @@ -44,16 +86,20 @@ where } /// Startup context data (`Arguments`) which are passed to a worker on start -pub struct WorkerStartContext<TKey, TMsg> +pub struct WorkerStartContext<TKey, TMsg, TCustomStart> where TKey: JobKey, TMsg: Message, + TCustomStart: Message, { /// The worker's identifier pub wid: WorkerId, /// The factory the worker belongs to pub factory: ActorRef<FactoryMessage<TKey, TMsg>>, + + /// Custom startup arguments to the worker + pub custom_start: TCustomStart, } /// Properties of a worker @@ -65,22 +111,23 @@ where /// Worker identifier pub(crate) wid: WorkerId, - /// Worker's capacity for parallel work - capacity: usize, - /// Worker actor pub(crate) actor: ActorRef<WorkerMessage<TKey, TMsg>>, + /// The join handle for the worker + handle: Option<JoinHandle<()>>, + /// Worker's message queue message_queue: VecDeque<Job<TKey, TMsg>>, + /// Maximum queue length. Any job arriving when the queue is at its max length /// will cause the oldest job at the head of the queue to be dropped. /// - /// Default is disabled - discard_threshold: Option<usize>, + /// Default is [WorkerDiscardSettings::None] + discard_settings: WorkerDiscardSettings, /// A function to be called for each job to be dropped. - discard_handler: Option<Box<dyn DiscardHandler<TKey, TMsg>>>, + discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>, /// Flag indicating if this worker has a ping currently pending is_ping_pending: bool, @@ -91,8 +138,8 @@ where /// Current pending jobs dispatched to the worker (for tracking stats) curr_jobs: HashMap<TKey, JobOptions>, - /// The join handle for the worker - handle: Option<JoinHandle<()>>, + /// Flag indicating if this worker is currently "draining" work due to resizing + pub(crate) is_draining: bool, } impl<TKey, TMsg> WorkerProperties<TKey, TMsg> where TKey: JobKey, TMsg: Message, { fn get_next_non_expired_job(&mut self) -> Option<Job<TKey, TMsg>> { - while let Some(job) = self.message_queue.pop_front() { + while let Some(mut job) = self.message_queue.pop_front() { if !job.is_expired() { return Some(job); } else { + if let Some(handler) = &self.discard_handler { + handler.discard(DiscardReason::TtlExpired, &mut job); + } self.stats.job_ttl_expired(); } } @@ -114,9 +164,8 @@ where pub(crate) fn new( wid: WorkerId, actor: ActorRef<WorkerMessage<TKey, TMsg>>, - capacity: usize, - discard_threshold: Option<usize>, - discard_handler: Option<Box<dyn DiscardHandler<TKey, TMsg>>>, + discard_settings: WorkerDiscardSettings, + discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>, collect_stats: bool, handle: JoinHandle<()>, ) -> Self { @@ -126,18 +175,22 @@ where } Self { actor, + discard_settings, discard_handler, - discard_threshold, message_queue: VecDeque::new(), curr_jobs: HashMap::new(), wid, - capacity, is_ping_pending: false, stats, handle: Some(handle), + is_draining: false, } } + pub(crate) fn get_join_handle(&mut self) -> Option<JoinHandle<()>> { + self.handle.take() + } + pub(crate) fn is_pid(&self, pid: ActorId) -> bool { self.actor.get_id() == pid } @@ -166,12 +219,12 @@ where Ok(()) } - pub(crate) fn get_handle(&mut self) -> Option<JoinHandle<()>> { - self.handle.take() + pub(crate) fn is_available(&self) -> bool { + self.curr_jobs.is_empty() } - pub(crate) fn is_available(&self) -> bool { - self.curr_jobs.len() < self.capacity + pub(crate) fn is_working(&self) -> bool { + !self.curr_jobs.is_empty() } /// Denotes if the worker is stuck (i.e. 
unable to complete its current job) @@ -199,7 +252,18 @@ where // track per-job statistics self.stats.job_submitted(); - if self.curr_jobs.len() < self.capacity { + if let Some((limit, DiscardMode::Newest)) = self.discard_settings.get_limit_and_mode() { + if limit > 0 && self.message_queue.len() >= limit { + // Discard THIS job as it's the newest one + if let Some(handler) = &self.discard_handler { + handler.discard(DiscardReason::Loadshed, &mut job); + } + return Ok(()); + } + } + + // if the job isn't load-shed up front, it's "accepted" + if self.curr_jobs.is_empty() { self.curr_jobs.insert(job.key.clone(), job.options.clone()); if let Some(mut older_job) = self.get_next_non_expired_job() { self.message_queue.push_back(job); @@ -212,12 +276,13 @@ where return Ok(()); } self.message_queue.push_back(job); - if let Some(discard_threshold) = self.discard_threshold { - while discard_threshold > 0 && self.message_queue.len() > discard_threshold { - if let Some(discarded) = self.get_next_non_expired_job() { - self.stats.job_discarded(); + + if let Some((limit, DiscardMode::Oldest)) = self.discard_settings.get_limit_and_mode() { + // load-shed the OLDEST jobs + while limit > 0 && self.message_queue.len() > limit { + if let Some(mut discarded) = self.get_next_non_expired_job() { if let Some(handler) = &self.discard_handler { - handler.discard(discarded); + handler.discard(DiscardReason::Loadshed, &mut discarded); } } } @@ -239,7 +304,8 @@ where } /// Comes back when a ping went out - pub(crate) fn ping_received(&mut self, time: Duration) { + pub(crate) fn ping_received(&mut self, time: Duration, discard_limit: usize) { + self.discard_settings.update_worker_limit(discard_limit); if self.stats.ping_received(time) { // TODO log metrics ? Should be configurable on the factory level } @@ -262,4 +328,9 @@ where Ok(options) } + + /// Set the draining status of the worker + pub(crate) fn set_draining(&mut self, is_draining: bool) { + self.is_draining = is_draining; + } }
diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index 1a2ffbc2..bedf8179 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -155,8 +155,11 @@ pub mod actor; #[cfg(test)] pub(crate) mod common_test; +#[cfg(test)] +pub use common_test::*; pub mod concurrency; pub mod errors; +#[cfg(feature = "async-trait")] pub mod factory; pub mod macros; pub mod message; @@ -168,6 +171,9 @@ pub mod rpc; pub mod serialization; pub mod time; +#[cfg(not(feature = "async-trait"))] +use strum as _; + // ======================== Test Modules and blind imports ======================== // #[cfg(test)]
diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index df703dc5..42cec783 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.9.8" +version = "0.10.0" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" @@ -23,8 +23,8 @@ prost-build = { version = "0.12" } bytes = { version = "1" } prost = { version = "0.12" } prost-types = { version = "0.12" } -ractor = { version = "0.9.0", features = ["cluster"], path = "../ractor" } -ractor_cluster_derive = { version = "0.9.0", path = "../ractor_cluster_derive" } +ractor = { version = "0.10.0", features = ["cluster"], path = "../ractor" } +ractor_cluster_derive = { version = "0.10.0", path = "../ractor_cluster_derive" } rand = "0.8" sha2 = "0.10" tokio = { version = "1", features = ["rt", "time", "sync", "macros", 
"net", "io-util", "tracing"]} diff --git a/ractor_cluster/src/remote_actor/tests.rs b/ractor_cluster/src/remote_actor/tests.rs index e8f00ad1..39b8ec63 100644 --- a/ractor_cluster/src/remote_actor/tests.rs +++ b/ractor_cluster/src/remote_actor/tests.rs @@ -80,14 +80,14 @@ async fn remote_actor_serialized_message_handling() { .await; assert!(call_output.is_ok()); assert_eq!(1, remote_actor_state.message_tag); - assert!(remote_actor_state.pending_requests.get(&1).is_some()); + assert!(remote_actor_state.pending_requests.contains_key(&1)); let reply = SerializedMessage::CallReply(1, vec![3, 4, 5]); let reply_output = remote_actor_instance .handle_serialized(remote_actor_ref.clone(), reply, &mut remote_actor_state) .await; assert!(reply_output.is_ok()); - assert!(remote_actor_state.pending_requests.get(&1).is_none()); + assert!(!remote_actor_state.pending_requests.contains_key(&1)); // cleanup remote_actor_ref.stop(None); diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index ba06558e..88e47f32 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.9.8" +version = "0.10.0" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT"