Skip to content

Commit

Permalink
Migrating to Result<_,_> types for handlers
Browse files Browse the repository at this point in the history
This change migrates changes to using `Result<_,_>` return types for actor handlers, to minimize calling `panic!`s. This additionally passes dyn Errors up to handlers so supervisors can handle extended error information cleanly.

Resolves #33
  • Loading branch information
slawlor committed Jan 27, 2023
1 parent 12db101 commit 31d3716
Show file tree
Hide file tree
Showing 31 changed files with 1,314 additions and 383 deletions.
39 changes: 26 additions & 13 deletions ractor-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,38 @@
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Support for remote nodes in a distributed cluster.
//! # Support for remote nodes in a distributed cluster.
//!
//! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
//! A **node** is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
//! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes.
//!
//! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool.
//! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect
//! to them as well. They merge registries and pg groups together in order to create larger clusters of services.
//! When nodes connect and are authenticated, they spawn their remote-supporting local actors on the remote system
//! as `RemoteActor`s. The additionally handle synchronizing PG groups so the groups can contain both local
//! and remote actors.
//!
//! We have chosen protobuf for our inter-node defined protocol, however you can chose whatever medium you like
//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it
//! over the wire for you
//!
//! ## A note on usage
//!
//! (Future) When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect
//! to them as well.
//!
//! ## Important note on message serialization
//!
//! An important note on usage, when utilizing `ractor-cluster` and [ractor] in the cluster configuration
//! (i.e. `ractor/cluster`), you no longer receive the auto-implementation for all types for [ractor::Message]. This
//! is due to specialization (see: <https://github.com/rust-lang/rust/issues/31844>). Ideally we'd have the trait have a
//! "default" non-serializable implementation for all types that could be messages, and specific implementations for
//! those that can be messages sent over the network. However this is presently a `+nightly` only functionality and
//! has a soundness hole in it's processes. Therefore as a workaround, when the `cluster` feature is enabled on [ractor]
//! the default implementation, specifically `impl<T: Any + Send + Sized + 'static> Message for T {}` is disabled.
//! has a soundness hole in it's definition and usage. Therefore as a workaround, when the `cluster` feature is enabled
//! on [ractor] the default implementation, specifically
//!
//! ```
//! impl<T: Any + Send + Sized + 'static> ractor::Message for T {}
//! ```
//! is disabled.
//!
//! This means that you need to specify the implementation of the [ractor::Message] trait on all message types, and when
//! they're not network supported messages, this is just a default empty implementation. When they **are** potentially
Expand All @@ -40,14 +50,17 @@

mod hash;
mod net;
pub mod node;
pub(crate) mod protocol;
pub(crate) mod remote_actor;
mod protocol;
mod remote_actor;

pub mod macros;

// Re-exports
pub use node::NodeServer;
pub mod node;

/// Node's are representing by an integer id
pub type NodeId = u64;

// ============== Re-exports ============== //
pub use node::client::connect as client_connect;
pub use node::{
client::ClientConnectErr, NodeServer, NodeSession, SessionManagerMessage, SessionMessage,
};
25 changes: 18 additions & 7 deletions ractor-cluster/src/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

//! TCP Server to accept incoming sessions
use ractor::{cast, Message};
use ractor::{cast, ActorProcessingErr, Message};
use ractor::{Actor, ActorRef};
use tokio::net::TcpListener;

Expand Down Expand Up @@ -48,31 +48,41 @@ impl Actor for Listener {

type State = ListenerState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
let addr = format!("0.0.0.0:{}", self.port);
let listener = match TcpListener::bind(&addr).await {
Ok(l) => l,
Err(err) => {
panic!("Error listening to socket: {}", err);
return Err(From::from(err));
}
};

// startup the event processing loop by sending an initial msg
let _ = myself.cast(ListenerMessage);

// create the initial state
Self::State {
Ok(Self::State {
listener: Some(listener),
}
})
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// close the listener properly, in case anyone else has handles to the actor stopping
// total droppage
drop(state.listener.take());
Ok(())
}

