Skip to content

Commit

Permalink
This patch adds a generic stats layer trait that can be implemented t…
Browse files Browse the repository at this point in the history
…o provide whatever custom statistics collection medium downstream use-cases prefer. (#264)

Dynamic dispatch is utilized since we're going to use an `Arc` anyway and, given the likelihood of a single implementation, LLVM will likely inline the type anyway. This saves the added generic overhead.
  • Loading branch information
slawlor authored Oct 1, 2024
1 parent f3ae940 commit 7277274
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 260 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.11.2"
version = "0.12.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
80 changes: 51 additions & 29 deletions ractor/src/factory/factoryimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ where
/// for shutdown/startup/draining
pub lifecycle_hooks: Option<Box<dyn FactoryLifecycleHooks<TKey, TMsg>>>,

/// Identifies if the factory should collect statstics around each worker
pub collect_worker_stats: bool,
/// Defines the statistics collection layer for the factory. Useful for tracking factory properties.
pub stats: Option<Arc<dyn FactoryStatsLayer>>,
}

/// Builder for [FactoryArguments] which can be used to build the
Expand Down Expand Up @@ -173,7 +173,7 @@ where
dead_mans_switch: Option<DeadMansSwitchConfiguration>,
capacity_controller: Option<Box<dyn WorkerCapacityController>>,
lifecycle_hooks: Option<Box<dyn FactoryLifecycleHooks<TKey, TMsg>>>,
collect_worker_stats: bool,
stats: Option<Arc<dyn FactoryStatsLayer>>,
}

impl<TKey, TMsg, TWorkerStart, TWorker, TRouter, TQueue>
Expand Down Expand Up @@ -212,7 +212,7 @@ where
dead_mans_switch: None,
capacity_controller: None,
lifecycle_hooks: None,
collect_worker_stats: false,
stats: None,
}
}

Expand All @@ -228,7 +228,7 @@ where
dead_mans_switch,
capacity_controller,
lifecycle_hooks,
collect_worker_stats,
stats,
} = self;
FactoryArguments {
worker_builder,
Expand All @@ -240,7 +240,7 @@ where
dead_mans_switch,
capacity_controller,
lifecycle_hooks,
collect_worker_stats,
stats,
}
}

Expand Down Expand Up @@ -333,6 +333,16 @@ where
..self
}
}

/// Installs a statistics collection layer on the factory being built.
///
/// The supplied implementation is wrapped in an [Arc] and stored as a
/// `dyn FactoryStatsLayer` so it can aggregate statistics about the
/// factory's processing. Every other builder field is carried over unchanged.
pub fn with_stats_collector<TStats: FactoryStatsLayer>(self, stats: TStats) -> Self {
    // Erase the concrete type up front; the builder stores a trait object.
    let layer: Arc<dyn FactoryStatsLayer> = Arc::new(stats);
    Self {
        stats: Some(layer),
        ..self
    }
}
}

