Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to the factory construct #237

Merged
merged 1 commit into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.9.8"
version = "0.10.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -27,7 +27,7 @@ default = ["tokio_runtime", "async-trait"]
dashmap = "5"
futures = "0.3"
once_cell = "1"
rand = "0.8"
strum = { version = "0.26", features = ["derive"] }

## Configurable dependencies
# Tracing feature requires --cfg=tokio_unstable
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//! cargo run --example counter
//! ```

#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use ractor::{call_t, Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/monte_carlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
//! cargo run --example monte_carlo
//! ```

#![allow(clippy::incompatible_msrv)]

use std::collections::HashMap;

use ractor::{cast, Actor, ActorId, ActorProcessingErr, ActorRef};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/output_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
//! cargo run --example output_port
//! ```

#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use std::sync::Arc;
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//! cargo run --example philosophers
//! ```

#![allow(clippy::incompatible_msrv)]

use std::collections::{HashMap, VecDeque};

use ractor::{cast, Actor, ActorId, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//! cargo run --example ping_pong
//! ```

#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use ractor::{cast, Actor, ActorProcessingErr, ActorRef};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
//! cargo run --example supervisor
//! ```

#![allow(clippy::incompatible_msrv)]

use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent};

use tokio::time::Duration;
Expand Down
15 changes: 3 additions & 12 deletions ractor/src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,16 @@ use super::ActorCell;
///
/// An [ActorRef] is the primary means of communication typically used
/// when interfacing with [super::Actor]s
///
/// The [ActorRef] is SPECIFICALLY marked [Sync], regardless of the message type
/// because all usages of the message type are to send an owned instance of a message
/// and in no case is that message instance shared across threads. This is guaranteed
/// by the underlying Tokio channel usages. Without this manual marking of [Sync] on
/// [ActorRef], we would need to constrain the message type [Message] to be [Sync] which
/// is overly restrictive.
pub struct ActorRef<TMessage> {
pub(crate) inner: ActorCell,
_tactor: PhantomData<TMessage>,
_tactor: PhantomData<fn() -> TMessage>,
}

unsafe impl<T> Sync for ActorRef<T> {}

impl<TMessage> Clone for ActorRef<TMessage> {
fn clone(&self) -> Self {
ActorRef {
inner: self.inner.clone(),
_tactor: PhantomData::<TMessage>,
_tactor: PhantomData,
}
}
}
Expand All @@ -53,7 +44,7 @@ impl<TMessage> From<ActorCell> for ActorRef<TMessage> {
fn from(value: ActorCell) -> Self {
Self {
inner: value,
_tactor: PhantomData::<TMessage>,
_tactor: PhantomData,
}
}
}
Expand Down
151 changes: 151 additions & 0 deletions ractor/src/factory/discard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Discard handler managing when jobs are discarded
use super::Job;
use super::JobKey;
use crate::Message;

/// The discard mode of a factory
#[derive(Eq, PartialEq, Clone, Copy)]
pub enum DiscardMode {
/// Discard oldest incoming jobs under backpressure
Oldest,
/// Discard newest incoming jobs under backpressure
Newest,
}

/// A worker's copy of the discard settings.
///
/// Originally we passed a cloned box of the dynamic calculator to the workers,
/// but what that would do is have every worker re-compute the discard limit
/// which if you have many thousands of workers is kind of useless.
///
/// Instead now we have the workers treat the limit as static, but have the
/// factory compute the limit on it's interval and propagate the limit to the
/// workers. The workers "think" it's static, but the factory handles the dynamics.
/// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned
/// instance. It also moves NUM_WORKER calculations to 1.
pub(crate) enum WorkerDiscardSettings {
None,
Static { limit: usize, mode: DiscardMode },
}

impl WorkerDiscardSettings {
pub(crate) fn update_worker_limit(&mut self, new_limit: usize) {
if let Self::Static { limit, .. } = self {
*limit = new_limit;
}
}

pub(crate) fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
match self {
Self::None => None,
Self::Static { limit, mode, .. } => Some((*limit, *mode)),
}
}
}
/// If a factory supports job discarding (loadshedding) it can have a few configurations
/// which are defined in this enum. There is
///
/// 1. No discarding
/// 2. A static queueing limit discarding, with a specific discarding mode
/// 3. A dynamic queueing limit for discards, with a specified discarding mode and init discard limit.
pub enum DiscardSettings {
/// Don't discard jobs
None,
/// A static, immutable limit
Static {
/// The current limit. If 0, means jobs will never queue and immediately be discarded
/// once all workers are busy
limit: usize,
/// Define the factory messaging discard mode denoting if the oldest or newest messages
/// should be discarded in back-pressure scenarios
///
/// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
mode: DiscardMode,
},
/// Dynamic discarding is where the discard limit can change over time, controlled
/// by the `updater` which is an implementation of the [DynamicDiscardController]
Dynamic {
/// The current limit. If 0, means jobs will never queue and immediately be discarded
/// once all workers are busy
limit: usize,
/// Define the factory messaging discard mode denoting if the oldest or newest messages
/// should be discarded in back-pressure scenarios
///
/// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
mode: DiscardMode,
/// The [DynamicDiscardController] implementation, which computes new limits dynamically
/// based on whatever metrics it wants
updater: Box<dyn DynamicDiscardController>,
},
}

impl DiscardSettings {
pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings {
match &self {
Self::None => WorkerDiscardSettings::None,
Self::Static { limit, mode } => WorkerDiscardSettings::Static {
limit: *limit,
mode: *mode,
},
Self::Dynamic { limit, mode, .. } => WorkerDiscardSettings::Static {
limit: *limit,
mode: *mode,
},
}
}

/// Retrieve the discarding limit and [DiscardMode], if configured
pub fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
match self {
Self::None => None,
Self::Static { limit, mode, .. } => Some((*limit, *mode)),
Self::Dynamic { limit, mode, .. } => Some((*limit, *mode)),
}
}
}

/// Controls the dynamic concurrency level by receiving periodic snapshots of job statistics
/// and emitting a new concurrency limit
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait DynamicDiscardController: Send + Sync + 'static {
/// Compute the new threshold for discarding
///
/// If you want to utilize metrics exposed in [crate::modular_factory::stats] you can gather them
/// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
/// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
/// counters)
///
/// The namespace of stats collected on the base controller factory are
/// `base_controller.factory.{FACTORY_NAME}.{STAT}`
///
/// If no factory name is set, then "all" will be inserted
async fn compute(&mut self, current_threshold: usize) -> usize;
}

/// Reason for discarding a job
pub enum DiscardReason {
/// The job TTLd
TtlExpired,
/// The job was rejected or dropped due to loadshedding
Loadshed,
/// The job was dropped due to factory shutting down
Shutdown,
}

/// Trait defining the discard handler for a factory.
pub trait DiscardHandler<TKey, TMsg>: Send + Sync + 'static
where
TKey: JobKey,
TMsg: Message,
{
/// Called on a job prior to being dropped from the factory.
///
/// Useful scenarios are (a) collecting metrics, (b) logging, (c) tearing
/// down resources in the job, etc.
fn discard(&self, reason: DiscardReason, job: &mut Job<TKey, TMsg>);
}
Loading
Loading