diff --git a/Cargo.toml b/Cargo.toml
index 0515a6f6..eae5cf38 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
 members = [
     "ractor",
+    "ractor-cluster",
     "ractor-playground",
     "xtask"
 ]
diff --git a/ractor-cluster/Cargo.toml b/ractor-cluster/Cargo.toml
new file mode 100644
index 00000000..36457908
--- /dev/null
+++ b/ractor-cluster/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "ractor-cluster"
+version = "0.4.0"
+authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
+description = "Distributed cluster environment of Ractor actors"
+documentation = "https://docs.rs/ractor"
+license = "MIT"
+edition = "2018"
+keywords = ["actor", "ractor", "cluster"]
+repository = "https://github.com/slawlor/ractor"
+readme = "../README.md"
+homepage = "https://github.com/slawlor/ractor"
+categories = ["actor", "erlang"]
+build = "src/build.rs"
+
+[build-dependencies]
+protobuf-src = "1"
+prost-build = { version = "0.11" }
+
+[dependencies]
+## Required dependencies
+async-trait = "0.1"
+bytes = { version = "1" }
+# dashmap = "5"
+# futures = "0.3"
+log = "0.4"
+# once_cell = "1"
+prost = { version = "0.11" }
+ractor = { version = "0.4", features = ["cluster"], path = "../ractor" }
+rand = "0.8"
+sha2 = "0.10"
+tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net"]}
+
+## Optional dependencies
+# tokio-rustls = { version = "0.23", optional = true }
+
+[dev-dependencies]
+tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] }
diff --git a/ractor-cluster/src/build.rs b/ractor-cluster/src/build.rs
new file mode 100644
index 00000000..d1b4fc48
--- /dev/null
+++ b/ractor-cluster/src/build.rs
@@ -0,0 +1,31 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! This is the pre-compilation build script for the `ractor-cluster` crate (the distributed
+//! mode of `ractor`). It's used to compile the protobuf specifications into Rust code prior
+//! to compilation of the crate itself.
+
+/// The shared path for all protobuf specifications
+const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
+/// The list of protobuf files to generate inside PROTOBUF_BASE_DIRECTORY
+const PROTOBUF_FILES: [&str; 3] = ["meta", "node", "auth"];
+
+fn build_protobufs() {
+    std::env::set_var("PROTOC", protobuf_src::protoc());
+
+    let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());
+
+    for file in PROTOBUF_FILES.iter() {
+        let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file);
+        println!("cargo:rerun-if-changed={}", proto_file);
+        protobuf_files.push(proto_file);
+    }
+
+    prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap();
+}
+
+fn main() {
+    // compile the spec files into Rust code
+    build_protobufs();
+}
diff --git a/ractor-cluster/src/hash.rs b/ractor-cluster/src/hash.rs
new file mode 100644
index 00000000..fc03b11e
--- /dev/null
+++ b/ractor-cluster/src/hash.rs
@@ -0,0 +1,25 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Hashing utilities, mainly used for challenge computation
+
+pub(crate) const DIGEST_BYTES: usize = 32;
+pub(crate) type Digest = [u8; DIGEST_BYTES];
+
+/// Compute a challenge digest: SHA-256 over the big-endian challenge bytes followed by the secret
+pub(crate) fn challenge_digest(secret: &'_ str, challenge: u32) -> Digest {
+    use sha2::Digest;
+
+    let secret_bytes = secret.as_bytes();
+    // the buffer must be zero-filled, not merely reserved: `copy_from_slice` requires
+    // the destination slice to already have the matching length
+    let mut data = vec![0u8; secret_bytes.len() + 4];
+
+    let challenge_bytes = challenge.to_be_bytes();
+    data[0..4].copy_from_slice(&challenge_bytes);
+    data[4..].copy_from_slice(secret_bytes);
+
+    let hash = sha2::Sha256::digest(&data);
+
+    hash.into()
+}
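Since this digest doubles as the handshake proof later in the PR, its behavior is worth pinning down. A minimal test-style sketch (the test module itself is not part of this diff):

```rust
#[cfg(test)]
mod tests {
    use super::challenge_digest;

    #[test]
    fn digest_is_deterministic_for_a_shared_cookie() {
        let cookie = "secret-cookie";
        let challenge = 0xDEAD_BEEFu32;

        // both sides of the handshake compute the same 32-byte SHA-256 digest
        let server_expectation = challenge_digest(cookie, challenge);
        let client_reply = challenge_digest(cookie, challenge);
        assert_eq!(server_expectation, client_reply);

        // a peer holding the wrong cookie produces a different digest
        let imposter = challenge_digest("wrong-cookie", challenge);
        assert_ne!(server_expectation, imposter);
    }
}
```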
diff --git a/ractor/src/distributed/mod.rs b/ractor-cluster/src/lib.rs
similarity index 53%
rename from ractor/src/distributed/mod.rs
rename to ractor-cluster/src/lib.rs
index acdf01ea..477408a8 100644
--- a/ractor/src/distributed/mod.rs
+++ b/ractor-cluster/src/lib.rs
@@ -4,33 +4,33 @@
 // LICENSE-MIT file in the root directory of this source tree.
 
 //! Support for remote nodes in a distributed cluster.
-//! 
+//!
 //! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
 //! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes.
-//! 
+//!
 //! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool.
 //! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect
 //! to them as well. They merge registries and pg groups together in order to create larger clusters of services.
-//! 
-//! For messages to be transmittable across the [Node] boundaries to other [Node]s in the pool, they need to be
-//! serializable to a binary format (say protobuf)
+//!
+//! We have chosen protobuf for our defined inter-node protocol, however you can choose whatever medium you like
+//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it
+//! over the wire for you
 
-use dashmap::DashMap;
+// #![deny(warnings)]
+#![warn(unused_imports)]
+#![warn(unsafe_code)]
+#![warn(missing_docs)]
+#![warn(unused_crate_dependencies)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
 
-/// Represents messages that can cross the node boundary which can be serialized and sent over the wire
-pub trait NodeSerializableMessage {
-    /// Serialize the message to binary
-    fn serialize(&self) -> &[u8];
+mod hash;
+mod net;
+pub mod node;
+pub mod protocol;
 
-    /// Deserialize from binary back into the message type
-    fn deserialize(&self, data: &[u8]) -> Self;
-}
+// Re-exports
 
-/// The identifier of a node is a globally unique u64
-pub type NodeId = u64;
+pub use node::NodeServer;
 
-/// A node in the distributed compute pool.
-pub struct Node {
-    node_id: u64,
-    other_nodes: DashMap<NodeId, Node>,
-}
\ No newline at end of file
+/// Nodes are represented by an integer id
+pub type NodeId = u64;
diff --git a/ractor-cluster/src/net/listener.rs b/ractor-cluster/src/net/listener.rs
new file mode 100644
index 00000000..a61667be
--- /dev/null
+++ b/ractor-cluster/src/net/listener.rs
@@ -0,0 +1,97 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! TCP server to accept incoming sessions
+
+use ractor::cast;
+use ractor::{Actor, ActorRef};
+use tokio::net::TcpListener;
+
+use crate::node::SessionManagerMessage;
+
+/// A TCP socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s
+/// which handle the message sending and receiving over the socket.
+///
+/// The [Listener] supervises all of the TCP [super::session::Session] actors and is responsible for logging
+/// connects and disconnects as well as tracking the currently open [super::session::Session] actors.
+pub struct Listener {
+    port: super::NetworkPort,
+    session_manager: ActorRef<crate::node::NodeServer>,
+}
+
+impl Listener {
+    /// Create a new `Listener`
+    pub fn new(
+        port: super::NetworkPort,
+        session_manager: ActorRef<crate::node::NodeServer>,
+    ) -> Self {
+        Self {
+            port,
+            session_manager,
+        }
+    }
+}
+
+/// The node listener's state
+pub struct ListenerState {
+    listener: Option<TcpListener>,
+}
+
+#[async_trait::async_trait]
+impl Actor for Listener {
+    type Msg = ();
+
+    type State = ListenerState;
+
+    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
+        let addr = format!("0.0.0.0:{}", self.port);
+        let listener = match TcpListener::bind(&addr).await {
+            Ok(l) => l,
+            Err(err) => {
+                panic!("Error listening to socket: {}", err);
+            }
+        };
+
+        // startup the event processing loop by sending an initial msg
+        let _ = myself.cast(());
+
+        // create the initial state
+        Self::State {
+            listener: Some(listener),
+        }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
+        // close the listener properly, in case anyone else has handles to the actor stopping
+        // total droppage
+        drop(state.listener.take());
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, state: &mut Self::State) {
+        if let Some(listener) = &mut state.listener {
+            match listener.accept().await {
+                Ok((stream, addr)) => {
+                    let _ = cast!(
+                        self.session_manager,
+                        SessionManagerMessage::ConnectionOpened {
+                            stream,
+                            is_server: true
+                        }
+                    );
+                    log::info!("TCP Session opened for {}", addr);
+                }
+                Err(socket_accept_error) => {
+                    log::warn!(
+                        "Error accepting socket {} on Node server",
+                        socket_accept_error
+                    );
+                }
+            }
+        }
+
+        // continue accepting new sockets
+        let _ = myself.cast(());
+    }
+}
diff --git a/ractor-cluster/src/net/mod.rs b/ractor-cluster/src/net/mod.rs
new file mode 100644
index 00000000..33f4c3bb
--- /dev/null
+++ b/ractor-cluster/src/net/mod.rs
@@ -0,0 +1,21 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! TCP server and session actors which transmit [prost::Message] encoded messages
+
+// TODO: we need a way to identify which session messages are coming from + going to. Therefore
+// we should actually have a notification when a new session is launched, which can be used
+// to match which session is tied to which actor id
+
+pub mod listener;
+pub mod session;
+
+/// A trait which implements [prost::Message], [Default], and has a static lifetime,
+/// denoting protobuf-encoded messages which can be transmitted over the wire
+pub trait NetworkMessage: prost::Message + Default + 'static {}
+impl<T: prost::Message + Default + 'static> NetworkMessage for T {}
+
+/// A network port
+pub type NetworkPort = u16;
diff --git a/ractor-cluster/src/net/session.rs b/ractor-cluster/src/net/session.rs
new file mode 100644
index 00000000..a54ce9b8
--- /dev/null
+++ b/ractor-cluster/src/net/session.rs
@@ -0,0 +1,377 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! TCP session actor which manages the specific communication to a node
+
+// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs
+
+use std::marker::PhantomData;
+use std::net::SocketAddr;
+
+use bytes::Bytes;
+use prost::Message;
+use ractor::{Actor, ActorCell, ActorRef};
+use ractor::{SpawnErr, SupervisionEvent};
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+use tokio::io::ErrorKind;
+use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::net::TcpStream;
+
+use super::NetworkMessage;
+
+/// Helper method to read exactly `len` bytes from the stream into a pre-allocated buffer
+/// of bytes
+async fn read_n_bytes(stream: &mut OwnedReadHalf, len: usize) -> Result<Vec<u8>, tokio::io::Error> {
+    // the buffer must be zero-filled (not just have capacity), otherwise the slice
+    // passed to `read` below would be empty and the loop would never make progress
+    let mut buf = vec![0u8; len];
+    let mut c_len = 0;
+    while c_len < len {
+        let n = stream.read(&mut buf[c_len..]).await?;
+        if n == 0 {
+            // the stream closed mid-read; surface it as EOF so callers can tear the session down
+            return Err(ErrorKind::UnexpectedEof.into());
+        }
+        c_len += n;
+    }
+    Ok(buf)
+}
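The framing contract implied by the reader below (it always pulls a fixed 10-byte length prefix off the wire, then the payload) can be summarized in one helper. A sketch only — `frame` is a hypothetical name, not anything in this diff:

```rust
// Sketch: frame a prost message as [10-byte length prefix][payload].
// The length is varint-encoded and then zero-padded to 10 bytes; trailing zero
// bytes are ignored by prost's varint decoder, so the reader can grab a fixed
// 10 bytes and call `prost::decode_length_delimiter` on them.
fn frame<M: prost::Message>(msg: &M) -> Vec<u8> {
    let len = msg.encoded_len();
    let mut out = Vec::with_capacity(10 + len);
    prost::encode_length_delimiter(len, &mut out).expect("length fits in a varint");
    out.resize(10, 0); // pad the prefix to the fixed width the reader expects
    msg.encode(&mut out).expect("vec has sufficient capacity");
    out
}
```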
+// ========================= Node Session actor ========================= //
+
+/// Represents a bi-directional tcp connection along with send + receive operations
+///
+/// The [Session] actor supervises two child actors, [SessionReader] and [SessionWriter]. Should
+/// either the reader or writer exit, they will terminate the entire session.
+pub struct Session {
+    pub(crate) handler: ActorRef<crate::node::NodeSession>,
+    pub(crate) addr: SocketAddr,
+}
+
+impl Session {
+    pub(crate) async fn spawn_linked(
+        handler: ActorRef<crate::node::NodeSession>,
+        stream: TcpStream,
+        addr: SocketAddr,
+        supervisor: ActorCell,
+    ) -> Result<ActorRef<Self>, SpawnErr> {
+        match Actor::spawn_linked(None, Session { handler, addr }, supervisor).await {
+            Err(err) => {
+                log::error!("Failed to spawn session writer actor: {}", err);
+                Err(err)
+            }
+            Ok((a, _)) => {
+                // initialize this actor & its children
+                let _ = a.cast(SessionMessage::SetStream(stream));
+                // return the actor handle
+                Ok(a)
+            }
+        }
+    }
+}
+
+/// The node connection messages
+pub enum SessionMessage {
+    /// Set the session's tcp stream, which initializes all underlying states
+    SetStream(TcpStream),
+
+    /// Send a message over the channel
+    Send(crate::protocol::NetworkMessage),
+
+    /// An object was received on the channel
+    ObjectAvailable(crate::protocol::NetworkMessage),
+}
+
+/// The node session's state
+pub struct SessionState {
+    writer: ActorRef<SessionWriter<crate::protocol::NetworkMessage>>,
+    reader: ActorRef<SessionReader>,
+}
+
+#[async_trait::async_trait]
+impl Actor for Session {
+    type Msg = SessionMessage;
+    type State = SessionState;
+
+    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
+        // spawn writer + reader child actors
+        let (writer, _) = Actor::spawn_linked(
+            None,
+            SessionWriter::<crate::protocol::NetworkMessage> {
+                _phantom: PhantomData,
+            },
+            myself.get_cell(),
+        )
+        .await
+        .expect("Failed to start session writer");
+        let (reader, _) = Actor::spawn_linked(
+            None,
+            SessionReader {
+                session: myself.clone(),
+            },
+            myself.get_cell(),
+        )
+        .await
+        .expect("Failed to start session reader");
+
+        Self::State { writer, reader }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, _state: &mut Self::State) {
+        log::info!("TCP Session closed for {}", self.addr);
+    }
+
+    async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetStream(stream) => {
+                let (read, write) = stream.into_split();
+                // initialize the writer & reader states
+                let _ = state.writer.cast(SessionWriterMessage::SetStream(write));
+                let _ = state.reader.cast(SessionReaderMessage::SetStream(read));
+            }
+            Self::Msg::Send(msg) => {
+                let _ = state.writer.cast(SessionWriterMessage::WriteObject(msg));
+            }
+            Self::Msg::ObjectAvailable(msg) => {
+                let _ = self
+                    .handler
+                    .cast(crate::node::SessionMessage::MessageReceived(msg));
+            }
+        }
+    }
+
+    async fn handle_supervisor_evt(
+        &self,
+        myself: ActorRef<Self>,
+        message: SupervisionEvent,
+        state: &mut Self::State,
+    ) {
+        // sockets open, they close, the world goes round... If a reader or writer exits
+        // for any reason, we'll start the shutdown procedure, which requires that all actors exit
+        match message {
+            SupervisionEvent::ActorPanicked(actor, panic_msg) => {
+                if actor.get_id() == state.reader.get_id() {
+                    log::error!("TCP Session's reader panicked with '{}'", panic_msg);
+                } else if actor.get_id() == state.writer.get_id() {
+                    log::error!("TCP Session's writer panicked with '{}'", panic_msg);
+                } else {
+                    log::error!("TCP Session received a child panic from an unknown child actor ({}) - '{}'", actor.get_id(), panic_msg);
+                }
+                myself.stop(Some("child_panic".to_string()));
+            }
+            SupervisionEvent::ActorTerminated(actor, _, exit_reason) => {
+                if actor.get_id() == state.reader.get_id() {
+                    log::debug!("TCP Session's reader exited");
+                } else if actor.get_id() == state.writer.get_id() {
+                    log::debug!("TCP Session's writer exited");
+                } else {
+                    log::warn!("TCP Session received a child exit from an unknown child actor ({}) - '{:?}'", actor.get_id(), exit_reason);
+                }
+                myself.stop(Some("child_terminate".to_string()));
+            }
+            _ => {
+                // all ok
+            }
+        }
+    }
+}
+
+// ========================= Node Session writer ========================= //
+
+struct SessionWriter<TMsg>
+where
+    TMsg: NetworkMessage,
+{
+    _phantom: PhantomData<TMsg>,
+}
+
+struct SessionWriterState {
+    writer: Option<OwnedWriteHalf>,
+}
+
+enum SessionWriterMessage<TMsg>
+where
+    TMsg: NetworkMessage,
+{
+    /// Set the stream, providing the write half of the [TcpStream]
+    /// to utilize for this node's connection
+    SetStream(OwnedWriteHalf),
+
+    /// Write an object over the wire
+    WriteObject(TMsg),
+}
+
+#[async_trait::async_trait]
+impl<TMsg> Actor for SessionWriter<TMsg>
+where
+    TMsg: NetworkMessage,
+{
+    type Msg = SessionWriterMessage<TMsg>;
+
+    type State = SessionWriterState;
+
+    async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
+        // OK we've established connection, now we can process requests
+
+        Self::State { writer: None }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
+        // drop the channel to close it should we be exiting
+        drop(state.writer.take());
+    }
+
+    async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetStream(stream) if state.writer.is_none() => {
+                state.writer = Some(stream);
+            }
+            Self::Msg::WriteObject(msg) if state.writer.is_some() => {
+                if let Some(stream) = &mut state.writer {
+                    stream.writable().await.unwrap();
+
+                    let length = msg.encoded_len();
+
+                    // encode the length into a buffer and first write those bytes.
+                    // The varint-encoded length is zero-padded out to exactly 10 bytes,
+                    // since the reader always pulls a fixed 10-byte prefix off the wire
+                    let mut length_buf = Vec::with_capacity(10);
+                    prost::encode_length_delimiter(length, &mut length_buf).unwrap();
+                    length_buf.resize(10, 0);
+                    if let Err(write_err) = stream.write_all(&length_buf).await {
+                        log::warn!("Error writing to the stream: '{}'", write_err);
+                    } else {
+                        // Serialize the full object and write it over the wire
+                        let mut buf = Vec::with_capacity(length);
+                        msg.encode(&mut buf).unwrap();
+                        if let Err(write_err) = stream.write_all(&buf).await {
+                            log::warn!("Error writing to the stream: '{}'", write_err);
+                        }
+                    }
+                }
+            }
+            _ => {
+                // no-op, wait for next send request
+            }
+        }
+    }
+}
+
+// ========================= Node Session reader ========================= //
+
+struct SessionReader {
+    session: ActorRef<Session>,
+}
+
+/// The node connection messages
+pub enum SessionReaderMessage {
+    /// Set the stream, providing the read half of the [TcpStream]
+    /// to utilize for this node's connection
+    SetStream(OwnedReadHalf),
+
+    /// Wait for an object from the stream
+    WaitForObject,
+
+    /// Read next object off the stream
+    ReadObject(usize),
+}
+
+struct SessionReaderState {
+    reader: Option<OwnedReadHalf>,
+}
+
+#[async_trait::async_trait]
+impl Actor for SessionReader {
+    type Msg = SessionReaderMessage;
+
+    type State = SessionReaderState;
+
+    async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
+        Self::State { reader: None }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
+        // drop the channel to close it should we be exiting
+        drop(state.reader.take());
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetStream(stream) if state.reader.is_none() => {
+                state.reader = Some(stream);
+                // wait for an incoming object
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+            Self::Msg::WaitForObject if state.reader.is_some() => {
+                if let Some(stream) = &mut state.reader {
+                    stream.readable().await.unwrap();
+                    match read_n_bytes(stream, 10).await {
+                        Ok(buf) => {
+                            let bytes = Bytes::from(buf);
+                            match prost::decode_length_delimiter(bytes) {
+                                Ok(protobuf_len) => {
+                                    let _ =
+                                        myself.cast(SessionReaderMessage::ReadObject(protobuf_len));
+                                    return;
+                                }
+                                Err(decode_err) => {
+                                    log::warn!(
+                                        "Failed to decode protobuf object length with {}",
+                                        decode_err
+                                    );
+                                    // continue processing
+                                }
+                            }
+                        }
+                        Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
+                            // EOF, close the stream by dropping it
+                            drop(state.reader.take());
+                            myself.stop(Some("channel_closed".to_string()));
+                            return;
+                        }
+                        Err(_other_err) => {
+                            // some other TCP error, more handling necessary
+                        }
+                    }
+                }
+
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+            Self::Msg::ReadObject(length) if state.reader.is_some() => {
+                if let Some(stream) = &mut state.reader {
+                    match read_n_bytes(stream, length).await {
+                        Ok(buf) => {
+                            // NOTE: Our implementation writes 2 messages when sending something over the wire: the first
+                            // is exactly 10 bytes constituting the length of the payload message, followed by the payload.
+                            // This tells our TCP reader how much data to read off the wire
+
+                            // [buf] here should contain the exact amount of data to decode an object properly
+                            let bytes = Bytes::from(buf);
+                            match crate::protocol::NetworkMessage::decode(bytes) {
+                                Ok(msg) => {
+                                    // we decoded a message, pass it up the chain
+                                    let _ = self.session.cast(SessionMessage::ObjectAvailable(msg));
+                                }
+                                Err(decode_err) => {
+                                    log::error!(
+                                        "Error decoding network message: '{}'. Discarding",
+                                        decode_err
+                                    );
+                                }
+                            }
+                        }
+                        Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
+                            // EOF, close the stream by dropping it
+                            drop(state.reader.take());
+                            myself.stop(Some("channel_closed".to_string()));
+                            return;
+                        }
+                        Err(_other_err) => {
+                            // TODO: some other TCP error, more handling necessary
+                        }
+                    }
+                }
+
+                // we've read the object, now wait for the next object
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+            _ => {
+                // no stream is available, keep looping until one is available
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+        }
+    }
+}
diff --git a/ractor-cluster/src/node/auth.rs b/ractor-cluster/src/node/auth.rs
new file mode 100644
index 00000000..4d45cea4
--- /dev/null
+++ b/ractor-cluster/src/node/auth.rs
@@ -0,0 +1,168 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Defines a node's authentication process between peers. The definition
+//! can be found in [Erlang's handshake](https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html)
+
+use rand::RngCore;
+
+use crate::hash::Digest;
+use crate::protocol::auth as proto;
+
+/// Server authentication FSM
+pub(crate) enum ServerAuthenticationProcess {
+    /// (1) Client initiates handshake by sending their peer name
+    WaitingOnPeerName,
+
+    /// (2) We have the peer name, and have replied with our own [proto::ServerStatus]
+    /// reply
+    HavePeerName(proto::NameMessage),
+
+    /// (2B) Waiting on the client's status (true/false), if the [proto::ServerStatus] we sent was `alive`
+    WaitingOnClientStatus,
+
+    /// (3) Waiting on the client's reply to the [proto::Challenge] from the server.
+    ///
+    /// Arguments are the challenge sent to the client and the digest we expect back
+    WaitingOnClientChallengeReply(u32, Digest),
+
+    /// (4) We processed the client challenge value, replied, and we're ok with the channel.
+    /// The client has the final decision after they check our challenge computation, which we send
+    /// with [proto::ChallengeAck]
+    ///
+    /// Argument is the digest to send to the client
+    Ok(Digest),
+
+    /// Close
+    Close,
+}
+
+impl ServerAuthenticationProcess {
+    /// Initialize the FSM state
+    pub fn init() -> Self {
+        Self::WaitingOnPeerName
+    }
+
+    /// Begin the server's challenge phase, if we're in a state that allows it
+    pub fn start_challenge(&self, cookie: &'_ str) -> Self {
+        if matches!(self, Self::WaitingOnClientStatus | Self::HavePeerName(_)) {
+            let challenge = rand::thread_rng().next_u32();
+            let digest = crate::hash::challenge_digest(cookie, challenge);
+            Self::WaitingOnClientChallengeReply(challenge, digest)
+        } else {
+            Self::Close
+        }
+    }
+
+    /// Implement the FSM state transitions
+    pub fn next(&self, auth_message: proto::AuthenticationMessage, cookie: &'_ str) -> Self {
+        if let Some(msg) = auth_message.msg {
+            match msg {
+                proto::authentication_message::Msg::Name(name) => {
+                    if let Self::WaitingOnPeerName = &self {
+                        return Self::HavePeerName(name);
+                    }
+                }
+                proto::authentication_message::Msg::ClientStatus(status) => {
+                    if let Self::WaitingOnClientStatus = &self {
+                        // client says to not continue the session
+                        if !status.status {
+                            return Self::Close;
+                        } else {
+                            return self.start_challenge(cookie);
+                        }
+                    }
+                }
+                proto::authentication_message::Msg::ClientChallenge(challenge_reply) => {
+                    if let Self::WaitingOnClientChallengeReply(_, digest) = &self {
+                        if digest.to_vec() == challenge_reply.digest {
+                            let reply_digest =
+                                crate::hash::challenge_digest(cookie, challenge_reply.challenge);
+                            return Self::Ok(reply_digest);
+                        } else {
+                            // digests don't match!
+                            return Self::Close;
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+        // received either an empty message or an out-of-order message. The node can't be trusted
+        Self::Close
+    }
+}
+
+/// Client authentication FSM
+pub(crate) enum ClientAuthenticationProcess {
+    /// (1) After the client has sent their peer name
+    /// they wait for the [proto::ServerStatus] from the server
+    WaitingForServerStatus,
+
+    /// (2) We've potentially sent our client status. Either way
+    /// we're waiting for the [proto::Challenge] from the server
+    WaitingForServerChallenge(proto::ServerStatus),
+
+    /// (3) We've sent our challenge to the server, and we're waiting
+    /// on the server's calculation to determine if we should open the
+    /// channel.
+    ///
+    /// Arguments are servers_challenge, server_digest_reply, client_challenge_value, expected_digest
+    WaitingForServerChallengeAck(proto::Challenge, Digest, u32, Digest),
+
+    /// (4) We've validated the server's challenge digest and agree
+    /// that the channel is now open for node inter-communication
+    Ok,
+
+    /// Close
+    Close,
+}
+
+impl ClientAuthenticationProcess {
+    /// Initialize the FSM state
+    pub fn init() -> Self {
+        Self::WaitingForServerStatus
+    }
+
+    /// Implement the client FSM transitions
+    pub fn next(&self, auth_message: proto::AuthenticationMessage, cookie: &'_ str) -> Self {
+        if let Some(msg) = auth_message.msg {
+            match msg {
+                proto::authentication_message::Msg::ServerStatus(status) => {
+                    if let Self::WaitingForServerStatus = &self {
+                        return Self::WaitingForServerChallenge(status);
+                    }
+                }
+                proto::authentication_message::Msg::ServerChallenge(challenge_msg) => {
+                    if let Self::WaitingForServerChallenge(_) = &self {
+                        let server_digest =
+                            crate::hash::challenge_digest(cookie, challenge_msg.challenge);
+                        let challenge = rand::thread_rng().next_u32();
+                        let expected_digest = crate::hash::challenge_digest(cookie, challenge);
+                        return Self::WaitingForServerChallengeAck(
+                            challenge_msg,
+                            server_digest,
+                            challenge,
+                            expected_digest,
+                        );
+                    }
+                }
+                proto::authentication_message::Msg::ServerAck(challenge_ack) => {
+                    if let Self::WaitingForServerChallengeAck(_, _, _, expected_digest) = &self {
+                        if expected_digest.to_vec() == challenge_ack.digest {
+                            return Self::Ok;
+                        } else {
+                            return Self::Close;
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+        // received either an empty message or an out-of-order message. The node can't be trusted
+        Self::Close
+    }
+}
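Taken together, the two FSMs encode the following happy-path exchange. A comment-level sketch with a hypothetical `proves_cookie_knowledge` helper showing the single computation every verification step reduces to:

```rust
// Sketch: happy-path message flow of the two FSMs above (shared cookie assumed).
//
//   client -> server: Name              server: WaitingOnPeerName -> HavePeerName
//   server -> client: ServerStatus(OK)  client: WaitingForServerStatus -> WaitingForServerChallenge
//   server -> client: Challenge(c_s)    server: -> WaitingOnClientChallengeReply(c_s, digest(c_s))
//   client -> server: ChallengeReply { digest(c_s), c_c }
//   server -> client: ChallengeAck { digest(c_c) }     server: -> Ok(_)
//   client verifies the ack digest                     client: -> Ok
//
// Both digest checks are this comparison (hypothetical helper, crate-internal types):
fn proves_cookie_knowledge(cookie: &str, challenge: u32, claimed: &[u8]) -> bool {
    crate::hash::challenge_digest(cookie, challenge).to_vec() == claimed
}
```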
diff --git a/ractor-cluster/src/node/client.rs b/ractor-cluster/src/node/client.rs
new file mode 100644
index 00000000..a3680011
--- /dev/null
+++ b/ractor-cluster/src/node/client.rs
@@ -0,0 +1,73 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! This module contains the logic for initiating client requests to other [super::NodeServer]s
+
+use ractor::{cast, ActorRef, MessagingErr, SpawnErr};
+use tokio::net::TcpStream;
+
+/// Client connection error types
+pub enum ClientConnectError {
+    /// Socket failed to bind, returning the underlying tokio error
+    Socket(tokio::io::Error),
+    /// Error communicating to the [super::NodeServer] actor. The actor's receiving port is
+    /// closed
+    Messaging(MessagingErr),
+    /// A timeout occurred in trying to start a new [super::NodeSession]
+    Timeout,
+    /// Error spawning the tcp session actor supervision tree
+    TcpSpawn(SpawnErr),
+}
+
+impl From<tokio::io::Error> for ClientConnectError {
+    fn from(value: tokio::io::Error) -> Self {
+        Self::Socket(value)
+    }
+}
+
+impl From<MessagingErr> for ClientConnectError {
+    fn from(value: MessagingErr) -> Self {
+        Self::Messaging(value)
+    }
+}
+
+impl From<SpawnErr> for ClientConnectError {
+    fn from(value: SpawnErr) -> Self {
+        Self::TcpSpawn(value)
+    }
+}
+
+/// Connect to another [super::NodeServer] instance
+///
+/// * `node_server` - The local [super::NodeServer] to attach the new session to
+/// * `host` - The hostname to connect to
+/// * `port` - The host's port to connect to
+///
+/// Returns [Ok(())] if the connection was successful and the [super::NodeSession] was started; the handshake
+/// then continues automatically. Returns [Err(ClientConnectError)] if any part of the process failed to initiate
+pub async fn connect(
+    node_server: ActorRef<super::NodeServer>,
+    host: &'static str,
+    port: crate::net::NetworkPort,
+) -> Result<(), ClientConnectError> {
+    // connect to the socket
+    let stream = TcpStream::connect(format!("{host}:{port}")).await?;
+
+    // Startup the TCP handler, linked to the newly created `NodeSession`
+    let addr = stream.peer_addr()?;
+
+    let _ = cast!(
+        node_server,
+        super::SessionManagerMessage::ConnectionOpened {
+            stream,
+            is_server: false
+        }
+    );
+
+    // // notify the `NodeSession` about its tcp connection
+    // let _ = session_handler.cast(super::SessionMessage::SetTcpSession(tcp_actor));
+    log::info!("TCP Session opened for {}", addr);
+
+    Ok(())
+}
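End to end, the intended usage looks roughly like this. The port numbers, node names, and cookie are placeholder values, and error handling is reduced to `expect`s:

```rust
// Sketch: stand up a local NodeServer and dial a peer (all values are placeholders).
use ractor::Actor;

async fn join_cluster() {
    let server = ractor_cluster::NodeServer::new(
        8198,                    // local listener port
        "cookie".to_string(),    // shared secret checked during the handshake
        "node_a".to_string(),    // this node's name
        "localhost".to_string(), // this node's hostname
    );
    let (server_ref, _handle) = Actor::spawn(None, server)
        .await
        .expect("failed to start the NodeServer");

    // dial the peer; the authentication handshake then runs automatically
    if ractor_cluster::node::client::connect(server_ref, "localhost", 8199)
        .await
        .is_err()
    {
        // the connection could not be initiated
    }
}
```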
diff --git a/ractor-cluster/src/node/mod.rs b/ractor-cluster/src/node/mod.rs
new file mode 100644
index 00000000..d80d8bdf
--- /dev/null
+++ b/ractor-cluster/src/node/mod.rs
@@ -0,0 +1,375 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Erlang `node()` host communication for managing remote actor communication in
+//! a cluster
+//!
+//! The supervision tree is the following
+//!
+//! [NodeServer] supervises
+//! 1. The server-socket TCP [crate::net::listener::Listener]
+//! 2. All of the individual [NodeSession]s
+//!
+//! Each [NodeSession] supervises
+//! 1. The TCP [crate::net::session::Session] connection
+//! 2. (todo) All of the remotely referenced actors. That way, if the overall node session closes
+//!    (due to a tcp error, for example) we lose connectivity to all of the remote actors
+//!
+//! Each [crate::net::session::Session] supervises
+//! 1. A TCP writer actor (`crate::net::session::SessionWriter`)
+//! 2. A TCP reader actor (`crate::net::session::SessionReader`)
+//!
+//! If either child actor closes, it will terminate the overall [crate::net::session::Session], which in
+//! turn terminates the [NodeSession], and the [NodeServer] will de-register the [NodeSession] from its
+//! internal state
+
+/*
+TODO:
+
+Overview:
+
+A `NodeServer` handles opening the TCP listener and managing incoming and outgoing `NodeSession` requests.
+`NodeSession`s represent a remote server locally.
+
+Additionally, you can open a session as a "client" by requesting a new session from the NodeServer
+after initially connecting a [TcpStream] to the desired endpoint and then attaching the NodeSession
+to the TcpStream (and linking the actor). (See src/node/client.rs)
+
+What's there to do?
+1. The inter-node messaging protocol -> Based heavily on https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#protocol-between-connected-nodes
+2. Having a [NodeSession] manage child actors from the remote system
+3. Remote-supportive actors, which support serializing their payloads over the wire
+4. Populating the global + pg registries with remote registered actors
+5. Adjustments in the default Message type which allow messages to be serializable (with the `cluster` feature enabled)
+6. Allow actor ids to be set for remote actors, tied to a specific node session
+
+*/
+
+pub mod auth;
+pub mod client;
+pub mod node_session;
+pub use node_session::NodeSession;
+use tokio::net::TcpStream;
+
+use std::collections::HashMap;
+use std::{cmp::Ordering, collections::hash_map::Entry};
+
+use ractor::{cast, Actor, ActorId, ActorRef, RpcReplyPort, SupervisionEvent};
+
+use crate::protocol::auth as auth_protocol;
+
+const PROTOCOL_VERSION: u32 = 1;
+
+/// Reply to a [SessionManagerMessage::CheckSession] message
+pub enum SessionCheckReply {
+    /// There is no other connection with this peer
+    NoOtherConnection,
+    /// There is another connection with this peer, and it
+    /// should continue. Shutdown this connection.
+    OtherConnectionContinues,
+    /// There is another connection with this peer, but
+    /// this connection should take over. Terminating the other
+    /// connection
+    ThisConnectionContinues,
+    /// There is another connection with the peer,
+    /// in the same format as this attempted connection.
+    /// Perhaps the other connection is dying or the peer is
+    /// confused
+    DuplicateConnection,
+}
+
+impl From<SessionCheckReply> for auth_protocol::server_status::Status {
+    fn from(value: SessionCheckReply) -> Self {
+        match value {
+            SessionCheckReply::NoOtherConnection => Self::Ok,
+            SessionCheckReply::ThisConnectionContinues => Self::OkSimultaneous,
+            SessionCheckReply::OtherConnectionContinues => Self::NotOk,
+            SessionCheckReply::DuplicateConnection => Self::Alive,
+        }
+    }
+}
+
+/// Messages to/from the session aggregator
+pub enum SessionManagerMessage {
+    /// Notifies the session manager that a new incoming (`is_server = true`) or outgoing (`is_server = false`)
+    /// [TcpStream] was accepted
+    ConnectionOpened {
+        /// The [TcpStream] for this network connection
+        stream: TcpStream,
+        /// Flag denoting a server (incoming) connection when [true]; [false] for outgoing
+        is_server: bool,
+    },
+
+    /// A request to check if a session is currently open and, if it is, whether the ordering
+    /// is such that we should reject the incoming request
+    ///
+    /// i.e. if A is connected to B and A.name > B.name, but then B connects to A, B's request to connect
+    /// to A should be rejected
+    CheckSession {
+        /// The peer's name to investigate
+        peer_name: auth_protocol::NameMessage,
+        /// Reply channel for RPC
+        reply: RpcReplyPort<SessionCheckReply>,
+    },
+
+    /// A request to update the session mapping with this now-known node's name
+    UpdateSession {
+        /// The ID of the [NodeSession] actor
+        actor_id: ActorId,
+        /// The node's name (now that we've received it)
+        name: auth_protocol::NameMessage,
+    },
+}
+
+/// Messages sent from the TCP [crate::net::session::Session] actor to the
+/// monitoring [NodeSession] actor
+pub enum SessionMessage {
+    /// Sets the session's tcp stream
+    SetTcpStream(TcpStream),
+
+    /// A network message was received from the network
+    MessageReceived(crate::protocol::NetworkMessage),
+}
+
+/// Represents the server which is managing all node session instances
+///
+/// The [NodeServer] supervises a single [crate::net::listener::Listener] actor which is
+/// responsible for hosting a server port for incoming `node()` connections. It also supervises
+/// all of the [NodeSession] actors which are tied to tcp sessions and manage the FSM around `node()`s
+/// establishing interconnections.
+pub struct NodeServer {
+    port: crate::net::NetworkPort,
+    cookie: String,
+    node_name: String,
+    hostname: String,
+}
+
+impl NodeServer {
+    /// Create a new node server instance
+    pub fn new(
+        port: crate::net::NetworkPort,
+        cookie: String,
+        node_name: String,
+        hostname: String,
+    ) -> Self {
+        Self {
+            port,
+            cookie,
+            node_name,
+            hostname,
+        }
+    }
+}
+
+struct NodeServerSessionInformation {
+    actor: ActorRef<NodeSession>,
+    peer_name: Option<auth_protocol::NameMessage>,
+    is_server: bool,
+    node_id: u64,
+}
+
+impl NodeServerSessionInformation {
+    fn new(actor: ActorRef<NodeSession>, node_id: u64, is_server: bool) -> Self {
+        Self {
+            actor,
+            peer_name: None,
+            is_server,
+            node_id,
+        }
+    }
+
+    fn update(&mut self, peer_name: auth_protocol::NameMessage) {
+        self.peer_name = Some(peer_name);
+    }
+}
+
+/// The state of the node server
+pub struct NodeServerState {
+    listener: ActorRef<crate::net::listener::Listener>,
+    node_sessions: HashMap<ActorId, NodeServerSessionInformation>,
+    node_id_counter: u64,
+    this_node_name: auth_protocol::NameMessage,
+}
+
+impl NodeServerState {
+    fn check_peers(&self, new_peer: auth_protocol::NameMessage) -> SessionCheckReply {
+        for (_key, value) in self.node_sessions.iter() {
+            if let Some(existing_peer) = &value.peer_name {
+                if existing_peer.name == new_peer.name {
+                    match (
+                        existing_peer.name.cmp(&self.this_node_name.name),
+                        value.is_server,
+                    ) {
+                        // the peer's name is > this node's name and they connected to us
+                        // or
+                        // the peer's name is < this node's name and we connected to them:
+                        // the existing connection wins, reject the new one
+                        (Ordering::Greater, true) | (Ordering::Less, false) => {
+                            return SessionCheckReply::OtherConnectionContinues;
+                        }
+                        // the inverse of the first two conditions: terminate the other
+                        // connection and let this one continue
+                        (Ordering::Greater, false) | (Ordering::Less, true) => {
+                            value.actor.stop(Some("duplicate_connection".to_string()));
+                            return SessionCheckReply::ThisConnectionContinues;
+                        }
+                        _ => {
+                            // something funky is going on...
+                            return SessionCheckReply::DuplicateConnection;
+                        }
+                    }
+                }
+            }
+        }
+        SessionCheckReply::NoOtherConnection
+    }
+}
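The simultaneous-connect tie-break in `check_peers` is dense, so here it is extracted as a standalone sketch. `this_connection_wins` is a hypothetical helper; `peer_cmp` stands for `existing_peer.name.cmp(&self.this_node_name.name)` and `they_connected_to_us` for the stored `is_server` flag:

```rust
use std::cmp::Ordering;

// Sketch: given a duplicate peer name, decide which connection survives.
// Some(true)  => the new session wins; the existing one is stopped
// Some(false) => the existing session wins; the new one is rejected
// None        => names compare equal: confused peer, report a duplicate
fn this_connection_wins(peer_cmp: Ordering, they_connected_to_us: bool) -> Option<bool> {
    match (peer_cmp, they_connected_to_us) {
        (Ordering::Greater, true) | (Ordering::Less, false) => Some(false),
        (Ordering::Greater, false) | (Ordering::Less, true) => Some(true),
        _ => None,
    }
}
```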
+#[async_trait::async_trait]
+impl Actor for NodeServer {
+    type Msg = SessionManagerMessage;
+    type State = NodeServerState;
+
+    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
+        let listener = crate::net::listener::Listener::new(self.port, myself.clone());
+
+        let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
+            .await
+            .expect("Failed to start listener");
+
+        Self::State {
+            node_sessions: HashMap::new(),
+            listener: actor_ref,
+            node_id_counter: 0,
+            this_node_name: auth_protocol::NameMessage {
+                flags: Some(auth_protocol::NodeFlags {
+                    version: PROTOCOL_VERSION,
+                }),
+                name: format!("{}@{}", self.node_name, self.hostname),
+            },
+        }
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::ConnectionOpened { stream, is_server } => {
+                let node_id = state.node_id_counter;
+                if let Ok((actor, _)) = Actor::spawn_linked(
+                    None,
+                    NodeSession::new(
+                        node_id,
+                        is_server,
+                        self.cookie.clone(),
+                        myself.clone(),
+                        state.this_node_name.clone(),
+                    ),
+                    myself.get_cell(),
+                )
+                .await
+                {
+                    let _ = cast!(actor, SessionMessage::SetTcpStream(stream));
+                    state.node_sessions.insert(
+                        actor.get_id(),
+                        NodeServerSessionInformation::new(actor.clone(), node_id, is_server),
+                    );
+                    state.node_id_counter += 1;
+                } else {
+                    // failed to startup the actor, drop the socket
+                    log::warn!("Failed to startup `NodeSession`, dropping connection");
+                    drop(stream);
+                }
+            }
+            Self::Msg::UpdateSession { actor_id, name } => {
+                if let Some(entry) = state.node_sessions.get_mut(&actor_id) {
+                    entry.update(name);
+                }
+            }
+            Self::Msg::CheckSession { peer_name, reply } => {
+                let _ = reply.send(state.check_peers(peer_name));
+            }
+        }
+    }
+
+    async fn handle_supervisor_evt(
+        &self,
+        myself: ActorRef<Self>,
+        message: SupervisionEvent,
+        state: &mut Self::State,
+    ) {
+        match message {
+            SupervisionEvent::ActorPanicked(actor, msg) => {
+                if state.listener.get_id() == actor.get_id() {
+                    log::error!(
+                        "The Node server's TCP listener failed with '{}'. Respawning!",
+                        msg
+                    );
+
+                    // try to re-create the listener. If it's a port-bind issue, we will have
+                    // already panicked on trying to start the NodeServer
+                    let listener = crate::net::listener::Listener::new(self.port, myself.clone());
+
+                    let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
+                        .await
+                        .expect("Failed to start listener");
+                    state.listener = actor_ref;
+                } else {
+                    match state.node_sessions.entry(actor.get_id()) {
+                        Entry::Occupied(o) => {
+                            log::warn!(
+                                "Node session {:?} panicked with '{}'",
+                                o.get().peer_name,
+                                msg
+                            );
+                            o.remove();
+                        }
+                        Entry::Vacant(_) => {
+                            log::warn!(
+                                "An unknown actor ({:?}) panicked with '{}'",
+                                actor.get_id(),
+                                msg
+                            );
+                        }
+                    }
+                }
+            }
+            SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => {
+                if state.listener.get_id() == actor.get_id() {
+                    log::error!(
+                        "The Node server's TCP listener exited with '{:?}'. Respawning!",
+                        maybe_reason
+                    );
+
+                    // try to re-create the listener. If it's a port-bind issue, we will have
+                    // already panicked on trying to start the NodeServer
+                    let listener = crate::net::listener::Listener::new(self.port, myself.clone());
+
+                    let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
+                        .await
+                        .expect("Failed to start listener");
+                    state.listener = actor_ref;
+                } else {
+                    match state.node_sessions.entry(actor.get_id()) {
+                        Entry::Occupied(o) => {
+                            log::warn!(
+                                "Node session {:?} exited with '{:?}'",
+                                o.get().peer_name,
+                                maybe_reason
+                            );
+                            o.remove();
+                        }
+                        Entry::Vacant(_) => {
+                            log::warn!(
+                                "An unknown actor ({:?}) exited with '{:?}'",
+                                actor.get_id(),
+                                maybe_reason
+                            );
+                        }
+                    }
+                }
+            }
+            _ => {
+                //no-op
+            }
+        }
+    }
+}
diff --git a/ractor-cluster/src/node/node_session.rs b/ractor-cluster/src/node/node_session.rs
new file mode 100644
index 00000000..5968045e
--- /dev/null
+++ b/ractor-cluster/src/node/node_session.rs
@@ -0,0 +1,398 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! A [NodeSession] is an individual connection between a specific pair of
+//! `node()`s, along with all of its authentication and communication for that
+//! pairing
+
+use ractor::rpc::CallResult;
+use ractor::{Actor, ActorId, ActorRef, SupervisionEvent};
+use tokio::time::Duration;
+
+use super::{auth, NodeServer};
+use crate::net::session::SessionMessage;
+use crate::protocol::auth as auth_protocol;
+use crate::protocol::node as node_protocol;
+
+enum AuthenticationState {
+    AsClient(auth::ClientAuthenticationProcess),
+    AsServer(auth::ServerAuthenticationProcess),
+}
+
+impl AuthenticationState {
+    fn is_ok(&self) -> bool {
+        match self {
+            Self::AsClient(c) => matches!(c, auth::ClientAuthenticationProcess::Ok),
+            Self::AsServer(s) => matches!(s, auth::ServerAuthenticationProcess::Ok(_)),
+        }
+    }
+
+    fn is_close(&self) -> bool {
+        match self {
+            Self::AsClient(c) => matches!(c, auth::ClientAuthenticationProcess::Close),
+            Self::AsServer(s) => matches!(s, auth::ServerAuthenticationProcess::Close),
+        }
+    }
+}
+
+/// Represents a session with a specific node
+pub struct NodeSession {
+    node_id: u64,
+    is_server: bool,
+    cookie: String,
+    node_server: ActorRef<NodeServer>,
+    node_name: auth_protocol::NameMessage,
+}
+
+impl NodeSession {
+    /// Construct a new [NodeSession] with the supplied
+    /// arguments
+    pub fn new(
+        node_id: u64,
+        is_server: bool,
+        cookie: String,
+        node_server: ActorRef<NodeServer>,
+        node_name: auth_protocol::NameMessage,
+    ) -> Self {
+        Self {
+            node_id,
+            is_server,
+            cookie,
+            node_server,
+            node_name,
+        }
+    }
+}
+
+impl NodeSession {
+    async fn handle_auth(
+        &self,
+        state: &mut NodeSessionState,
+        message: auth_protocol::AuthenticationMessage,
+        myself: ActorRef<Self>,
+    ) {
+        if state.auth.is_ok() {
+            // nothing to do, we're already authenticated
+            return;
+        }
+        if state.auth.is_close() {
+            // we need to shutdown, the session needs to be terminated
+            myself.stop(Some("auth_fail".to_string()));
+            if let Some(tcp) = &state.tcp {
+                tcp.stop(Some("auth_fail".to_string()));
+            }
+        }
+
+        match &state.auth {
+            AuthenticationState::AsClient(client_auth) => {
+                let mut next = client_auth.next(message, &self.cookie);
+                match &next {
+                    auth::ClientAuthenticationProcess::WaitingForServerChallenge(server_status) => {
+                        match server_status.status() {
+                            auth_protocol::server_status::Status::Ok => {
+                                // this handshake will continue
+                            }
+                            auth_protocol::server_status::Status::OkSimultaneous => {
+                                // this handshake will continue, but there is another handshake underway
+                                // that will be shut down (i.e. this was a server connection and we're currently
+                                // trying a client connection)
+                            }
+                            auth_protocol::server_status::Status::NotOk => {
+                                // The handshake will not continue, as there's already another client handshake underway
+                                // which the peer itself initiated (simultaneous connect where the other connection's
+                                // name is > this node's name)
+                                next = auth::ClientAuthenticationProcess::Close;
+                            }
+                            auth_protocol::server_status::Status::NotAllowed => {
+                                // unspecified auth reason
+                                next = auth::ClientAuthenticationProcess::Close;
+                            }
+                            auth_protocol::server_status::Status::Alive => {
+                                // A connection to the node is already alive, which means either the
+                                // node is confused in its connection state or the previous TCP connection is
+                                // breaking down. Send ClientStatus
+                                // TODO: check the status properly
+                                state.tcp_send_auth(auth_protocol::AuthenticationMessage {
+                                    msg: Some(
+                                        auth_protocol::authentication_message::Msg::ClientStatus(
+                                            auth_protocol::ClientStatus { status: true },
+                                        ),
+                                    ),
+                                });
+                            }
+                        }
+                    }
+                    auth::ClientAuthenticationProcess::WaitingForServerChallengeAck(
+                        server_challenge_value,
+                        reply_to_server,
+                        our_challenge,
+                        _expected_digest,
+                    ) => {
+                        // record the peer's name
+                        let peer_name = auth_protocol::NameMessage {
+                            name: server_challenge_value.name.clone(),
+                            flags: server_challenge_value.flags.clone(),
+                        };
+                        state.name = Some(peer_name.clone());
+                        // tell the node server that we now know this peer's name information
+                        let _ = self
+                            .node_server
+                            .cast(super::SessionManagerMessage::UpdateSession {
+                                actor_id: myself.get_id(),
+                                name: peer_name,
+                            });
+                        // send the client challenge to the server
+                        let reply = auth_protocol::AuthenticationMessage {
+                            msg: Some(auth_protocol::authentication_message::Msg::ClientChallenge(
+                                auth_protocol::ChallengeReply {
+                                    digest: reply_to_server.to_vec(),
+                                    challenge: *our_challenge,
+                                },
+                            )),
+                        };
+                        state.tcp_send_auth(reply);
+                    }
+                    _ => {
+                        // no message to send
+                    }
+                }
+
+                if let auth::ClientAuthenticationProcess::Close = &next {
+                    myself.stop(Some("auth_fail".to_string()));
+                }
+                state.auth = AuthenticationState::AsClient(next);
+            }
+            AuthenticationState::AsServer(server_auth) => {
+                let mut next = server_auth.next(message, &self.cookie);
+
+                match &next {
+                    auth::ServerAuthenticationProcess::HavePeerName(peer_name) => {
+                        // store the peer node's name in the session state
+                        state.name = Some(peer_name.clone());
+
+                        // send the status message, followed by the server's challenge
+                        let server_status_result = self
+                            .node_server
+                            .call(
+                                |tx| super::SessionManagerMessage::CheckSession {
+                                    peer_name: peer_name.clone(),
+                                    reply: tx,
+                                },
+                                Some(Duration::from_millis(500)),
+                            )
+                            .await;
+                        match server_status_result {
+                            Err(_) | Ok(CallResult::Timeout) | Ok(CallResult::SenderError) => {
+                                next = auth::ServerAuthenticationProcess::Close;
+                            }
+                            Ok(CallResult::Success(reply)) => {
+                                let server_status: auth_protocol::server_status::Status =
+                                    reply.into();
+                                // Send the server's status message
+                                let status_msg = auth_protocol::AuthenticationMessage {
+                                    msg: Some(
+                                        auth_protocol::authentication_message::Msg::ServerStatus(
+                                            auth_protocol::ServerStatus {
+                                                status: server_status.into(),
+                                            },
+                                        ),
+                                    ),
+                                };
+                                state.tcp_send_auth(status_msg);
+
+                                match server_status {
+                                    auth_protocol::server_status::Status::Ok
+                                    | auth_protocol::server_status::Status::OkSimultaneous => {
+                                        // Good to proceed, start a challenge
+                                        next = next.start_challenge(&self.cookie);
+                                        if let auth::ServerAuthenticationProcess::WaitingOnClientChallengeReply(
+                                            challenge,
+                                            _digest,
+                                        ) = &next
+                                        {
+                                            let challenge_msg = auth_protocol::AuthenticationMessage {
+                                                msg: Some(
+                                                    auth_protocol::authentication_message::Msg::ServerChallenge(
+                                                        auth_protocol::Challenge {
+                                                            name: self.node_name.name.clone(),
+                                                            flags: self.node_name.flags.clone(),
+                                                            challenge: *challenge,
+                                                        },
+                                                    ),
+                                                ),
+                                            };
+                                            state.tcp_send_auth(challenge_msg);
+                                        }
+                                    }
+                                    auth_protocol::server_status::Status::NotOk
+                                    | auth_protocol::server_status::Status::NotAllowed => {
+                                        next = auth::ServerAuthenticationProcess::Close;
+                                    }
+                                    auth_protocol::server_status::Status::Alive => {
+                                        // we sent the `Alive` status, so we're waiting on the client to confirm
+                                        // their status before continuing
+                                        next = auth::ServerAuthenticationProcess::WaitingOnClientStatus;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    auth::ServerAuthenticationProcess::Ok(digest) => {
+                        let client_challenge_reply = auth_protocol::AuthenticationMessage {
+                            msg: Some(auth_protocol::authentication_message::Msg::ServerAck(
+                                auth_protocol::ChallengeAck {
+                                    digest: digest.to_vec(),
+                                },
+                            )),
+                        };
+                        state.tcp_send_auth(client_challenge_reply);
+                    }
+                    _ => {
+                        // no message to send
+                    }
+                }
+
+                if let auth::ServerAuthenticationProcess::Close = &next {
+                    myself.stop(Some("auth_fail".to_string()));
+                }
+                state.auth = AuthenticationState::AsServer(next);
+            }
+        }
+    }
+
+    fn handle_node(
+        &self,
+        _state: &mut NodeSessionState,
+        _message: node_protocol::NodeMessage,
+        _myself: ActorRef<Self>,
+    ) {
+    }
+}
+
+/// The state of the node session
+pub struct NodeSessionState {
+    tcp: Option<ActorRef<crate::net::session::Session>>,
+    name: Option<auth_protocol::NameMessage>,
+    auth: AuthenticationState,
+}
+
+impl NodeSessionState {
+    fn is_tcp_actor(&self, actor: ActorId) -> bool {
+        self.tcp
+            .as_ref()
+            .map(|t| t.get_id() == actor)
+            .unwrap_or(false)
+    }
+
+    fn tcp_send_auth(&self, msg: auth_protocol::AuthenticationMessage) {
+        if let Some(tcp) = &self.tcp {
+            let net_msg = crate::protocol::NetworkMessage {
+                message: Some(crate::protocol::meta::network_message::Message::Auth(msg)),
+            };
+            let _ = tcp.cast(SessionMessage::Send(net_msg));
+        }
+    }
+
+    fn tcp_send_node(&self, msg: node_protocol::NodeMessage) {
+        if let Some(tcp) = &self.tcp {
+            let net_msg = crate::protocol::NetworkMessage {
+                message: Some(crate::protocol::meta::network_message::Message::Node(msg)),
+            };
+            let _ = tcp.cast(SessionMessage::Send(net_msg));
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl Actor for NodeSession {
+    type Msg = super::SessionMessage;
+    type State = NodeSessionState;
+
+    async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
+        Self::State {
+            tcp: None,
+            name: None,
+            auth: if self.is_server {
+                AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init())
+            } else {
+                AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init())
+            },
+        }
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetTcpStream(stream) if state.tcp.is_none() => {
+                let addr = stream.peer_addr().expect("Failed to get peer address");
+                // startup the TCP socket handler for message writing + reading
+                let actor = crate::net::session::Session::spawn_linked(
+                    myself.clone(),
+                    stream,
+                    addr,
+                    myself.get_cell(),
+                )
+                .await
+                .expect("Failed to spawn TCP session");
+                state.tcp = Some(actor);
+            }
+            Self::Msg::MessageReceived(maybe_network_message) if state.tcp.is_some() => {
+                if let Some(network_message) = maybe_network_message.message {
+                    match network_message {
+                        crate::protocol::meta::network_message::Message::Auth(auth_message) => {
+                            self.handle_auth(state, auth_message, myself).await;
+                        }
+                        crate::protocol::meta::network_message::Message::Node(node_message) => {
+                            self.handle_node(state, node_message, myself);
+                        }
+                    }
+                }
+            }
+            _ => {
+                // no-op, ignore
+            }
+        }
+    }
+
+    async fn handle_supervisor_evt(
+        &self,
+        myself: ActorRef<Self>,
+        message: SupervisionEvent,
+        state: &mut Self::State,
+    ) {
+        match message {
+            SupervisionEvent::ActorPanicked(actor, msg) => {
+                if state.is_tcp_actor(actor.get_id()) {
+                    log::error!(
+                        "Node session {:?}'s TCP session panicked with '{}'",
+                        state.name,
+                        msg
+                    );
+                    myself.stop(Some("tcp_session_err".to_string()));
+                } else {
+                    // TODO: handle other actors
+                    log::warn!(
+                        "Node session {:?} received an unknown child panic of '{}'",
+                        state.name,
+                        msg
+                    );
+                }
+            }
+            SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => {
+                if state.is_tcp_actor(actor.get_id()) {
+                    log::info!("Connection closed to node {:?}", state.name);
+                    myself.stop(Some("tcp_session_closed".to_string()));
+                } else {
+                    // TODO: handle other actors
+                    log::debug!(
+                        "Node session {:?} received a child exit with reason '{:?}'",
+                        state.name,
+                        maybe_reason
+                    );
+                }
+            }
+            _ => {
+                //no-op
+            }
+        }
+    }
+}
diff --git a/ractor-cluster/src/protocol/auth.proto b/ractor-cluster/src/protocol/auth.proto
new file mode 100644
index 00000000..9240df88
--- /dev/null
+++ b/ractor-cluster/src/protocol/auth.proto
@@ -0,0 +1,116 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum
+// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for
+// the syntax and guide
+
+// The protocol messages defined here roughly follow the Erlang distributed systems guide
+// found at: https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#distribution-handshake
+
+syntax = "proto3";
+
+package auth;
+
+// Placeholder to represent a node's flags
+message NodeFlags {
+    // The node version
+    uint32 version = 1;
+}
+
+// A message containing the node's name
+message NameMessage {
+    // The node's full name
+    // Format: `node_name@hostname`
+    string name = 1;
+
+    // The node's capability flags
+    NodeFlags flags = 2;
+}
+
+// Server -> Client: `ServerStatus` is the server replying with the handshake status to the client
+message ServerStatus {
+    // Status types
+    enum Status {
+        // The handshake will continue
+        OK = 0;
+        // The handshake will continue, but A is informed that B has another ongoing
+        // connection attempt that will be shut down (simultaneous connect where A's
+        // name is greater than B's name, compared literally).
+        OK_SIMULTANEOUS = 1;
+        // The handshake will not continue, as B already has an ongoing handshake, which
+        // it itself has initiated (simultaneous connect where B's name is greater than A's).
+        NOT_OK = 2;
+        // The connection is disallowed for some (unspecified) security reason.
+        NOT_ALLOWED = 3;
+        // A connection to the node is already active, which either means that node A is confused
+        // or that the TCP connection breakdown of a previous node with this name has not yet
+        // reached node B.
+        ALIVE = 4;
+
+        // Skipped NAMED = 5;
+    }
+
+    // The status
+    Status status = 1;
+}
+
+// The client's status reply if the `ServerStatus` was ALIVE
+//
+// If status was alive, node A answers with another status message containing either true,
+// which means that the connection is to continue (the old connection from this node is
+// broken), or false, which means that the connection is to be closed (the connection
+// attempt was a mistake).
+message ClientStatus {
+    // The status
+    bool status = 1;
+}
+
+// The server's initial challenge request
+message Challenge {
+    // The server's name
+    string name = 1;
+    // The node's capability flags
+    NodeFlags flags = 2;
+    // The challenge value
+    uint32 challenge = 3;
+}
+
+// The reply to the server's challenge.
+message ChallengeReply {
+    // The client's own challenge for the server to handle
+    uint32 challenge = 1;
+    // A digest (SHA-256 in this crate, where Erlang's protocol uses MD5) that the
+    // client constructed from the server's challenge value
+    bytes digest = 2;
+}
+
+// The server's reply to the client about their own
+// challenge
+message ChallengeAck {
+    // Another digest that the server constructed from the
+    // client's challenge value
+    bytes digest = 1;
+}
+
+// An authentication message
+message AuthenticationMessage {
+    // The inner message type
+    oneof msg {
+        // Send the name
+        NameMessage name = 1;
+        // Send the status
+        ServerStatus server_status = 2;
+        // Send the client status
+        ClientStatus client_status = 3;
+        // Server's challenge to the client
+        Challenge server_challenge = 4;
+        // Client's reply to server's challenge and
+        // client's own challenge to the server
+        ChallengeReply client_challenge = 5;
+        // Server's reply to the client's challenge
+        ChallengeAck server_ack = 6;
+    }
+}
\ No newline at end of file
diff --git a/ractor-cluster/src/protocol/meta.proto b/ractor-cluster/src/protocol/meta.proto
new file mode 100644
index 00000000..89c275d9
--- /dev/null
+++ b/ractor-cluster/src/protocol/meta.proto
@@ -0,0 +1,23 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+
+syntax = "proto3";
+
+import "auth.proto";
+import "node.proto";
+
+package meta;
+
+// Represents a message over the network
+message NetworkMessage {
+    // The inner message
+    oneof message {
+        // An authentication message
+        auth.AuthenticationMessage auth = 1;
+        // An inter-node message
+        node.NodeMessage node = 2;
+    }
+}
\ No newline at end of file
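For orientation, prost-build turns each of these messages into a plain Rust struct. Roughly what gets emitted into `OUT_DIR/auth.rs` for `NameMessage` — a sketch based on prost's documented derive output, not copied from the generated file:

```rust
// Sketch: approximate shape of the prost-generated code for auth.NameMessage
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NameMessage {
    /// The node's full name, `node_name@hostname`
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// The node's capability flags
    #[prost(message, optional, tag = "2")]
    pub flags: ::core::option::Option<NodeFlags>,
}
```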
diff --git a/ractor-cluster/src/protocol/mod.rs b/ractor-cluster/src/protocol/mod.rs
new file mode 100644
index 00000000..c0f61983
--- /dev/null
+++ b/ractor-cluster/src/protocol/mod.rs
@@ -0,0 +1,24 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Protobuf specifications for over-the-wire intercommunication
+//! between nodes. Generated via [prost]
+
+/// Node authentication protocol
+pub mod auth {
+    include!(concat!(env!("OUT_DIR"), "/auth.rs"));
+}
+
+/// Node inter-communication protocol
+pub mod node {
+    include!(concat!(env!("OUT_DIR"), "/node.rs"));
+}
+
+/// Meta types which include all base network protocol message types
+pub mod meta {
+    include!(concat!(env!("OUT_DIR"), "/meta.rs"));
+}
+
+pub use meta::NetworkMessage;
diff --git a/ractor-cluster/src/protocol/node.proto b/ractor-cluster/src/protocol/node.proto
new file mode 100644
index 00000000..8a0fb675
--- /dev/null
+++ b/ractor-cluster/src/protocol/node.proto
@@ -0,0 +1,22 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum
+// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for
+// the syntax and guide
+
+syntax = "proto3";
+
+package node;
+
+// A placeholder message representing an
+// authenticated inter-node message
+message NodeMessage {
+    // Some value
+    int32 value = 1;
+}
\ No newline at end of file
diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml
index b44f723b..fa49c0b5 100644
--- a/ractor/Cargo.toml
+++ b/ractor/Cargo.toml
@@ -12,16 +12,19 @@
 readme = "../README.md"
 homepage = "https://github.com/slawlor/ractor"
 categories = ["actor", "erlang"]
 
+[features]
 # WIP
-# [features]
 # tokio_runtime = ["tokio/time"]
 # async_std_runtime = ["async-std"]
 # default = ["tokio_runtime"]
 # default = ["async_std_runtime"]
 
+cluster = []
+default = []
+
 [dependencies]
-async-std = { version = "1", optional = true }
+## Required dependencies
 async-trait = "0.1"
 dashmap = "5"
 futures = "0.3"
diff --git a/ractor/benches/actor.rs b/ractor/benches/actor.rs
index 4dc71e0f..f3fd6036 100644
--- a/ractor/benches/actor.rs
+++ b/ractor/benches/actor.rs
@@ -95,7 +95,7 @@ fn schedule_work(c: &mut Criterion) {
             })
         },
         |mut handles| {
-            runtime.block_on(async move { while let Some(_) = handles.join_next().await {} })
+            runtime.block_on(async move { while handles.join_next().await.is_some() {} })
         },
         BatchSize::PerIteration,
     );
@@ -118,7 +118,7 @@ fn schedule_work(c: &mut Criterion) {
             })
         },
         |mut handles| {
-            runtime.block_on(async move { while let Some(_) = handles.join_next().await {} })
+            runtime.block_on(async move { while handles.join_next().await.is_some() {} })
         },
         BatchSize::PerIteration,
     );
diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs
index 1b456a0f..1f3de975 100644
--- a/ractor/examples/philosophers.rs
+++ b/ractor/examples/philosophers.rs
@@ -494,7 +494,7 @@ async fn main() {
     }
 
     // wait for everything to shut down
-    while let Some(_) = all_handles.join_next().await {}
+    while all_handles.join_next().await.is_some() {}
 
     // print metrics
     println!("Simulation results");
+        let boxed = message.box_message(&self.id);
 
         self.message.send(boxed).map_err(|e| e.into())
     }
diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs
index 79d6372f..822b30d6 100644
--- a/ractor/src/actor/messages.rs
+++ b/ractor/src/actor/messages.rs
@@ -12,7 +12,7 @@
 use std::any::Any;
 use std::fmt::Debug;
 
-use crate::{Message, State};
+use crate::State;
 
 /// An error downcasting a boxed item to a strong type
 #[derive(Debug)]
@@ -22,38 +22,9 @@ pub struct BoxedDowncastErr;
 /// but generic so it can be passed around without type
 /// constraints
 pub struct BoxedMessage {
-    /// The message value
-    pub msg: Option<Box<dyn Any + Send>>,
-}
-
-impl BoxedMessage {
-    /// Create a new [BoxedMessage] from a strongly-typed message
-    pub fn new<T>(msg: T) -> Self
-    where
-        T: Message,
-    {
-        Self {
-            msg: Some(Box::new(msg)),
-        }
-    }
-
-    /// Try and take the resulting message as a specific type, consumes
-    /// the boxed message
-    pub fn take<T>(&mut self) -> Result<T, BoxedDowncastErr>
-    where
-        T: Message,
-    {
-        match self.msg.take() {
-            Some(m) => {
-                if m.is::<T>() {
-                    Ok(*m.downcast::<T>().unwrap())
-                } else {
-                    Err(BoxedDowncastErr)
-                }
-            }
-            None => Err(BoxedDowncastErr),
-        }
-    }
+    pub(crate) msg: Option<Box<dyn Any + Send>>,
+    #[cfg(feature = "cluster")]
+    pub(crate) serialized_msg: Option<Vec<u8>>,
 }
 
 /// A "boxed" message denoting a strong-type message
diff --git a/ractor/src/actor/mod.rs b/ractor/src/actor/mod.rs
index 48c1ebad..b0117219 100644
--- a/ractor/src/actor/mod.rs
+++ b/ractor/src/actor/mod.rs
@@ -98,7 +98,8 @@ pub trait Actor: Sized + Sync + Send + 'static {
     async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {}
 
     /// Handle the incoming supervision event. Unhandled panicks will captured and
-    /// sent the the supervisor(s)
+    /// sent to the supervisor(s). The default supervision behavior is to ignore all
+    /// child events. To override this behavior, implement this method.
    ///
     /// * `myself` - A handle to the [ActorCell] representing this actor
     /// * `message` - The message to process
@@ -411,10 +412,10 @@ where
         myself: ActorRef<TActor>,
         state: &mut TState,
         handler: Arc<TActor>,
-        mut msg: BoxedMessage,
+        msg: BoxedMessage,
     ) {
         // panic in order to kill the actor
-        let typed_msg = match msg.take() {
+        let typed_msg = match TMsg::from_boxed(msg) {
             Ok(m) => m,
             Err(_) => {
                 panic!(
diff --git a/ractor/src/actor_id.rs b/ractor/src/actor_id.rs
index cf386fcd..6abfc112 100644
--- a/ractor/src/actor_id.rs
+++ b/ractor/src/actor_id.rs
@@ -15,7 +15,12 @@ pub enum ActorId {
     Local(u64),
 
     /// A remote actor on another system (system, id)
-    Remote(u64, u64),
+    Remote {
+        /// The remote node id
+        node_id: u64,
+        /// The local id on the remote system
+        pid: u64,
+    },
 }
 
 impl ActorId {
@@ -31,7 +36,7 @@ impl Display for ActorId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             ActorId::Local(id) => write!(f, "0.{}", id),
-            ActorId::Remote(system_id, id) => write!(f, "{}.{}", system_id, id),
+            ActorId::Remote { node_id, pid } => write!(f, "{}.{}", node_id, pid),
         }
     }
 }
@@ -39,17 +44,23 @@ impl Display for ActorId {
 /// The local id allocator for actors
 static ACTOR_ID_ALLOCATOR: AtomicU64 = AtomicU64::new(0u64);
 
-/// Retreiev a new local id
+/// Retrieve a new local id
 pub(crate) fn get_new_local_id() -> ActorId {
     ActorId::Local(ACTOR_ID_ALLOCATOR.fetch_add(1, std::sync::atomic::Ordering::AcqRel))
 }
 
+/// Create a new actor id for an actor on a remote `node()`
+#[cfg(feature = "cluster")]
+pub fn new_remote_id(node_id: u64, pid: u64) -> ActorId {
+    ActorId::Remote { node_id, pid }
+}
+
 impl ActorId {
     /// Retrieve the PID of the actor, ignoring local/remote properties
     pub fn get_pid(&self) -> u64 {
         match self {
             ActorId::Local(pid) => *pid,
-            ActorId::Remote(_, pid) => *pid,
+            ActorId::Remote { pid, .. } => *pid,
        }
     }
 }
diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs
index 08b88ebf..ff3e3bb2 100644
--- a/ractor/src/lib.rs
+++ b/ractor/src/lib.rs
@@ -153,18 +153,18 @@ pub mod registry;
 pub mod rpc;
 pub mod time;
 
+// #[cfg(feature = "cluster")]
+// pub mod remote;
+
 #[cfg(test)]
 mod tests;
 
+use actor::messages::{BoxedDowncastErr, BoxedMessage};
 #[cfg(test)]
 use criterion as _;
 #[cfg(test)]
 use rand as _;
 
-// WIP
-// #[cfg(feature = "remote")]
-// pub mod distributed;
-
 // re-exports
 pub use actor::actor_cell::{ActorCell, ActorRef, ActorStatus, ACTIVE_STATES};
 pub use actor::errors::{ActorErr, MessagingErr, SpawnErr};
@@ -187,8 +187,90 @@ pub use port::{OutputMessage, OutputPort, RpcReplyPort};
 ///     PrintName,
 /// }
 /// ```
-pub trait Message: Any + Send + 'static {}
-impl<T: Any + Send + 'static> Message for T {}
+pub trait Message: Any + Send + Sized + 'static {
+    /// Convert a [BoxedMessage] to this concrete type
+    #[cfg(feature = "cluster")]
+    fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
+        if m.msg.is_some() {
+            match m.msg.take() {
+                Some(m) => {
+                    if m.is::<Self>() {
+                        Ok(*m.downcast::<Self>().unwrap())
+                    } else {
+                        Err(BoxedDowncastErr)
+                    }
+                }
+                _ => {
+                    Err(BoxedDowncastErr)
+                }
+            }
+        } else if m.serialized_msg.is_some() {
+            match m.serialized_msg.take() {
+                Some(m) => {
+                    Self::deserialize(m)
+                }
+                _ => {
+                    Err(BoxedDowncastErr)
+                }
+            }
+        } else {
+            Err(BoxedDowncastErr)
+        }
+    }
+
+    /// Convert a [BoxedMessage] to this concrete type
+    #[cfg(not(feature = "cluster"))]
+    fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
+        match m.msg.take() {
+            Some(m) => {
+                if m.is::<Self>() {
+                    Ok(*m.downcast::<Self>().unwrap())
+                } else {
+                    Err(BoxedDowncastErr)
+                }
+            }
+            _ => {
+                Err(BoxedDowncastErr)
+            }
+        }
+    }
+
+    /// Convert this message to a [BoxedMessage]
+    #[cfg(feature = "cluster")]
+    fn box_message(self, pid: &ActorId) -> BoxedMessage {
+        if Self::serializable() && !pid.is_local() {
+            // it's a message to a remote actor, serialize it and send it over the wire!
+            BoxedMessage { msg: None, serialized_msg: Some(self.serialize()) }
+        } else {
+            BoxedMessage { msg: Some(Box::new(self)), serialized_msg: None }
+        }
+    }
+
+    /// Convert this message to a [BoxedMessage]
+    #[cfg(not(feature = "cluster"))]
+    fn box_message(self, _pid: &ActorId) -> BoxedMessage {
+        BoxedMessage { msg: Some(Box::new(self)) }
+    }
+
+    /// Determines if this type is serializable
+    #[cfg(feature = "cluster")]
+    fn serializable() -> bool {
+        false
+    }
+
+    /// Serializes this message (if supported)
+    #[cfg(feature = "cluster")]
+    fn serialize(&self) -> Vec<u8> {
+        vec![]
+    }
+
+    /// Deserialize binary data to this message type
+    #[cfg(feature = "cluster")]
+    fn deserialize(_bytes: Vec<u8>) -> Result<Self, BoxedDowncastErr> {
+        Err(BoxedDowncastErr)
+    }
+}
+impl<T: Any + Send + Sized + 'static> Message for T {}
 
 /// Represents the state of an actor. Must be safe
 /// to send between threads (same bounds as a [Message])
diff --git a/ractor/src/remote/mod.rs b/ractor/src/remote/mod.rs
new file mode 100644
index 00000000..1c5147d7
--- /dev/null
+++ b/ractor/src/remote/mod.rs
@@ -0,0 +1,35 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Remote actor support
+
+use crate::{actor::messages::BoxedDowncastErr, Message};
+
+// Automatically implement the [Message] trait for
+// prost (protobuf) encoded messages
+impl<T: prost::Message + Default + 'static> Message for T {
+    fn serializable() -> bool {
+        true
+    }
+
+    fn serialize(&self) -> Vec<u8> {
+        let length = self.encoded_len();
+        let mut buf = Vec::with_capacity(length);
+        self.encode(&mut buf)
+            .expect("Failed to encode message to binary bytes");
+        buf
+    }
+
+    fn deserialize(buf: Vec<u8>) -> Result<Self, BoxedDowncastErr> {
+        let bytes = bytes::Bytes::from(buf);
+        // a decode failure surfaces to the caller as a downcast error
+        T::decode(bytes).map_err(|_prost_err| BoxedDowncastErr)
+    }
+}
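
For orientation, here is a minimal sketch (not part of the patch) of how the prost-generated types for `auth.proto` and `meta.proto` compose into a wire frame. It assumes prost's default codegen naming for packages and `oneof` fields; the node name and challenge value are illustrative, and the capability flags are left unset since `NodeFlags` is defined outside this excerpt.

```rust
use ractor_cluster::protocol::auth::{self, authentication_message};
use ractor_cluster::protocol::meta::{network_message, NetworkMessage};

/// Build the server's opening challenge frame of the handshake (illustrative).
fn server_challenge_frame(node_name: &str) -> NetworkMessage {
    let challenge = auth::Challenge {
        name: node_name.to_string(),
        flags: None, // `NodeFlags` is declared earlier in auth.proto; omitted here
        challenge: rand::random::<u32>(),
    };

    // Each proto `oneof` becomes a generated Rust enum: wrap the challenge in
    // the auth oneof first, then in the top-level envelope from meta.proto.
    NetworkMessage {
        message: Some(network_message::Message::Auth(auth::AuthenticationMessage {
            msg: Some(authentication_message::Msg::ServerChallenge(challenge)),
        })),
    }
}
```

From there, `prost::Message::encode_to_vec` on the frame yields the bytes a node would put on the wire.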
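Similarly, the blanket impl in `ractor/src/remote/mod.rs` is what would let any prost type flow through the new `Message` trait. The following round-trip sketch is hedged accordingly: `Ping` is a hypothetical message type, and the `remote` module (still commented out in `lib.rs` by this patch) would need to be compiled in with the `cluster` feature for the blanket impl to apply.

```rust
use ractor::Message;

// A hypothetical protobuf-backed message. prost's derive also generates a
// `Default` impl, so the blanket `impl<T: prost::Message + Default + 'static>`
// would cover this type.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Ping {
    #[prost(uint64, tag = "1")]
    pub counter: u64,
}

fn main() {
    let original = Ping { counter: 42 };

    // prost-backed types report as serializable, so `box_message` would encode
    // them for remote actors instead of boxing them locally.
    assert!(<Ping as Message>::serializable());

    // Encode to wire bytes and decode back, exercising the blanket impl.
    let bytes = original.serialize();
    let decoded = <Ping as Message>::deserialize(bytes).expect("decode should succeed");
    assert_eq!(original, decoded);
}
```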