Skip to content

Commit

Permalink
Migrating to Result<_,_> types for handlers
Browse files Browse the repository at this point in the history
This change migrates changes to using `Result<_,_>` return types for actor handlers, to minimize calling `panic!`s. This additionally passes dyn Errors up to handlers so supervisors can handle extended error information cleanly.

Resolves #33
  • Loading branch information
slawlor committed Jan 27, 2023
1 parent 12db101 commit b2bb1cc
Show file tree
Hide file tree
Showing 44 changed files with 1,319 additions and 389 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

members = [
"ractor",
"ractor-cluster",
"ractor-playground",
"ractor_cluster",
"ractor_playground",
"xtask"
]
23 changes: 17 additions & 6 deletions ractor/benches/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ extern crate criterion;
use criterion::{BatchSize, Criterion};
#[cfg(feature = "cluster")]
use ractor::Message;
use ractor::{Actor, ActorRef};
use ractor::{Actor, ActorProcessingErr, ActorRef};

struct BenchActor;

Expand All @@ -23,12 +23,19 @@ impl Actor for BenchActor {

type State = ();

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
let _ = myself.cast(BenchActorMessage);
Ok(())
}

async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, _state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
_message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
myself.stop(None);
Ok(())
}
}

Expand Down Expand Up @@ -145,23 +152,27 @@ fn process_messages(c: &mut Criterion) {

type State = u64;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(
&self,
myself: ActorRef<Self>,
) -> Result<Self::State, ActorProcessingErr> {
let _ = myself.cast(BenchActorMessage);
0u64
Ok(0u64)
}

async fn handle(
&self,
myself: ActorRef<Self>,
_message: Self::Msg,
state: &mut Self::State,
) {
) -> Result<(), ActorProcessingErr> {
*state += 1;
if *state >= self.num_msgs {
myself.stop(None);
} else {
let _ = myself.cast(BenchActorMessage);
}
Ok(())
}
}

Expand Down
14 changes: 10 additions & 4 deletions ractor/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
extern crate ractor;

use ractor::{call_t, Actor, ActorRef, RpcReplyPort};
use ractor::{call_t, Actor, ActorProcessingErr, ActorRef, RpcReplyPort};

struct Counter;

Expand All @@ -37,12 +37,17 @@ impl Actor for Counter {

type State = CounterState;

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// create the initial state
CounterState { count: 0 }
Ok(CounterState { count: 0 })
}

async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
_myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
CounterMessage::Increment(how_much) => {
state.count += how_much;
Expand All @@ -56,6 +61,7 @@ impl Actor for Counter {
}
}
}
Ok(())
}
}

Expand Down
26 changes: 19 additions & 7 deletions ractor/examples/monte_carlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::collections::HashMap;

use ractor::{cast, Actor, ActorId, ActorRef};
use ractor::{cast, Actor, ActorId, ActorProcessingErr, ActorRef};
use rand::{thread_rng, Rng};

// ================== Player Actor ================== //
Expand Down Expand Up @@ -68,11 +68,16 @@ impl Actor for Game {

type State = GameState;

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
GameState::default()
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
Ok(GameState::default())
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if state.current_round <= state.total_rounds {
state.current_round += 1;
match Self::State::roll_dice() {
Expand All @@ -95,6 +100,7 @@ impl Actor for Game {
// Because the `GameManager` is monitoring this actor we can stop this actor
myself.stop(None);
}
Ok(())
}
}

Expand Down Expand Up @@ -136,7 +142,7 @@ impl Actor for GameManager {

type State = GameManagerState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// This is the first code that will run in the actor. It spawns the Game actors,
// registers them to its monitoring list, then sends them a message indicating
// that they should start their games.
Expand All @@ -153,10 +159,15 @@ impl Actor for GameManager {
cast!(actor, GameMessage(myself.clone())).expect("Failed to send message");
}

GameManagerState::new(self.num_games)
Ok(GameManagerState::new(self.num_games))
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.results.insert(message.id, message.results);

state.games_finished += 1;
Expand All @@ -176,6 +187,7 @@ impl Actor for GameManager {

myself.stop(None);
}
Ok(())
}
}

Expand Down
26 changes: 21 additions & 5 deletions ractor/examples/output_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ extern crate ractor;

use std::sync::Arc;

use ractor::{Actor, ActorRef, OutputPort};
use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};
use tokio::time::{timeout, Duration};

enum PublisherMessage {
Expand All @@ -39,15 +39,23 @@ impl Actor for Publisher {

type State = ();

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {}
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, _state: &mut Self::State) {
async fn handle(
&self,
_myself: ActorRef<Self>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::Publish(msg) => {
println!("Publishing {msg}");
self.output.send(Output(format!("Published: {msg}")));
}
}
Ok(())
}
}

Expand All @@ -65,14 +73,22 @@ impl Actor for Subscriber {

type State = ();

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {}
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, _state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::Published(msg) => {
println!("Subscriber ({myself:?}) received published message '{msg}'");
}
}
Ok(())
}
}

Expand Down
28 changes: 20 additions & 8 deletions ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::collections::{HashMap, VecDeque};

use ractor::{cast, Actor, ActorId, ActorName, ActorRef, RpcReplyPort};
use ractor::{cast, Actor, ActorId, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort};
use tokio::time::{Duration, Instant};

// ============================ Fork Actor ============================ //
Expand Down Expand Up @@ -113,15 +113,20 @@ impl Fork {
impl Actor for Fork {
type Msg = ForkMessage;
type State = ForkState;
async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
Self::State {
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
Ok(Self::State {
clean: false,
owned_by: None,
backlog: VecDeque::new(),
}
})
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
let mut maybe_unhandled = self.handle_internal(&myself, message, state);
if let Some(message) = maybe_unhandled {
state.backlog.push_back(message);
Expand All @@ -136,6 +141,7 @@ impl Actor for Fork {
state.backlog.push_front(msg);
}
}
Ok(())
}
}

Expand Down Expand Up @@ -302,13 +308,18 @@ impl Philosopher {
impl Actor for Philosopher {
type Msg = PhilosopherMessage;
type State = PhilosopherState;
async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// initialize the simulation by making the philosopher's hungry
let _ = cast!(myself, Self::Msg::BecomeHungry(0));
Self::State::new(self.left.clone(), self.right.clone())
Ok(Self::State::new(self.left.clone(), self.right.clone()))
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
PhilosopherMessage::SendMetrics(reply) => {
let _ = reply.send(state.metrics.clone());
Expand Down Expand Up @@ -418,6 +429,7 @@ impl Actor for Philosopher {
}
}
}
Ok(())
}
}

Expand Down
14 changes: 10 additions & 4 deletions ractor/examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
extern crate ractor;

use ractor::{cast, Actor, ActorRef};
use ractor::{cast, Actor, ActorProcessingErr, ActorRef};

pub struct PingPong;

Expand Down Expand Up @@ -48,14 +48,19 @@ impl Actor for PingPong {

type State = u8;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// startup the event processing
cast!(myself, Message::Ping).unwrap();
// create the initial state
0u8
Ok(0u8)
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if *state < 10u8 {
message.print();
cast!(myself, message.next()).unwrap();
Expand All @@ -64,6 +69,7 @@ impl Actor for PingPong {
println!();
myself.stop(None);
}
Ok(())
}
}

Expand Down
Loading

0 comments on commit b2bb1cc

Please sign in to comment.