/// State of a factory (backlogged jobs, handler, etc)
Expand All @@ -348,11 +358,11 @@ where
TRouter: Router<TKey, TMsg>,
TQueue: Queue<TKey, TMsg>,
{
factory_name: String,
worker_builder: Box<dyn WorkerBuilder<TWorker, TWorkerStart>>,
pool_size: usize,
pool: HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
stats: MessageProcessingStats,
collect_worker_stats: bool,
stats: Option<Arc<dyn FactoryStatsLayer>>,
router: TRouter,
queue: TQueue,
discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
Expand Down Expand Up @@ -391,17 +401,19 @@ where
while let Some(true) = self.queue.peek().map(|m| m.is_expired()) {
// remove the job from the queue
if let Some(mut job) = self.queue.pop_front() {
self.stats.job_ttl_expired();
self.stats.job_ttl_expired(&self.factory_name, 1);
if let Some(handler) = &self.discard_handler {
handler.discard(DiscardReason::TtlExpired, &mut job);
}
job.reject();
} else {
break;
}
}

if let Some(worker) = self.pool.get_mut(&worker_hint).filter(|f| f.is_available()) {
if let Some(job) = self.queue.pop_front() {
if let Some(mut job) = self.queue.pop_front() {
job.accept();
worker.enqueue_job(job)?;
}
} else {
Expand All @@ -414,7 +426,8 @@ where
.choose_target_worker(job, self.pool_size, &self.pool)
})
.and_then(|wid| self.pool.get_mut(&wid));
if let (Some(job), Some(worker)) = (self.queue.pop_front(), target_worker) {
if let (Some(mut job), Some(worker)) = (self.queue.pop_front(), target_worker) {
job.accept();
worker.enqueue_job(job)?;
}
}
Expand All @@ -432,17 +445,20 @@ where
if let Some(handler) = &self.discard_handler {
handler.discard(DiscardReason::Loadshed, &mut job);
}
self.stats.job_discarded();
job.reject();
self.stats.job_discarded(&self.factory_name);
} else {
job.accept();
self.queue.push_back(job);
}
}
Some((limit, DiscardMode::Oldest)) => {
job.accept();
self.queue.push_back(job);
while self.queue.len() > limit {
// try and shed a job, of the lowest priority working up
if let Some(mut msg) = self.queue.discard_oldest() {
self.stats.job_discarded();
self.stats.job_discarded(&self.factory_name);
if let Some(handler) = &self.discard_handler {
handler.discard(DiscardReason::Loadshed, &mut msg);
}
Expand All @@ -451,6 +467,7 @@ where
}
None => {
// no load-shedding
job.accept();
self.queue.push_back(job);
}
}
Expand Down Expand Up @@ -485,12 +502,13 @@ where
self.pool.insert(
wid,
WorkerProperties::new(
self.factory_name.clone(),
wid,
worker,
discard_settings,
self.discard_handler.clone(),
self.collect_worker_stats,
handle,
self.stats.clone(),
),
);
}
Expand Down Expand Up @@ -584,9 +602,16 @@ where
fn dispatch(&mut self, mut job: Job<TKey, TMsg>) -> Result<(), ActorProcessingErr> {
// set the time the factory received the message
job.set_factory_time();
self.stats.job_submitted();
self.stats.new_job(&self.factory_name);

if self.drain_state == DrainState::NotDraining {
// Check if TTL has been exceeded prior to trying anything.
if job.is_expired() {
self.stats.job_ttl_expired(&self.factory_name, 1);
if let Some(discard_handler) = &self.discard_handler {
discard_handler.discard(DiscardReason::TtlExpired, &mut job);
}
job.reject();
} else if self.drain_state == DrainState::NotDraining {
if let RouteResult::Backlog(busy_job) =
self.router
.route_message(job, self.pool_size, &mut self.pool)?
Expand All @@ -599,6 +624,7 @@ where
if let Some(discard_handler) = &self.discard_handler {
discard_handler.discard(DiscardReason::Shutdown, &mut job);
}
job.reject();
}
Ok(())
}
Expand All @@ -607,7 +633,7 @@ where
let (is_worker_draining, should_drop_worker) = if let Some(worker) = self.pool.get_mut(&who)
{
if let Some(job_options) = worker.worker_complete(key)? {
self.stats.factory_job_done(&job_options);
self.stats.job_completed(&self.factory_name, &job_options);
}

if worker.is_draining {
Expand Down Expand Up @@ -657,11 +683,9 @@ where
// TTL expired on these items, remove them before even trying to dequeue & distribute them
if self.router.is_factory_queueing() {
let num_removed = self.queue.remove_expired_items(&self.discard_handler);
self.stats.jobs_ttls_expired(num_removed);
self.stats.job_ttl_expired(&self.factory_name, num_removed);
}

self.stats.reset_global_counters();

// schedule next calculation
myself.send_after(CALCULATE_FREQUENCY, || FactoryMessage::Calculate);
Ok(())
Expand All @@ -672,7 +696,7 @@ where
myself: &ActorRef<FactoryMessage<TKey, TMsg>>,
when: Instant,
) -> Result<(), ActorProcessingErr> {
self.stats.ping_received(when.elapsed());
self.stats.factory_ping_received(&self.factory_name, when);

// if we have dynamic discarding, we update the discard threshold
if let DiscardSettings::Dynamic { limit, updater, .. } = &mut self.discard_settings {
Expand Down Expand Up @@ -782,10 +806,11 @@ where
dead_mans_switch,
capacity_controller,
lifecycle_hooks,
collect_worker_stats,
stats,
}: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
tracing::debug!(factory = ?myself, "Factory starting");
let factory_name = myself.get_name().unwrap_or_else(|| "all".to_string());

// build the pool
let mut pool = HashMap::new();
Expand All @@ -807,12 +832,13 @@ where
pool.insert(
wid,
WorkerProperties::new(
factory_name.clone(),
wid,
worker,
worker_discard_settings,
discard_handler.clone(),
collect_worker_stats,
worker_handle,
stats.clone(),
),
);
}
Expand All @@ -832,6 +858,7 @@ where

// initial state
Ok(FactoryState {
factory_name,
worker_builder,
pool_size: num_initial_workers,
pool,
Expand All @@ -843,12 +870,7 @@ where
lifecycle_hooks,
queue,
router,
stats: {
let mut s = MessageProcessingStats::default();
s.enable();
s
},
collect_worker_stats,
stats,
})
}

Expand Down
45 changes: 43 additions & 2 deletions ractor/src/factory/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::fmt::Debug;
use std::{hash::Hash, time::SystemTime};

use crate::RpcReplyPort;
use crate::{concurrency::Duration, Message};

#[cfg(feature = "cluster")]
Expand Down Expand Up @@ -116,6 +117,16 @@ where
/// The job's options, mainly related to timing
/// information of the job
pub options: JobOptions,
/// If provided, this channel can be used to block pushes
/// into the factory until the factory can "accept" the message
/// into its internal processing. This can be used to synchronize
/// external threadpools to the Tokio processing pool and prevent
/// overloading the unbounded channel which fronts all actors.
///
/// The reply channel returns [None] if the job was accepted, or
/// [Some(`Job`)] if it was rejected & loadshed, in which case the
/// job may be retried by the caller at a later time (if desired).
pub accepted: Option<RpcReplyPort<Option<Self>>>,
}

