Skip to content

Commit

Permalink
Migrating to Result<_,_> types for handlers
Browse files Browse the repository at this point in the history
This change migrates changes to using `Result<_,_>` return types for actor handlers, to minimize calling `panic!`s. This additionally passes dyn Errors up to handlers so supervisors can handle extended error information cleanly.

Resolves #33
  • Loading branch information
slawlor committed Jan 27, 2023
1 parent 12db101 commit bf52aee
Show file tree
Hide file tree
Showing 26 changed files with 1,141 additions and 269 deletions.
25 changes: 18 additions & 7 deletions ractor-cluster/src/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

//! TCP Server to accept incoming sessions
use ractor::{cast, Message};
use ractor::{cast, ActorProcessingErr, Message};
use ractor::{Actor, ActorRef};
use tokio::net::TcpListener;

Expand Down Expand Up @@ -48,31 +48,41 @@ impl Actor for Listener {

type State = ListenerState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
let addr = format!("0.0.0.0:{}", self.port);
let listener = match TcpListener::bind(&addr).await {
Ok(l) => l,
Err(err) => {
panic!("Error listening to socket: {}", err);
return Err(From::from(err));
}
};

// startup the event processing loop by sending an initial msg
let _ = myself.cast(ListenerMessage);

// create the initial state
Self::State {
Ok(Self::State {
listener: Some(listener),
}
})
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// close the listener properly, in case anyone else has handles to the actor stopping
// total droppage
drop(state.listener.take());
Ok(())
}

async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
_message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let Some(listener) = &mut state.listener {
match listener.accept().await {
Ok((stream, addr)) => {
Expand All @@ -96,5 +106,6 @@ impl Actor for Listener {

// continue accepting new sockets
let _ = myself.cast(ListenerMessage);
Ok(())
}
}
74 changes: 54 additions & 20 deletions ractor-cluster/src/net/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::net::SocketAddr;

use bytes::Bytes;
use prost::Message;
use ractor::{Actor, ActorCell, ActorRef};
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef};
use ractor::{SpawnErr, SupervisionEvent};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Actor for Session {
type Msg = SessionMessage;
type State = SessionState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// spawn writer + reader child actors
let (writer, _) = Actor::spawn_linked(
None,
Expand All @@ -121,26 +121,34 @@ impl Actor for Session {
},
myself.get_cell(),
)
.await
.expect("Failed to start session writer");
.await?;
let (reader, _) = Actor::spawn_linked(
None,
SessionReader {
session: myself.clone(),
},
myself.get_cell(),
)
.await
.expect("Failed to start session reader");
.await?;

Self::State { writer, reader }
Ok(Self::State { writer, reader })
}

async fn post_stop(&self, _myself: ActorRef<Self>, _state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
log::info!("TCP Session closed for {}", self.peer_addr);
Ok(())
}

async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
_myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::SetStream(stream) => {
let (read, write) = stream.into_split();
Expand Down Expand Up @@ -169,14 +177,15 @@ impl Actor for Session {
.cast(crate::node::SessionMessage::MessageReceived(msg));
}
}
Ok(())
}

async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self>,
message: SupervisionEvent,
state: &mut Self::State,
) {
) -> Result<(), ActorProcessingErr> {
// sockets open, they close, the world goes round... If a reader or writer exits for any reason, we'll start the shutdown procedure
// which requires that all actors exit
match message {
Expand Down Expand Up @@ -204,6 +213,7 @@ impl Actor for Session {
// all ok
}
}
Ok(())
}
}

Expand Down Expand Up @@ -242,18 +252,28 @@ where

type State = SessionWriterState;

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// OK we've established connection, now we can process requests

Self::State { writer: None }
Ok(Self::State { writer: None })
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// drop the channel to close it should we be exiting
drop(state.writer.take());
Ok(())
}

async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SessionWriterMessage::SetStream(stream) if state.writer.is_none() => {
state.writer = Some(stream);
Expand All @@ -273,6 +293,8 @@ where
// now send the object
if let Err(write_err) = stream.write_all(&encoded_data).await {
log::warn!("Error writing to the stream '{}'", write_err);
myself.stop(Some("channel_closed".to_string()));
return Ok(());
}
// flush the stream
stream.flush().await.unwrap();
Expand All @@ -283,6 +305,7 @@ where
// no-op, wait for next send request
}
}
Ok(())
}
}