async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
_message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let Some(listener) = &mut state.listener {
match listener.accept().await {
Ok((stream, addr)) => {
Expand All @@ -96,5 +106,6 @@ impl Actor for Listener {

// continue accepting new sockets
let _ = myself.cast(ListenerMessage);
Ok(())
}
}
74 changes: 54 additions & 20 deletions ractor-cluster/src/net/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::net::SocketAddr;

use bytes::Bytes;
use prost::Message;
use ractor::{Actor, ActorCell, ActorRef};
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef};
use ractor::{SpawnErr, SupervisionEvent};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Actor for Session {
type Msg = SessionMessage;
type State = SessionState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// spawn writer + reader child actors
let (writer, _) = Actor::spawn_linked(
None,
Expand All @@ -121,26 +121,34 @@ impl Actor for Session {
},
myself.get_cell(),
)
.await
.expect("Failed to start session writer");
.await?;
let (reader, _) = Actor::spawn_linked(
None,
SessionReader {
session: myself.clone(),
},
myself.get_cell(),
)
.await
.expect("Failed to start session reader");
.await?;

Self::State { writer, reader }
Ok(Self::State { writer, reader })
}

async fn post_stop(&self, _myself: ActorRef<Self>, _state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
log::info!("TCP Session closed for {}", self.peer_addr);
Ok(())
}

async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
_myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::SetStream(stream) => {
let (read, write) = stream.into_split();
Expand Down Expand Up @@ -169,14 +177,15 @@ impl Actor for Session {
.cast(crate::node::SessionMessage::MessageReceived(msg));
}
}
Ok(())
}

async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self>,
message: SupervisionEvent,
state: &mut Self::State,
) {
) -> Result<(), ActorProcessingErr> {
// sockets open, they close, the world goes round... If a reader or writer exits for any reason, we'll start the shutdown procedure
// which requires that all actors exit
match message {
Expand Down Expand Up @@ -204,6 +213,7 @@ impl Actor for Session {
// all ok
}
}
Ok(())
}
}

Expand Down Expand Up @@ -242,18 +252,28 @@ where

type State = SessionWriterState;

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// OK we've established connection, now we can process requests

Self::State { writer: None }
Ok(Self::State { writer: None })
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// drop the channel to close it should we be exiting
drop(state.writer.take());
Ok(())
}

async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SessionWriterMessage::SetStream(stream) if state.writer.is_none() => {
state.writer = Some(stream);
Expand All @@ -273,6 +293,8 @@ where
// now send the object
if let Err(write_err) = stream.write_all(&encoded_data).await {
log::warn!("Error writing to the stream '{}'", write_err);
myself.stop(Some("channel_closed".to_string()));
return Ok(());
}
// flush the stream
stream.flush().await.unwrap();
Expand All @@ -283,6 +305,7 @@ where
// no-op, wait for next send request
}
}
Ok(())
}
}

Expand Down Expand Up @@ -317,16 +340,26 @@ impl Actor for SessionReader {

type State = SessionReaderState;

async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
Self::State { reader: None }
async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
Ok(Self::State { reader: None })
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
async fn post_stop(
&self,
_myself: ActorRef<Self>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// drop the channel to close it should we be exiting
drop(state.reader.take());
Ok(())
}

async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
async fn handle(
&self,
myself: ActorRef<Self>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::SetStream(stream) if state.reader.is_none() => {
state.reader = Some(stream);
Expand All @@ -340,7 +373,7 @@ impl Actor for SessionReader {
let length = u64::from_be_bytes(buf.try_into().unwrap());
log::trace!("Payload length message ({}) received", length);
let _ = myself.cast(SessionReaderMessage::ReadObject(length));
return;
return Ok(());
}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
log::trace!("Error (EOF) on stream");
Expand Down Expand Up @@ -385,7 +418,7 @@ impl Actor for SessionReader {
// EOF, close the stream by dropping the stream
drop(state.reader.take());
myself.stop(Some("channel_closed".to_string()));
return;
return Ok(());
}
Err(_other_err) => {
// TODO: some other TCP error, more handling necessary
Expand All @@ -401,5 +434,6 @@ impl Actor for SessionReader {
let _ = myself.cast(SessionReaderMessage::WaitForObject);
}
}
Ok(())
}
}
Loading

0 comments on commit 31d3716

Please sign in to comment.