Skip to content

Commit

Permalink
fix: Ensure pending messages are delivered when OutputPort is dropped. (
Browse files Browse the repository at this point in the history
#293)

* fix: Ensure pending messages are delivered when OutputPort is dropped.

* chore: cargo fmt
  • Loading branch information
stuartcarnie authored Nov 27, 2024
1 parent 1949d92 commit cd7de35
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 18 deletions.
18 changes: 0 additions & 18 deletions ractor/src/port/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,6 @@ where
}
}

impl<TMsg> Drop for OutputPort<TMsg>
where
TMsg: OutputMessage,
{
fn drop(&mut self) {
let mut subs = self.subscriptions.write().unwrap();
for sub in subs.iter_mut() {
sub.stop();
}
subs.clear();
}
}

// ============== Subscription implementation ============== //

/// The output port's subscription handle. It holds a handle to a [JoinHandle]
Expand All @@ -128,11 +115,6 @@ impl OutputPortSubscription {
self.handle.is_finished()
}

/// Stop the subscription, by aborting the underlying [JoinHandle]
pub(crate) fn stop(&mut self) {
self.handle.abort();
}

/// Create a new subscription
pub(crate) fn new<TMsg, F, TReceiverMsg>(
mut port: pubsub::Receiver<Option<TMsg>>,
Expand Down
77 changes: 77 additions & 0 deletions ractor/src/port/output/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,83 @@ async fn test_50_receivers() {
.unwrap();
}

#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn test_delivery() {
struct TestActor;
enum TestActorMessage {
Stop,
}
#[cfg(feature = "cluster")]
impl crate::Message for TestActorMessage {}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = TestActorMessage;
type Arguments = ();
type State = u8;

async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(0u8)
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
println!("Test actor received a message");
match message {
Self::Msg::Stop => {
if *state > 3 {
myself.stop(None);
}
}
}
*state += 1;
Ok(())
}
}

let handles: Vec<(ActorRef<TestActorMessage>, JoinHandle<()>)> =
join_all((0..50).map(|_| async move {
Actor::spawn(None, TestActor, ())
.await
.expect("Failed to start test actor")
}))
.await;

let mut actor_refs = vec![];
let mut actor_handles = vec![];
for item in handles.into_iter() {
let (a, b) = item;
actor_refs.push(a);
actor_handles.push(b);
}

let output = OutputPort::<()>::default();
for actor in actor_refs.into_iter() {
output.subscribe(actor, |_| Some(TestActorMessage::Stop));
}

let all_handle = crate::concurrency::spawn(async move { join_all(actor_handles).await });

// send 4 sends, should exit
for _ in 0..5 {
output.send(());
}
drop(output);

timeout(Duration::from_millis(100), all_handle)
.await
.expect("Test actor failed in exit")
.unwrap();
}

#[allow(unused_imports)]
use output_port_subscriber_tests::*;

Expand Down

0 comments on commit cd7de35

Please sign in to comment.