From 54888d52027979efc98c144d699f864d24255929 Mon Sep 17 00:00:00 2001 From: Kan-Ru Chen Date: Fri, 10 Jan 2025 21:35:21 +0900 Subject: [PATCH] Add a new after auth handshake to ensure connection is ready Remote actors and PG memberships are sync after authentication are done. A new message and NodeEventSubscription callback can be used to query the connection ready state. The session setup is complete from both side once the query returns true. Signed-off-by: Kan-Ru Chen --- ractor_cluster/src/node/mod.rs | 20 +++++++ ractor_cluster/src/node/node_session/mod.rs | 53 ++++++++++++++++++- ractor_cluster/src/node/node_session/tests.rs | 6 +++ ractor_cluster/src/protocol/control.proto | 6 +++ .../src/tests/pg_groups.rs | 4 +- 5 files changed, 86 insertions(+), 3 deletions(-) diff --git a/ractor_cluster/src/node/mod.rs b/ractor_cluster/src/node/mod.rs index 923e2770..311b607f 100644 --- a/ractor_cluster/src/node/mod.rs +++ b/ractor_cluster/src/node/mod.rs @@ -104,6 +104,9 @@ pub enum NodeServerMessage { /// This specific node session has authenticated ConnectionAuthenticated(ActorId), + /// This specific node session has finished all state exchange after authentication + ConnectionReady(ActorId), + /// A request to check if a session is currently open, and if it is is the ordering such that we should /// reject the incoming request /// @@ -151,6 +154,9 @@ pub enum NodeSessionMessage { /// Retrieve whether the session is authenticated or not GetAuthenticationState(RpcReplyPort), + + /// Retrieve whether the session has finished initial state exchange after authentication + GetReadyState(RpcReplyPort), } /// Node connection mode from the [Erlang](https://www.erlang.org/doc/reference_manual/distributed.html#node-connections) @@ -273,6 +279,13 @@ pub trait NodeEventSubscription: Send + 'static { /// * `ses`: The [NodeServerSessionInformation] representing the current state /// of the node session fn node_session_authenicated(&self, ses: NodeServerSessionInformation); + + /// A node session is ready + /// + /// * `ses`: The [NodeServerSessionInformation] representing the current state + /// of the node session + #[allow(unused_variables)] + fn node_session_ready(&self, ses: NodeServerSessionInformation) {} } /// The state of the node server @@ -398,6 +411,13 @@ impl Actor for NodeServer { } } } + Self::Msg::ConnectionReady(actor_id) => { + if let Some(entry) = state.node_sessions.get(&actor_id) { + for (_, sub) in state.subscriptions.iter() { + sub.node_session_ready(entry.clone()); + } + } + } Self::Msg::UpdateSession { actor_id, name } => { if let Some(entry) = state.node_sessions.get_mut(&actor_id) { entry.update(name); diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index addf35d2..a37557f9 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -57,6 +57,20 @@ impl AuthenticationState { } } +#[derive(Debug)] +enum ReadyState { + Open, + SyncSent, + SyncReceived, + Ready, +} + +impl ReadyState { + fn is_ok(&self) -> bool { + matches!(self, ReadyState::Ready) + } +} + /// Represents a remote connection to a `node()`. The [NodeSession] is the main /// handler for all inter-node communication and handles /// @@ -455,6 +469,21 @@ impl NodeSession { if let Some(msg) = message.msg { match msg { + control_protocol::control_message::Msg::Ready(_) => match state.ready { + ReadyState::Open => { + state.ready = ReadyState::SyncReceived; + } + ReadyState::SyncSent => { + state.ready = ReadyState::Ready; + ractor::cast!( + self.node_server, + NodeServerMessage::ConnectionReady(myself.get_id()) + )?; + } + ReadyState::SyncReceived | ReadyState::Ready => { + tracing::warn!("Received duplicate Ready signal"); + } + }, control_protocol::control_message::Msg::Spawn(spawned_actors) => { for net_actor in spawned_actors.actors { if let Err(spawn_err) = self @@ -720,6 +749,18 @@ impl NodeSession { state.tcp_send_control(control_message); } } + state.tcp_send_control(control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::Ready( + control_protocol::Ready {}, + )), + }); + match state.ready { + ReadyState::Open => state.ready = ReadyState::SyncSent, + ReadyState::SyncReceived => state.ready = ReadyState::Ready, + ReadyState::SyncSent | ReadyState::Ready => { + unreachable!("after_authenticated() executed twice") + } + } // TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled // if both sessions had a "node_a" for example? Which resolves, local only? } @@ -759,6 +800,7 @@ pub struct NodeSessionState { epoch: Instant, name: Option, auth: AuthenticationState, + ready: ReadyState, remote_actors: HashMap>, } @@ -861,6 +903,7 @@ impl Actor for NodeSession { } else { AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()) }, + ready: ReadyState::Open, remote_actors: HashMap::new(), peer_addr, local_addr, @@ -927,7 +970,12 @@ impl Actor for NodeSession { self.node_server.cast( NodeServerMessage::ConnectionAuthenticated(myself.get_id()), )?; - self.after_authenticated(myself, state); + self.after_authenticated(myself.clone(), state); + if state.ready.is_ok() { + self.node_server.cast(NodeServerMessage::ConnectionReady( + myself.get_id(), + ))?; + } } } crate::protocol::meta::network_message::Message::Node(node_message) => { @@ -947,6 +995,9 @@ impl Actor for NodeSession { Self::Msg::GetAuthenticationState(reply) => { let _ = reply.send(state.auth.is_ok()); } + Self::Msg::GetReadyState(reply) => { + let _ = reply.send(state.ready.is_ok()); + } _ => { // no-op, ignore } diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 30ae7f03..55df5bb7 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -109,6 +109,7 @@ async fn node_sesison_client_auth_success() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()), + ready: ReadyState::Open, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -254,6 +255,7 @@ async fn node_session_client_auth_session_state_failures() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()), + ready: ReadyState::Open, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -384,6 +386,7 @@ async fn node_session_server_auth_success() { // let addr = SocketAddr:: let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()), + ready: ReadyState::Open, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -478,6 +481,7 @@ async fn node_session_server_auth_session_state_failures() { let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()), + ready: ReadyState::Open, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -627,6 +631,7 @@ async fn node_session_handle_node_msg() { let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::Ok([0u8; 32])), + ready: ReadyState::Open, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -724,6 +729,7 @@ async fn node_session_handle_control() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::Ok), + ready: ReadyState::Open, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, diff --git a/ractor_cluster/src/protocol/control.proto b/ractor_cluster/src/protocol/control.proto index dd5804e2..a4b2adf9 100644 --- a/ractor_cluster/src/protocol/control.proto +++ b/ractor_cluster/src/protocol/control.proto @@ -76,6 +76,10 @@ message NodeSessions { repeated auth.NameMessage sessions = 1; } +// All state for initial sync has been pushed +message Ready { +} + // Control messages between authenticated `node()`s which are dist-connected message ControlMessage { // The message payload @@ -96,5 +100,7 @@ message ControlMessage { auth.NameMessage enumerate_node_sessions = 7; // The list of node sessions on the remote host for transitive connections NodeSessions node_sessions = 8; + // All state for initial sync has been pushed + Ready ready = 9; } } diff --git a/ractor_cluster_integration_tests/src/tests/pg_groups.rs b/ractor_cluster_integration_tests/src/tests/pg_groups.rs index 7908d35a..5218d476 100644 --- a/ractor_cluster_integration_tests/src/tests/pg_groups.rs +++ b/ractor_cluster_integration_tests/src/tests/pg_groups.rs @@ -154,7 +154,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 { { let is_authenticated = ractor::call_t!( item.actor, - ractor_cluster::NodeSessionMessage::GetAuthenticationState, + ractor_cluster::NodeSessionMessage::GetReadyState, 200 ); match is_authenticated { @@ -167,7 +167,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 { } Ok(true) => { err_code = 0; - tracing::info!("Authentication succeeded. Exiting test"); + tracing::info!("Authentication and initial sync succeeded."); break; } }