diff --git a/Cargo.toml b/Cargo.toml index 0515a6f6..eae5cf38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "ractor", + "ractor-cluster", "ractor-playground", "xtask" ] diff --git a/ractor-cluster/Cargo.toml b/ractor-cluster/Cargo.toml new file mode 100644 index 00000000..0ab20cf9 --- /dev/null +++ b/ractor-cluster/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "ractor-cluster" +version = "0.4.0" +authors = ["Sean Lawlor", "Evan Au", "Dillon George"] +description = "Distributed cluster environment of Ractor actors" +documentation = "https://docs.rs/ractor" +license = "MIT" +edition = "2018" +keywords = ["actor", "ractor", "cluster"] +repository = "https://github.com/slawlor/ractor" +readme = "../README.md" +homepage = "https://github.com/slawlor/ractor" +categories = ["actor", "erlang"] +build = "src/build.rs" + +[build-dependencies] +protobuf-src = "1" +prost-build = { version = "0.11" } + +[dependencies] +## Required dependencies +async-trait = "0.1" +bytes = { version = "1" } +# dashmap = "5" +# futures = "0.3" +log = "0.4" +# once_cell = "1" +prost = { version = "0.11" } +ractor = { version = "0.4", features = ["cluster"], path = "../ractor" } +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net"]} + +## Optional dependencies +# tokio-rustls = { version = "0.23", optional = true } + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] } diff --git a/ractor-cluster/src/build.rs b/ractor-cluster/src/build.rs new file mode 100644 index 00000000..c2362ba6 --- /dev/null +++ b/ractor-cluster/src/build.rs @@ -0,0 +1,31 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! This is the pre-compilation build script for the crate `ractor` when running in distrubted +//! mode. It's used to compile protobuf into Rust code prior to compilation. + +/// The shared-path for all protobuf specifications +const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol"; +/// The list of protobuf files to generate inside PROBUF_BASE_DIRECTORY +const PROTOBUF_FILES: [&str; 1] = ["meta"]; //"node", "auth", + +fn build_protobufs() { + std::env::set_var("PROTOC", protobuf_src::protoc()); + + let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len()); + + for file in PROTOBUF_FILES.iter() { + let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file); + println!("cargo:rerun-if-changed={}", proto_file); + protobuf_files.push(proto_file); + } + + prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap(); +} + +fn main() { + // compile the spec files into Rust code + build_protobufs(); +} diff --git a/ractor/src/distributed/mod.rs b/ractor-cluster/src/lib.rs similarity index 64% rename from ractor/src/distributed/mod.rs rename to ractor-cluster/src/lib.rs index acdf01ea..904813b2 100644 --- a/ractor/src/distributed/mod.rs +++ b/ractor-cluster/src/lib.rs @@ -4,33 +4,37 @@ // LICENSE-MIT file in the root directory of this source tree. //! Support for remote nodes in a distributed cluster. -//! +//! //! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html) //! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes. -//! +//! //! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool. //! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect //! to them as well. They merge registries and pg groups together in order to create larger clusters of services. //! -//! For messages to be transmittable across the [Node] boundaries to other [Node]s in the pool, they need to be -//! serializable to a binary format (say protobuf) +//! We have chosen protobuf for our inter-node defined protocol, however you can chose whatever medium you like +//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it +//! over the wire for you + +#![deny(warnings)] +#![warn(unused_imports)] +#![warn(unsafe_code)] +#![warn(missing_docs)] +#![warn(unused_crate_dependencies)] +#![cfg_attr(docsrs, feature(doc_cfg))] + +pub mod net; +pub mod node; +pub mod protocol; -use dashmap::DashMap; +/// Node's are representing by an integer id +pub type NodeId = u64; /// Represents messages that can cross the node boundary which can be serialized and sent over the wire -pub trait NodeSerializableMessage { +pub trait SerializableMessage { /// Serialize the message to binary - fn serialize(&self) -> &[u8]; + fn serialize(&self) -> Vec; /// Deserialize from binary back into the message type fn deserialize(&self, data: &[u8]) -> Self; } - -/// The identifier of a node is a globally unique u64 -pub type NodeId = u64; - -/// A node in the distributed compute pool. -pub struct Node { - node_id: u64, - other_nodes: DashMap, -} \ No newline at end of file diff --git a/ractor-cluster/src/net/listener.rs b/ractor-cluster/src/net/listener.rs new file mode 100644 index 00000000..a5bd246a --- /dev/null +++ b/ractor-cluster/src/net/listener.rs @@ -0,0 +1,179 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! TCP Server to accept incoming sessions + +use std::collections::HashMap; +use std::{collections::hash_map::Entry, marker::PhantomData}; + +use ractor::rpc::CallResult; +use ractor::{Actor, ActorId, ActorRef, SupervisionEvent}; +use tokio::net::TcpListener; +use tokio::time::Duration; + +use super::NetworkMessage; + +/// A Tcp Socket listener responsible for accepting new connections and spawning [super::session::Session]s +/// which handle the message sending and receiving over the socket. +pub struct Listener +where + TSessionManager: Actor>, + TMsg: NetworkMessage, + TSessionHandler: Actor, +{ + port: super::NetworkPort, + session_manager: ActorRef, + _phantom: PhantomData, + _session_handler: PhantomData, +} + +impl Listener +where + TSessionManager: Actor>, + TMsg: NetworkMessage, + TSessionHandler: Actor, +{ + /// Create a new `Listener` + pub fn new(port: super::NetworkPort, session_manager: ActorRef) -> Self { + Self { + port, + session_manager, + _phantom: PhantomData, + _session_handler: PhantomData, + } + } +} + +/// The Node listener's state +pub struct ListenerState { + listener: TcpListener, + nodes: HashMap, +} + +#[async_trait::async_trait] +impl Actor + for Listener +where + TSessionManager: Actor>, + TMsg: NetworkMessage, + TSessionHandler: Actor, +{ + type Msg = (); + + type State = ListenerState; + + async fn pre_start(&self, myself: ActorRef) -> Self::State { + let addr = format!("0.0.0.0:{}", self.port); + let listener = match TcpListener::bind(&addr).await { + Ok(l) => l, + Err(err) => { + panic!("Error listening to socket: {}", err); + } + }; + + // startup the event processing loop by sending an initial msg + let _ = myself.cast(()); + + // create the initial state + Self::State { + listener, + nodes: HashMap::new(), + } + } + + async fn handle(&self, myself: ActorRef, _message: Self::Msg, state: &mut Self::State) { + match state.listener.accept().await { + Ok((stream, addr)) => { + // ask the session manager for a new session agent + let session = self + .session_manager + .call( + super::SessionManagerMessage::SessionOpened, + Some(Duration::from_millis(500)), + ) + .await + .unwrap(); + match session { + CallResult::Timeout => { + log::warn!("Timeout in trying to open session. Failed to retrieve a new Session handler from the SessionManager in {} ms. Refusing connection", 500); + } + CallResult::SenderError => { + log::error!("Sender error when trying to receive session handler"); + myself.stop(Some("Session handler retrieval failure".to_string())); + } + CallResult::Success(session_handler) => { + // Spawn off the connection management actor and make me the supervisor of it + if let Some(actor) = + super::session::Session::::spawn_linked( + session_handler, + stream, + myself.get_cell(), + ) + .await + { + state.nodes.insert(actor.get_id(), addr); + } + } + } + } + Err(socket_accept_error) => { + log::warn!( + "Error accepting socket {} on Node server", + socket_accept_error + ); + } + } + + // continue accepting new sockets + let _ = myself.cast(()); + } + + async fn handle_supervisor_evt( + &self, + _myself: ActorRef, + message: SupervisionEvent, + state: &mut Self::State, + ) { + // sockets open, they close, the world goes round... + match message { + SupervisionEvent::ActorPanicked(actor, msg) => { + match state.nodes.entry(actor.get_id()) { + Entry::Occupied(o) => { + log::error!("Connection with {} panicked with message: {}", o.get(), msg); + o.remove(); + } + Entry::Vacant(_) => { + log::error!("Connection with ([unknown]) panicked with message: {}", msg); + } + } + } + SupervisionEvent::ActorTerminated(actor, _, _) => { + match state.nodes.entry(actor.get_id()) { + Entry::Occupied(o) => { + log::error!("Connection closed with {}", o.get()); + o.remove(); + } + Entry::Vacant(_) => { + log::error!("Connection with ([unknown]) closed"); + } + } + let _ = self + .session_manager + .cast(super::SessionManagerMessage::SessionClosed(actor.get_id())); + } + SupervisionEvent::ActorStarted(actor) => match state.nodes.entry(actor.get_id()) { + Entry::Occupied(o) => { + log::error!("Connection opened with {}", o.get()); + } + Entry::Vacant(_) => { + log::error!("Connection with ([unknown]) opened"); + } + }, + _ => { + //no-op + } + } + } +} diff --git a/ractor-cluster/src/net/mod.rs b/ractor-cluster/src/net/mod.rs new file mode 100644 index 00000000..5edbb7e1 --- /dev/null +++ b/ractor-cluster/src/net/mod.rs @@ -0,0 +1,37 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! TCP server and session actors which transmit [prost::Message] encoded messages + +// TODO: we need a way to identify which session messages are coming from + going to. Therefore +// we should actually have a notification when a new session is launched, which can be used +// to match which session is tied to which actor id + +use ractor::{Actor, ActorId, ActorRef, RpcReplyPort}; + +pub mod listener; +pub mod session; + +/// Messages to/from the session aggregator +pub enum SessionManagerMessage +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + /// Notification when a new session is opened, and the handle to communicate with it + /// Returns the actor which will be responsible for handling messages on this session + SessionOpened(RpcReplyPort>), + + /// Notification when a session is closed, and the id of the actor to cleanup + SessionClosed(ActorId), +} + +/// A trait which implements [prost::Message], [Default], and has a static lifetime +/// denoting protobuf-encoded messages which can be transmitted over the wire +pub trait NetworkMessage: prost::Message + Default + 'static {} +impl NetworkMessage for T {} + +/// A network port +pub type NetworkPort = u16; diff --git a/ractor-cluster/src/net/session.rs b/ractor-cluster/src/net/session.rs new file mode 100644 index 00000000..33fab609 --- /dev/null +++ b/ractor-cluster/src/net/session.rs @@ -0,0 +1,353 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! TCP session actor which is managing the specific communication to a node + +// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs + +use std::marker::PhantomData; + +use bytes::Bytes; +use ractor::{Actor, ActorCell, ActorRef}; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::ErrorKind; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::net::TcpStream; + +use super::NetworkMessage; + +/// Helper method to read exactly `len` bytes from the stream into a pre-allocated buffer +/// of bytes +async fn read_n_bytes(stream: &mut OwnedReadHalf, len: usize) -> Result, tokio::io::Error> { + let mut buf = Vec::with_capacity(len); + let mut c_len = 0; + while c_len < len { + let n = stream.read(&mut buf[c_len..]).await?; + c_len += n; + } + Ok(buf) +} + +// ========================= Node Session actor ========================= // + +/// Represents a bi-directional tcp connection along with send + receive operations +pub struct Session +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + handler: ActorRef, +} + +impl Session +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + pub(crate) async fn spawn_linked( + handler: ActorRef, + stream: TcpStream, + supervisor: ActorCell, + ) -> Option> { + let actor = match Actor::spawn_linked(None, Session { handler }, supervisor).await { + Err(err) => { + log::error!("Failed to spawn session writer actor: {}", err); + None + } + Ok((a, _)) => { + // intiialize this actor & its children + let _ = a.cast(SessionMessage::SetStream(stream)); + // return the actor handle + Some(a) + } + }; + actor + } +} + +/// The node connection messages +pub enum SessionMessage +where + TMsg: NetworkMessage, +{ + /// Set the session's tcp stream, which initializes all underlying states + SetStream(TcpStream), + + /// Send a message over the channel + Send(TMsg), + + /// An object was received on the channel + ObjectAvailable(TMsg), +} + +/// The node session's state +pub struct SessionState +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + writer: ActorRef>, + reader: ActorRef>, +} + +#[async_trait::async_trait] +impl Actor for Session +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + type Msg = SessionMessage; + type State = SessionState; + + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + // spawn writer + reader child actors + let (writer, _) = Actor::spawn_linked( + None, + SessionWriter:: { + _phantom: PhantomData, + }, + _myself.get_cell(), + ) + .await + .expect("Failed to start session writer"); + let (reader, _) = Actor::spawn_linked( + None, + SessionReader { + session: _myself.clone(), + }, + _myself.get_cell(), + ) + .await + .expect("Failed to start session reader"); + + // set the stream bits + + Self::State { writer, reader } + } + + async fn handle(&self, _myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SetStream(stream) => { + let (read, write) = stream.into_split(); + // initialize the writer & reader state's + let _ = state.writer.cast(SessionWriterMessage::SetStream(write)); + let _ = state.reader.cast(SessionReaderMessage::SetStream(read)); + } + Self::Msg::Send(msg) => { + let _ = state.writer.cast(SessionWriterMessage::WriteObject(msg)); + } + Self::Msg::ObjectAvailable(msg) => { + let _ = self.handler.cast(msg); + } + } + } + + // TODO: fill out supervision tree logic! + // async fn handle_supervisor_evt(&self, _myself: ActorRef, _message: SupervisionEvent, _state: &mut Self::State) { + // // sockets open, they close, the world goes round... + // } +} + +// ========================= Node Session writer ========================= // + +struct SessionWriter +where + TMsg: NetworkMessage, +{ + _phantom: PhantomData, +} + +struct SessionWriterState { + writer: Option, +} + +enum SessionWriterMessage +where + TMsg: NetworkMessage, +{ + /// Set the stream, providing a [TcpStream], which + /// to utilize for this node's connection + SetStream(OwnedWriteHalf), + + /// Write an object over the wire + WriteObject(TMsg), +} + +#[async_trait::async_trait] +impl Actor for SessionWriter +where + TMsg: NetworkMessage, +{ + type Msg = SessionWriterMessage; + + type State = SessionWriterState; + + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + // OK we've established connection, now we can process requests + + Self::State { writer: None } + } + + async fn handle(&self, _myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SetStream(stream) => { + state.writer = Some(stream); + } + Self::Msg::WriteObject(msg) if state.writer.is_some() => { + if let Some(stream) = &mut state.writer { + stream.writable().await.unwrap(); + + let length = msg.encoded_len(); + + // encode the length into a buffer and first write those bytes + let mut length_buf = Vec::with_capacity(10); + prost::encode_length_delimiter(length, &mut length_buf).unwrap(); + if let Err(write_err) = stream.write_all(&length_buf).await { + log::warn!("Error writing to the stream: '{}'", write_err); + } else { + // Serialize the full object and write it over the wire + let mut buf = Vec::with_capacity(length); + msg.encode(&mut buf).unwrap(); + let mut write_buf = Bytes::from(buf); + if let Err(write_err) = stream.write_all_buf(&mut write_buf).await { + log::warn!("Error writing to the stream: '{}'", write_err); + } + } + } + } + _ => { + // no-op, wait for next send request + } + } + } +} + +// ========================= Node Session reader ========================= // + +struct SessionReader +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + session: ActorRef>, +} + +/// The node connection messages +pub enum SessionReaderMessage { + /// Set the stream, providing a [TcpStream], which + /// to utilize for this node's connection + SetStream(OwnedReadHalf), + + /// Wait for an object from the stream + WaitForObject, + + /// Read next object off the stream + ReadObject(usize), +} + +struct SessionReaderState { + reader: Option, +} + +#[async_trait::async_trait] +impl Actor for SessionReader +where + TSessionHandler: Actor, + TMsg: NetworkMessage, +{ + type Msg = SessionReaderMessage; + + type State = SessionReaderState; + + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + Self::State { reader: None } + } + + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SetStream(stream) => { + state.reader = Some(stream); + // wait for an incoming object + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + Self::Msg::WaitForObject if state.reader.is_some() => { + if let Some(stream) = &mut state.reader { + stream.readable().await.unwrap(); + match read_n_bytes(stream, 10).await { + Ok(buf) => { + let bytes = Bytes::from(buf); + match prost::decode_length_delimiter(bytes) { + Ok(protobuf_len) => { + let _ = + myself.cast(SessionReaderMessage::ReadObject(protobuf_len)); + return; + } + Err(decode_err) => { + log::warn!( + "Failed to decode protobuf object length with {}", + decode_err + ); + // continue processing + } + } + } + Err(err) if err.kind() == ErrorKind::UnexpectedEof => { + // EOF, close the stream by dropping the stream + drop(state.reader.take()); + myself.stop(Some("channel_closed".to_string())); + } + Err(_other_err) => { + // some other TCP error, more handling necessary + } + } + } + + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + Self::Msg::ReadObject(length) if state.reader.is_some() => { + if let Some(stream) = &mut state.reader { + match read_n_bytes(stream, length).await { + Ok(buf) => { + // NOTE: Our implementation writes 2 messages when sending something over the wire, the first + // is exactly 10 bytes which constitute the length of the payload message, followed by the payload. + // This tells our TCP reader how much data to read off the wire + + // [buf] here should contain the exact amount of data to decode an object properly. + let bytes = Bytes::from(buf); + match TMsg::decode(bytes) { + Ok(msg) => { + // we decoded a message, pass it up the chain + let _ = self.session.cast(SessionMessage::ObjectAvailable(msg)); + } + Err(decode_err) => { + log::error!( + "Error decoding network message: '{}'. Discarding", + decode_err + ); + } + } + } + Err(err) if err.kind() == ErrorKind::UnexpectedEof => { + // EOF, close the stream by dropping the stream + drop(state.reader.take()); + myself.stop(Some("channel_closed".to_string())); + } + Err(_other_err) => { + // TODO: some other TCP error, more handling necessary + } + } + } + + // we've read the object, now wait for next object + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + _ => { + // no stream is available, keep looping until one is available + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + } + } +} diff --git a/ractor-cluster/src/node/mod.rs b/ractor-cluster/src/node/mod.rs new file mode 100644 index 00000000..204bc430 --- /dev/null +++ b/ractor-cluster/src/node/mod.rs @@ -0,0 +1,112 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Erlang `node()` host communication for managing remote actor communication in +//! a cluster + +/* +TODO: + +Overview: + +A `NodeServer` handles opening the TCP listener and managing incoming and outgoing `NodeSession` requests. `NodeSession`s +will represent a remote server locally. + +We need to: +1. Authenticate the remote `node()` and determine if it's a valid node which can be connected to +2. Define and implement the remote node protocol + 1. Cast + 2. Call +3. Adjustments in the default Message type which allow messages to be serializable. (with `cluster` feature enabled) +4. Spawn all of the remote actors locally as "Remote Actors" (see actor_id.rs in `ractor`), and any requests to these +local representitive actors will be passed over the network to the remote actor. +5. Registry and pg groups need to be sync'd with local & remote actors across all nodes. + +Additionally, you can open a session as a "client" by requesting a new session from the NodeServer +after intially connecting a TcpStream to the desired endpoint and then attaching the NodeSession +to the TcpStream (and linking the actor). + +What's there to do? +1. The client-connection behavior does not yet exist + +*/ + +use std::collections::HashMap; + +use ractor::{Actor, ActorId, ActorRef}; + +const DEFAULT_PORT: crate::net::NetworkPort = 1230; + +/// Represents the server which is managing all incoming node session requests +pub struct NodeServer { + maybe_port: Option, +} + +/// The state of the node server +pub struct NodeServerState { + _listener: ActorRef< + crate::net::listener::Listener, + >, + node_sessions: HashMap>, +} + +#[async_trait::async_trait] +impl Actor for NodeServer { + type Msg = crate::net::SessionManagerMessage; + type State = NodeServerState; + async fn pre_start(&self, myself: ActorRef) -> Self::State { + let listener = crate::net::listener::Listener::new( + self.maybe_port.unwrap_or(DEFAULT_PORT), + myself.clone(), + ); + + let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell()) + .await + .expect("Failed to start listener"); + + Self::State { + node_sessions: HashMap::new(), + _listener: actor_ref, + } + } + + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SessionOpened(reply) => { + if let Ok((actor, _)) = + Actor::spawn_linked(None, NodeSession, myself.get_cell()).await + { + let _ = reply.send(actor); + } else { + // let the timeout take care of the child process and shut the channel down + } + } + Self::Msg::SessionClosed(id) => { + // remove the stale session + if let Some(session) = state.node_sessions.remove(&id) { + // session closed + session.stop(None); + } + } + } + } + + // TODO: fill out the supervision logic +} + +/// Represents a session with a specific node +pub struct NodeSession; + +#[async_trait::async_trait] +impl Actor for NodeSession { + type Msg = crate::protocol::NetworkMessage; + type State = (); + async fn pre_start(&self, _myself: ActorRef) -> Self::State {} +} + +// /// A node server is run by every `node()` process to accept inter-node connection requests +// pub type NodeServer = crate::net::listener::Listener; +// /// A node session in an active open connection with a peer `node()` +// pub type NodeSession = crate::net::session::Session; diff --git a/ractor-cluster/src/protocol/auth.proto b/ractor-cluster/src/protocol/auth.proto new file mode 100644 index 00000000..940606aa --- /dev/null +++ b/ractor-cluster/src/protocol/auth.proto @@ -0,0 +1,61 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum +// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for +// the syntax and guide + +// The protocol messages defined here roughly follow the Erlang distributed systems guide +// found at: https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html + +syntax = "proto3"; + +package auth; + +// In the following, we're assuming client A is connecting to server B + +/* Client -> Server: `SendName` represents the initial client's request to the server. */ +message SendName { + // The node's name + string name = 1; +} + +// Server -> Client: `SendStatus` is the server replying with the handshake status to the client +message SendStatus { + // Status types + enum Status { + // The handshake will continue + OK = 0; + // The handshake will continue, but A is informed that B has another ongoing + // connection attempt that will be shut down (simultaneous connect where A's + // name is greater than B's name, compared literally). + OK_SIMULTANEOUS = 1; + // The handshake will not continue, as B already has an ongoing handshake, which + // it itself has initiated (simultaneous connect where B's name is greater than A's). + NOT_OK = 2; + // The connection is disallowed for some (unspecified) security reason. + NOT_ALLOWED = 3; + // A connection to the node is already active, which either means that node A is confused + // or that the TCP connection breakdown of a previous node with this name has not yet + // reached node B. + ALIVE = 4; + + // Skipped NAMED = 5; + } + + // The status + Status status = 1; +} + +// A authentication message +message AuthenticationMessage { + // The inner message type + oneof msg { + // Send the name + SendName send_name = 1; + // Send the status + SendStatus send_status = 2; + } +} \ No newline at end of file diff --git a/ractor-cluster/src/protocol/meta.proto b/ractor-cluster/src/protocol/meta.proto new file mode 100644 index 00000000..89c275d9 --- /dev/null +++ b/ractor-cluster/src/protocol/meta.proto @@ -0,0 +1,23 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + + +syntax = "proto3"; + +import "auth.proto"; +import "node.proto"; + +package meta; + +// Represents a message over the network +message NetworkMessage { + // The inner message + oneof message { + // An authentication message + auth.AuthenticationMessage auth = 1; + // An inter-node message + node.NodeMessage node = 2; + } +} \ No newline at end of file diff --git a/ractor-cluster/src/protocol/mod.rs b/ractor-cluster/src/protocol/mod.rs new file mode 100644 index 00000000..c0f61983 --- /dev/null +++ b/ractor-cluster/src/protocol/mod.rs @@ -0,0 +1,24 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Protobuf specifications for over-the-wire intercommuncation +//! between nodes. Generated via [prost] + +/// Node authentication protocol +pub mod auth { + include!(concat!(env!("OUT_DIR"), "/auth.rs")); +} + +/// Node inter-communication protocol +pub mod node { + include!(concat!(env!("OUT_DIR"), "/node.rs")); +} + +/// Meta types which include all base network protocol message types +pub mod meta { + include!(concat!(env!("OUT_DIR"), "/meta.rs")); +} + +pub use meta::NetworkMessage; diff --git a/ractor-cluster/src/protocol/node.proto b/ractor-cluster/src/protocol/node.proto new file mode 100644 index 00000000..1f25a97f --- /dev/null +++ b/ractor-cluster/src/protocol/node.proto @@ -0,0 +1,20 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum +// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for +// the syntax and guide + +syntax = "proto3"; + +package node; + + +// A placeholder message representing an +// authenticated inter-ndoe message +message NodeMessage { + // Some value + int32 value = 1; +} \ No newline at end of file diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 237ffb01..e25a5625 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -12,7 +12,12 @@ readme = "../README.md" homepage = "https://github.com/slawlor/ractor" categories = ["actor", "erlang"] +[features] +cluster = [] +default = [] + [dependencies] +## Required dependencies async-trait = "0.1" dashmap = "5" futures = "0.3" diff --git a/ractor/src/actor/mod.rs b/ractor/src/actor/mod.rs index b779803a..37cb8b9d 100644 --- a/ractor/src/actor/mod.rs +++ b/ractor/src/actor/mod.rs @@ -98,7 +98,8 @@ pub trait Actor: Sized + Sync + Send + 'static { async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) {} /// Handle the incoming supervision event. Unhandled panicks will captured and - /// sent the the supervisor(s) + /// sent the the supervisor(s). The default supervision behavior is to ignore all + /// child events. To override this behavior, implement this method. /// /// * `myself` - A handle to the [ActorCell] representing this actor /// * `message` - The message to process diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index e1c3cfca..775efb03 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -233,9 +233,7 @@ async fn test_sending_message_to_invalid_actor_type() { impl Actor for TestActor1 { type Msg = TestMessage1; type State = (); - async fn pre_start(&self, _myself: ActorRef) -> Self::State { - () - } + async fn pre_start(&self, _myself: ActorRef) -> Self::State {} } struct TestActor2; struct TestMessage2; @@ -243,9 +241,7 @@ async fn test_sending_message_to_invalid_actor_type() { impl Actor for TestActor2 { type Msg = TestMessage2; type State = (); - async fn pre_start(&self, _myself: ActorRef) -> Self::State { - () - } + async fn pre_start(&self, _myself: ActorRef) -> Self::State {} } let (actor1, handle1) = Actor::spawn(None, TestActor1) diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index 179e1ba6..648a682d 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -135,8 +135,6 @@ #![warn(unused_crate_dependencies)] #![cfg_attr(docsrs, feature(doc_cfg))] -use std::any::Any; - /// An actor's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) pub type ActorName = &'static str; @@ -145,6 +143,7 @@ pub type GroupName = &'static str; pub mod actor; pub mod actor_id; +pub mod message; pub mod pg; pub mod port; pub mod registry; @@ -156,36 +155,16 @@ use criterion as _; #[cfg(test)] use rand as _; -// WIP -// #[cfg(feature = "remote")] -// pub mod distributed; - // re-exports pub use actor::actor_cell::{ActorCell, ActorRef, ActorStatus, ACTIVE_STATES}; pub use actor::errors::{ActorErr, MessagingErr, SpawnErr}; pub use actor::messages::{Signal, SupervisionEvent}; pub use actor::{Actor, ActorRuntime}; pub use actor_id::ActorId; +pub use message::{BaseMessage, Message}; pub use port::{OutputMessage, OutputPort, RpcReplyPort}; -/// Message type for an actor. Generally an enum -/// which muxes the various types of inner-messages the actor -/// supports -/// -/// ## Example -/// -/// ```rust -/// pub enum MyMessage { -/// /// Record the name to the actor state -/// RecordName(String), -/// /// Print the recorded name from the state to command line -/// PrintName, -/// } -/// ``` -pub trait Message: Any + Send + 'static {} -impl Message for T {} - /// Represents the state of an actor. Must be safe /// to send between threads (same bounds as a [Message]) -pub trait State: Message {} -impl State for T {} +pub trait State: BaseMessage {} +impl State for T {} diff --git a/ractor/src/message.rs b/ractor/src/message.rs new file mode 100644 index 00000000..145e6855 --- /dev/null +++ b/ractor/src/message.rs @@ -0,0 +1,77 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Message types + +use std::any::Any; + +/// Message type for an actor. Generally an enum +/// which muxes the various types of inner-messages the actor +/// supports +/// +/// ## Example +/// +/// ```rust +/// pub enum MyMessage { +/// /// Record the name to the actor state +/// RecordName(String), +/// /// Print the recorded name from the state to command line +/// PrintName, +/// } +/// ``` +pub trait BaseMessage: Any + Send + 'static {} +impl BaseMessage for T {} + +// ============= Basic Message ============= // +/// Message type for an actor. Generally an enum +/// which muxes the various types of inner-messages the actor +/// supports +/// +/// ## Example +/// +/// ```rust +/// pub enum MyMessage { +/// /// Record the name to the actor state +/// RecordName(String), +/// /// Print the recorded name from the state to command line +/// PrintName, +/// } +/// ``` +#[cfg(not(feature = "cluster"))] +pub trait Message: BaseMessage {} +#[cfg(not(feature = "cluster"))] +impl Message for T {} + +// ============= Distributed Message ============= // + +/// Distributed message type for an actor which supports +/// serialization. +/// +/// ## Example +/// +/// ```rust +/// pub enum MyMessage { +/// /// Record the name to the actor state +/// RecordName(String), +/// /// Print the recorded name from the state to command line +/// PrintName, +/// } +/// ``` +#[cfg(feature = "cluster")] +pub trait Message: crate::BaseMessage {} //+ crate::distributed::SerializableMessage +#[cfg(feature = "cluster")] +impl Message for T {} //+ crate::distributed::SerializableMessage + +/* +/// impl ractor::distributed::SerializableMessage for MyMessage { +/// +/// fn serialize(&self) -> Vec { +/// &[] +/// } +/// fn deserialize(&self, data: &[u8]) -> Self { +/// MyMessage::PrintName +/// } +/// } +*/