#[cfg(feature = "cluster")]
Expand Down Expand Up @@ -205,7 +216,12 @@ where
args,
metadata: None,
})?;
Ok(Self { msg, key, options })
Ok(Self {
msg,
key,
options,
accepted: None,
})
}
crate::message::SerializedMessage::Call {
variant,
Expand All @@ -220,7 +236,12 @@ where
reply,
metadata: None,
})?;
Ok(Self { msg, key, options })
Ok(Self {
msg,
key,
options,
accepted: None,
})
}
}
}
Expand Down Expand Up @@ -249,6 +270,20 @@ where
pub(crate) fn set_worker_time(&mut self) {
self.options.worker_time = SystemTime::now();
}

/// Signal acceptance of the job to whoever submitted it.
///
/// If an `accepted` reply port is attached, it is taken out of the job
/// (leaving `None` behind, so a later call is a no-op) and `None` is sent
/// on it. Jobs without a reply port are left untouched.
pub(crate) fn accept(&mut self) {
    match self.accepted.take() {
        Some(reply_port) => {
            // The outcome of the send is deliberately ignored.
            let _ = reply_port.send(None);
        }
        None => {}
    }
}

/// Reject the job, handing it back to the submitter when possible.
///
/// Consumes the job. If an `accepted` reply port is attached, the port is
/// taken out and the job itself is sent back through it as `Some(job)`, so
/// the caller can react to the backpressure. Without a reply port the job
/// is simply dropped.
pub(crate) fn reject(mut self) {
    match self.accepted.take() {
        Some(reply_port) => {
            // The outcome of the send is deliberately ignored.
            let _ = reply_port.send(Some(self));
        }
        None => {}
    }
}
}

#[cfg(feature = "cluster")]
Expand Down Expand Up @@ -345,11 +380,13 @@ mod tests {
key: TestKey { item: 123 },
msg: TestMessage::A("Hello".to_string()),
options: JobOptions::default(),
accepted: None,
};
let expected_a = TheJob {
key: TestKey { item: 123 },
msg: TestMessage::A("Hello".to_string()),
options: job_a.options.clone(),
accepted: None,
};

let serialized_a = job_a.serialize().expect("Failed to serialize job A");
Expand All @@ -376,11 +413,13 @@ mod tests {
ttl: Some(Duration::from_millis(1000)),
..Default::default()
},
accepted: None,
};
let expected_b = TheJob {
key: TestKey { item: 456 },
msg: TestMessage::B("Hi".to_string(), crate::concurrency::oneshot().0.into()),
options: job_b.options.clone(),
accepted: None,
};
let serialized_b = job_b.serialize().expect("Failed to serialize job B");
let deserialized_b =
Expand All @@ -406,11 +445,13 @@ mod tests {
key: TestKey { item: 123 },
msg: TestMessage::A("Hello".to_string()),
options: JobOptions::default(),
accepted: None,
};
let expected_a = TheJob {
key: TestKey { item: 123 },
msg: TestMessage::A("Hello".to_string()),
options: job_a.options.clone(),
accepted: None,
};

let msg = FactoryMessage::Dispatch(job_a);
Expand Down
Loading

0 comments on commit 7277274

Please sign in to comment.