Skip to content

Commit

Permalink
Add a new after auth handshake to ensure connection is ready
Browse files Browse the repository at this point in the history
Remote actors and PG memberships are sync after authentication
are done. A new message and NodeEventSubscription callback can
be used to query the connection ready state. The session setup
is complete from both side once the query returns true.

Signed-off-by: Kan-Ru Chen <[email protected]>
  • Loading branch information
kanru committed Jan 10, 2025
1 parent 3b7cf3d commit 54888d5
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 3 deletions.
20 changes: 20 additions & 0 deletions ractor_cluster/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pub enum NodeServerMessage {
/// This specific node session has authenticated
ConnectionAuthenticated(ActorId),

/// This specific node session has finished all state exchange after authentication
ConnectionReady(ActorId),

/// A request to check if a session is currently open, and if it is is the ordering such that we should
/// reject the incoming request
///
Expand Down Expand Up @@ -151,6 +154,9 @@ pub enum NodeSessionMessage {

/// Retrieve whether the session is authenticated or not
GetAuthenticationState(RpcReplyPort<bool>),

/// Retrieve whether the session has finished initial state exchange after authentication
GetReadyState(RpcReplyPort<bool>),
}

/// Node connection mode from the [Erlang](https://www.erlang.org/doc/reference_manual/distributed.html#node-connections)
Expand Down Expand Up @@ -273,6 +279,13 @@ pub trait NodeEventSubscription: Send + 'static {
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
fn node_session_authenicated(&self, ses: NodeServerSessionInformation);

/// A node session is ready
///
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
#[allow(unused_variables)]
fn node_session_ready(&self, ses: NodeServerSessionInformation) {}

Check warning on line 288 in ractor_cluster/src/node/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/mod.rs#L288

Added line #L288 was not covered by tests
}