Expand Down Expand Up @@ -317,16 +340,26 @@ impl Actor for SessionReader {

type State = SessionReaderState;

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
Self::State { reader: None }
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
Ok(Self::State { reader: None })
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// drop the channel to close it should we be exiting
drop(state.reader.take());
Ok(())
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::SetStream(stream) if state.reader.is_none() => {
state.reader = Some(stream);
Expand All @@ -340,7 +373,7 @@ impl Actor for SessionReader {
let length = u64::from_be_bytes(buf.try_into().unwrap());
log::trace!("Payload length message ({}) received", length);
let _ = myself.cast(SessionReaderMessage::ReadObject(length));
return;
return Ok(());
}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
log::trace!("Error (EOF) on stream");
Expand Down Expand Up @@ -385,7 +418,7 @@ impl Actor for SessionReader {
// EOF, close the stream by dropping the stream
drop(state.reader.take());
myself.stop(Some("channel_closed".to_string()));
return;
return Ok(());
}
Err(_other_err) => {
// TODO: some other TCP error, more handling necessary
Expand All @@ -401,5 +434,6 @@ impl Actor for SessionReader {
let _ = myself.cast(SessionReaderMessage::WaitForObject);
}
}
Ok(())
}
}
33 changes: 18 additions & 15 deletions ractor-cluster/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use tokio::net::TcpStream;
use std::collections::HashMap;
use std::{cmp::Ordering, collections::hash_map::Entry};

use ractor::{cast, Actor, ActorId, ActorRef, RpcReplyPort, SupervisionEvent};
use ractor::{cast, Actor, ActorId, ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent};

use crate::protocol::auth as auth_protocol;

Expand Down Expand Up @@ -234,14 +234,12 @@ impl NodeServerState {
impl Actor for NodeServer {
type Msg = SessionManagerMessage;
type State = NodeServerState;
async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
let listener = crate::net::listener::Listener::new(self.port, myself.clone());

let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
.await
.expect("Failed to start listener");
let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell()).await?;

Self::State {
Ok(Self::State {
node_sessions: HashMap::new(),
listener: actor_ref,
node_id_counter: 0,
Expand All @@ -251,10 +249,15 @@ impl Actor for NodeServer {
}),
name: format!("{}@{}", self.node_name, self.hostname),
},
}
})
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::ConnectionOpened { stream, is_server } => {
let node_id = state.node_id_counter;
Expand Down Expand Up @@ -292,14 +295,15 @@ impl Actor for NodeServer {
let _ = reply.send(state.check_peers(peer_name));
}
}
Ok(())
}

async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self>,
message: SupervisionEvent,
state: &mut Self::State,
) {
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorPanicked(actor, msg) => {
if state.listener.get_id() == actor.get_id() {
Expand All @@ -312,9 +316,8 @@ impl Actor for NodeServer {
// trying to start the NodeServer
let listener = crate::net::listener::Listener::new(self.port, myself.clone());

let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
.await
.expect("Failed to start listener");
let (actor_ref, _) =
Actor::spawn_linked(None, listener, myself.get_cell()).await?;
state.listener = actor_ref;
} else {
match state.node_sessions.entry(actor.get_id()) {
Expand Down Expand Up @@ -347,9 +350,8 @@ impl Actor for NodeServer {
// trying to start the NodeServer
let listener = crate::net::listener::Listener::new(self.port, myself.clone());

let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
.await
.expect("Failed to start listener");
let (actor_ref, _) =
Actor::spawn_linked(None, listener, myself.get_cell()).await?;
state.listener = actor_ref;
} else {
match state.node_sessions.entry(actor.get_id()) {
Expand All @@ -375,5 +377,6 @@ impl Actor for NodeServer {
//no-op
}
}
Ok(())
}
}
Loading

0 comments on commit bf52aee

Please sign in to comment.