Skip to content

Commit

Permalink
Add support for draining an actor's message queue.
Browse files Browse the repository at this point in the history
This is done by multiplexing the low-pri messaging port and when an internal draining message is received, we've emptied the actor's queue and can shut down.
  • Loading branch information
slawlor committed Oct 4, 2024
1 parent 7f6e8f6 commit dbeb602
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 18 deletions.
19 changes: 14 additions & 5 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use std::sync::Arc;
#[cfg(feature = "async-std")]
use futures::FutureExt;

use super::actor_properties::MuxedMessage;
use super::messages::{Signal, StopMessage};
use super::SupervisionEvent;
use crate::actor::actor_properties::ActorProperties;
use crate::concurrency::{MpscUnboundedReceiver as InputPortReceiver, OneshotReceiver};
use crate::errors::MessagingErr;
use crate::message::BoxedMessage;
#[cfg(feature = "cluster")]
use crate::message::SerializedMessage;
use crate::RactorErr;
Expand All @@ -39,10 +39,12 @@ pub enum ActorStatus {
Running = 2u8,
/// Upgrading
Upgrading = 3u8,
/// Draining
Draining = 4u8,
/// Stopping
Stopping = 4u8,
Stopping = 5u8,
/// Dead
Stopped = 5u8,
Stopped = 6u8,
}

/// Actor states where operations can continue to interact with an agent
Expand All @@ -61,7 +63,7 @@ pub(crate) struct ActorPortSet {
/// The inner supervisor port
pub(crate) supervisor_rx: InputPortReceiver<SupervisionEvent>,
/// The inner message port
pub(crate) message_rx: InputPortReceiver<BoxedMessage>,
pub(crate) message_rx: InputPortReceiver<MuxedMessage>,
}

impl Drop for ActorPortSet {
Expand Down Expand Up @@ -89,7 +91,7 @@ pub(crate) enum ActorPortMessage {
/// A supervision message
Supervision(SupervisionEvent),
/// A regular message
Message(BoxedMessage),
Message(MuxedMessage),
}

impl ActorPortSet {
Expand Down Expand Up @@ -462,6 +464,13 @@ impl ActorCell {
self.inner.send_message::<TMessage>(message)
}

/// Drain the actor's message queue and when finished processing, terminate the actor.
///
/// Any messages received after the drain marker but prior to shutdown will be rejected
pub fn drain(&self) -> Result<(), MessagingErr<()>> {
self.inner.drain()
}

/// Send a serialized binary message to the actor.
///
/// * `message` - The message to send
Expand Down
55 changes: 44 additions & 11 deletions ractor/src/actor/actor_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@ use crate::concurrency::{
use crate::message::BoxedMessage;
#[cfg(feature = "cluster")]
use crate::message::SerializedMessage;
use crate::{concurrency as mpsc, Message};
use crate::{concurrency as mpsc, Message, ACTIVE_STATES};
use crate::{Actor, ActorId, ActorName, ActorStatus, MessagingErr, Signal, SupervisionEvent};

/// A muxed-message wrapper which allows the message port to receive either a message or a drain
/// request which is a point-in-time marker that the actor's input channel should be drained
pub(crate) enum MuxedMessage {
Drain,
Message(BoxedMessage),
}

// The inner-properties of an Actor
pub(crate) struct ActorProperties {
pub(crate) id: ActorId,
Expand All @@ -27,7 +34,7 @@ pub(crate) struct ActorProperties {
pub(crate) signal: Mutex<Option<OneshotInputPort<Signal>>>,
pub(crate) stop: Mutex<Option<OneshotInputPort<StopMessage>>>,
pub(crate) supervision: InputPort<SupervisionEvent>,
pub(crate) message: InputPort<BoxedMessage>,
pub(crate) message: InputPort<MuxedMessage>,
pub(crate) tree: SupervisionTree,
pub(crate) type_id: std::any::TypeId,
#[cfg(feature = "cluster")]
Expand All @@ -42,7 +49,7 @@ impl ActorProperties {
OneshotReceiver<Signal>,
OneshotReceiver<StopMessage>,
InputPortReceiver<SupervisionEvent>,
InputPortReceiver<BoxedMessage>,
InputPortReceiver<MuxedMessage>,
)
where
TActor: Actor,
Expand All @@ -58,7 +65,7 @@ impl ActorProperties {
OneshotReceiver<Signal>,
OneshotReceiver<StopMessage>,
InputPortReceiver<SupervisionEvent>,
InputPortReceiver<BoxedMessage>,
InputPortReceiver<MuxedMessage>,
)
where
TActor: Actor,
Expand Down Expand Up @@ -90,18 +97,19 @@ impl ActorProperties {
}

pub fn get_status(&self) -> ActorStatus {
match self.status.load(Ordering::Relaxed) {
match self.status.load(Ordering::SeqCst) {
0u8 => ActorStatus::Unstarted,
1u8 => ActorStatus::Starting,
2u8 => ActorStatus::Running,
3u8 => ActorStatus::Upgrading,
4u8 => ActorStatus::Stopping,
4u8 => ActorStatus::Draining,
5u8 => ActorStatus::Stopping,
_ => ActorStatus::Stopped,
}
}

pub fn set_status(&self, status: ActorStatus) {
self.status.store(status as u8, Ordering::Relaxed);
self.status.store(status as u8, Ordering::SeqCst);
}

pub fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
Expand Down Expand Up @@ -131,12 +139,34 @@ impl ActorProperties {
return Err(MessagingErr::InvalidActorType);
}

if !ACTIVE_STATES.contains(&self.get_status()) {
return Err(MessagingErr::SendErr(message));
}

let boxed = message
.box_message(&self.id)
.map_err(|_e| MessagingErr::InvalidActorType)?;
self.message
.send(boxed)
.map_err(|e| MessagingErr::SendErr(TMessage::from_boxed(e.0).unwrap()))
.send(MuxedMessage::Message(boxed))
.map_err(|e| match e.0 {
MuxedMessage::Message(m) => MessagingErr::SendErr(TMessage::from_boxed(m).unwrap()),
_ => panic!("Expected a boxed message but got a drain message"),
})
}

pub fn drain(&self) -> Result<(), MessagingErr<()>> {
let _ = self
.status
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |f| {
if f < (ActorStatus::Stopping as u8) {
Some(ActorStatus::Draining as u8)
} else {
None
}
});
self.message
.send(MuxedMessage::Drain)
.map_err(|_| MessagingErr::SendErr(()))
}

#[cfg(feature = "cluster")]
Expand All @@ -149,8 +179,11 @@ impl ActorProperties {
serialized_msg: Some(message),
};
self.message
.send(boxed)
.map_err(|e| MessagingErr::SendErr(e.0.serialized_msg.unwrap()))
.send(MuxedMessage::Message(boxed))
.map_err(|e| match e.0 {
MuxedMessage::Message(m) => MessagingErr::SendErr(m.serialized_msg.unwrap()),
_ => panic!("Expected a boxed message but got a drain message"),
})
}

pub fn send_stop(&self, reason: Option<String>) -> Result<(), MessagingErr<StopMessage>> {
Expand Down
8 changes: 7 additions & 1 deletion ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
use std::future::Future;
use std::panic::AssertUnwindSafe;

use actor_properties::MuxedMessage;
use futures::TryFutureExt;

use crate::concurrency::JoinHandle;
Expand Down Expand Up @@ -880,7 +881,7 @@ where
}
}
}
actor_cell::ActorPortMessage::Message(msg) => {
actor_cell::ActorPortMessage::Message(MuxedMessage::Message(msg)) => {
let future = Self::handle_message(myself.clone(), state, handler, msg);
match ports.run_with_signal(future).await {
Ok(Ok(())) => Ok(ActorLoopResult::ok()),
Expand All @@ -890,6 +891,11 @@ where
}
}
}
actor_cell::ActorPortMessage::Message(MuxedMessage::Drain) => {
// Drain is a stub marker that the actor should now stop, we've processed
// all the messages and we want the actor to die now
Ok(ActorLoopResult::stop(Some("Drained".to_string())))
}
},
Err(MessagingErr::ChannelClosed) => {
// one of the channels is closed, this means
Expand Down
65 changes: 64 additions & 1 deletion ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! General tests, more logic-specific tests are contained in sub-modules
use std::sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU32, AtomicU8, Ordering},
Arc,
};