/// The state of the node server
Expand Down Expand Up @@ -398,6 +411,13 @@ impl Actor for NodeServer {
}
}
}
Self::Msg::ConnectionReady(actor_id) => {
if let Some(entry) = state.node_sessions.get(&actor_id) {
for (_, sub) in state.subscriptions.iter() {
sub.node_session_ready(entry.clone());
}
}

Check warning on line 419 in ractor_cluster/src/node/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/mod.rs#L414-L419

Added lines #L414 - L419 were not covered by tests
}
Self::Msg::UpdateSession { actor_id, name } => {
if let Some(entry) = state.node_sessions.get_mut(&actor_id) {
entry.update(name);
Expand Down
53 changes: 52 additions & 1 deletion ractor_cluster/src/node/node_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ impl AuthenticationState {
}
}

#[derive(Debug)]
enum ReadyState {
Open,
SyncSent,
SyncReceived,
Ready,
}

impl ReadyState {
fn is_ok(&self) -> bool {
matches!(self, ReadyState::Ready)
}

Check warning on line 71 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L69-L71

Added lines #L69 - L71 were not covered by tests
}

/// Represents a remote connection to a `node()`. The [NodeSession] is the main
/// handler for all inter-node communication and handles
///
Expand Down Expand Up @@ -455,6 +469,21 @@ impl NodeSession {

if let Some(msg) = message.msg {
match msg {
control_protocol::control_message::Msg::Ready(_) => match state.ready {
ReadyState::Open => {
state.ready = ReadyState::SyncReceived;
}

Check warning on line 475 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L472-L475

Added lines #L472 - L475 were not covered by tests
ReadyState::SyncSent => {
state.ready = ReadyState::Ready;
ractor::cast!(
self.node_server,
NodeServerMessage::ConnectionReady(myself.get_id())
)?;

Check warning on line 481 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L477-L481

Added lines #L477 - L481 were not covered by tests
}
ReadyState::SyncReceived | ReadyState::Ready => {
tracing::warn!("Received duplicate Ready signal");

Check warning on line 484 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L484

Added line #L484 was not covered by tests
}
},
control_protocol::control_message::Msg::Spawn(spawned_actors) => {
for net_actor in spawned_actors.actors {
if let Err(spawn_err) = self
Expand Down Expand Up @@ -720,6 +749,18 @@ impl NodeSession {
state.tcp_send_control(control_message);
}
}
state.tcp_send_control(control_protocol::ControlMessage {
msg: Some(control_protocol::control_message::Msg::Ready(
control_protocol::Ready {},
)),
});
match state.ready {
ReadyState::Open => state.ready = ReadyState::SyncSent,
ReadyState::SyncReceived => state.ready = ReadyState::Ready,

Check warning on line 759 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L752-L759

Added lines #L752 - L759 were not covered by tests
ReadyState::SyncSent | ReadyState::Ready => {
unreachable!("after_authenticated() executed twice")

Check warning on line 761 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L761

Added line #L761 was not covered by tests
}
}
// TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled
// if both sessions had a "node_a" for example? Which resolves, local only?
}
Expand Down Expand Up @@ -759,6 +800,7 @@ pub struct NodeSessionState {
epoch: Instant,
name: Option<auth_protocol::NameMessage>,
auth: AuthenticationState,
ready: ReadyState,
remote_actors: HashMap<u64, ActorRef<RemoteActorMessage>>,
}

Expand Down Expand Up @@ -861,6 +903,7 @@ impl Actor for NodeSession {
} else {
AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init())
},
ready: ReadyState::Open,

Check warning on line 906 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L906

Added line #L906 was not covered by tests
remote_actors: HashMap::new(),
peer_addr,
local_addr,
Expand Down Expand Up @@ -927,7 +970,12 @@ impl Actor for NodeSession {
self.node_server.cast(
NodeServerMessage::ConnectionAuthenticated(myself.get_id()),
)?;
self.after_authenticated(myself, state);
self.after_authenticated(myself.clone(), state);
if state.ready.is_ok() {
self.node_server.cast(NodeServerMessage::ConnectionReady(
myself.get_id(),
))?;
}

Check warning on line 978 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L973-L978

Added lines #L973 - L978 were not covered by tests
}
}
crate::protocol::meta::network_message::Message::Node(node_message) => {
Expand All @@ -947,6 +995,9 @@ impl Actor for NodeSession {
Self::Msg::GetAuthenticationState(reply) => {
let _ = reply.send(state.auth.is_ok());
}
Self::Msg::GetReadyState(reply) => {
let _ = reply.send(state.ready.is_ok());
}

Check warning on line 1000 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L998-L1000

Added lines #L998 - L1000 were not covered by tests
_ => {
// no-op, ignore
}
Expand Down
6 changes: 6 additions & 0 deletions ractor_cluster/src/node/node_session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ async fn node_sesison_client_auth_success() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -254,6 +255,7 @@ async fn node_session_client_auth_session_state_failures() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -384,6 +386,7 @@ async fn node_session_server_auth_success() {
// let addr = SocketAddr::
let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -478,6 +481,7 @@ async fn node_session_server_auth_session_state_failures() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -627,6 +631,7 @@ async fn node_session_handle_node_msg() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::Ok([0u8; 32])),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -724,6 +729,7 @@ async fn node_session_handle_control() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::Ok),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down
6 changes: 6 additions & 0 deletions ractor_cluster/src/protocol/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ message NodeSessions {
repeated auth.NameMessage sessions = 1;
}

// All state for initial sync has been pushed
message Ready {
}

// Control messages between authenticated `node()`s which are dist-connected
message ControlMessage {
// The message payload
Expand All @@ -96,5 +100,7 @@ message ControlMessage {
auth.NameMessage enumerate_node_sessions = 7;
// The list of node sessions on the remote host for transitive connections
NodeSessions node_sessions = 8;
// All state for initial sync has been pushed
Ready ready = 9;
}
}
4 changes: 2 additions & 2 deletions ractor_cluster_integration_tests/src/tests/pg_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 {
{
let is_authenticated = ractor::call_t!(
item.actor,
ractor_cluster::NodeSessionMessage::GetAuthenticationState,
ractor_cluster::NodeSessionMessage::GetReadyState,
200
);
match is_authenticated {
Expand All @@ -167,7 +167,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 {
}
Ok(true) => {
err_code = 0;
tracing::info!("Authentication succeeded. Exiting test");
tracing::info!("Authentication and initial sync succeeded.");
break;
}
}
Expand Down

0 comments on commit 54888d5

Please sign in to comment.