Expand Down Expand Up @@ -1058,3 +1058,66 @@ async fn actor_post_stop_executed_before_stop_and_wait_returns() {

handle.await.unwrap();
}

#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn actor_drain_messages() {
struct TestActor {
signal: Arc<AtomicU32>,
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();

async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(
&self,
_: ActorRef<Self::Msg>,
_: Self::Msg,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
sleep(Duration::from_millis(10)).await;
let _ = self.signal.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

let signal = Arc::new(AtomicU32::new(0));
let (actor, handle) = Actor::spawn(
None,
TestActor {
signal: signal.clone(),
},
(),
)
.await
.expect("Failed to spawn test actor");

for _ in 0..1000 {
actor
.cast(EmptyMessage)
.expect("Failed to send message to actor");
}

assert!(signal.load(Ordering::SeqCst) < 1000);

actor.drain().expect("Failed to trigger actor draining");

// future cast's fail after draining triggered
assert!(actor.cast(EmptyMessage).is_err());

// wait for drain complete
handle.await.unwrap();

assert_eq!(1000, signal.load(Ordering::SeqCst));
}
7 changes: 7 additions & 0 deletions ractor/src/factory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ where
/// Notify the factory that it's being drained, and to finish jobs
/// currently in the queue, but discard new work, and once drained
/// exit
///
/// NOTE: This is different from draining the actor itself, which allows the
/// pending message queue to flush and then exit. Since the factory
/// holds an internal queue for jobs, it's possible that the internal
/// state still has work to do while the factory's input queue is drained.
/// Therefore in order to propertly drain a factory, you should use the
/// `DrainRequests` version so the internal pending queue is properly flushed.
DrainRequests,
}

Expand Down

0 comments on commit dbeb602

Please sign in to comment.