Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new NodeSessionMessage to get the state of initial sync #322

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions ractor_cluster/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@
/// This specific node session has authenticated
ConnectionAuthenticated(ActorId),

/// This specific node session has finished all state exchange after authentication
ConnectionReady(ActorId),

/// A request to check if a session is currently open, and if it is is the ordering such that we should
/// reject the incoming request
///
Expand Down Expand Up @@ -151,6 +154,9 @@

/// Retrieve whether the session is authenticated or not
GetAuthenticationState(RpcReplyPort<bool>),

/// Retrieve whether the session has finished initial state exchange after authentication
GetReadyState(RpcReplyPort<bool>),
}

/// Node connection mode from the [Erlang](https://www.erlang.org/doc/reference_manual/distributed.html#node-connections)
Expand Down Expand Up @@ -273,6 +279,13 @@
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
fn node_session_authenicated(&self, ses: NodeServerSessionInformation);

/// A node session is ready
///
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
#[allow(unused_variables)]
fn node_session_ready(&self, ses: NodeServerSessionInformation) {}

Check warning on line 288 in ractor_cluster/src/node/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/mod.rs#L288

Added line #L288 was not covered by tests
}

/// The state of the node server
Expand Down Expand Up @@ -398,6 +411,13 @@
}
}
}
Self::Msg::ConnectionReady(actor_id) => {
if let Some(entry) = state.node_sessions.get(&actor_id) {
for (_, sub) in state.subscriptions.iter() {
sub.node_session_ready(entry.clone());
}
}

Check warning on line 419 in ractor_cluster/src/node/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/mod.rs#L414-L419

Added lines #L414 - L419 were not covered by tests
}
Self::Msg::UpdateSession { actor_id, name } => {
if let Some(entry) = state.node_sessions.get_mut(&actor_id) {
entry.update(name);
Expand Down
53 changes: 52 additions & 1 deletion ractor_cluster/src/node/node_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@
}
}

#[derive(Debug)]
enum ReadyState {
Open,
SyncSent,
SyncReceived,
Ready,
}

impl ReadyState {
fn is_ok(&self) -> bool {
matches!(self, ReadyState::Ready)
}

Check warning on line 71 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L69-L71

Added lines #L69 - L71 were not covered by tests
}

/// Represents a remote connection to a `node()`. The [NodeSession] is the main
/// handler for all inter-node communication and handles
///
Expand Down Expand Up @@ -455,6 +469,21 @@

if let Some(msg) = message.msg {
match msg {
control_protocol::control_message::Msg::Ready(_) => match state.ready {
ReadyState::Open => {
state.ready = ReadyState::SyncReceived;
}

Check warning on line 475 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L472-L475

Added lines #L472 - L475 were not covered by tests
ReadyState::SyncSent => {
state.ready = ReadyState::Ready;
ractor::cast!(
self.node_server,
NodeServerMessage::ConnectionReady(myself.get_id())
)?;

Check warning on line 481 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L477-L481

Added lines #L477 - L481 were not covered by tests
}
ReadyState::SyncReceived | ReadyState::Ready => {
tracing::warn!("Received duplicate Ready signal");

Check warning on line 484 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L484

Added line #L484 was not covered by tests
}
},
control_protocol::control_message::Msg::Spawn(spawned_actors) => {
for net_actor in spawned_actors.actors {
if let Err(spawn_err) = self
Expand Down Expand Up @@ -720,6 +749,18 @@
state.tcp_send_control(control_message);
}
}
state.tcp_send_control(control_protocol::ControlMessage {
msg: Some(control_protocol::control_message::Msg::Ready(
control_protocol::Ready {},
)),
});
match state.ready {
ReadyState::Open => state.ready = ReadyState::SyncSent,
ReadyState::SyncReceived => state.ready = ReadyState::Ready,

Check warning on line 759 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L752-L759

Added lines #L752 - L759 were not covered by tests
ReadyState::SyncSent | ReadyState::Ready => {
unreachable!("after_authenticated() executed twice")

Check warning on line 761 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L761

Added line #L761 was not covered by tests
}
}
// TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled
// if both sessions had a "node_a" for example? Which resolves, local only?
}
Expand Down Expand Up @@ -759,6 +800,7 @@
epoch: Instant,
name: Option<auth_protocol::NameMessage>,
auth: AuthenticationState,
ready: ReadyState,
remote_actors: HashMap<u64, ActorRef<RemoteActorMessage>>,
}

Expand Down Expand Up @@ -861,6 +903,7 @@
} else {
AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init())
},
ready: ReadyState::Open,

Check warning on line 906 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L906

Added line #L906 was not covered by tests
remote_actors: HashMap::new(),
peer_addr,
local_addr,
Expand Down Expand Up @@ -927,7 +970,12 @@
self.node_server.cast(
NodeServerMessage::ConnectionAuthenticated(myself.get_id()),
)?;
self.after_authenticated(myself, state);
self.after_authenticated(myself.clone(), state);
if state.ready.is_ok() {
self.node_server.cast(NodeServerMessage::ConnectionReady(
myself.get_id(),
))?;
}

Check warning on line 978 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L973-L978

Added lines #L973 - L978 were not covered by tests
}
}
crate::protocol::meta::network_message::Message::Node(node_message) => {
Expand All @@ -947,6 +995,9 @@
Self::Msg::GetAuthenticationState(reply) => {
let _ = reply.send(state.auth.is_ok());
}
Self::Msg::GetReadyState(reply) => {
let _ = reply.send(state.ready.is_ok());
}

Check warning on line 1000 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L998-L1000

Added lines #L998 - L1000 were not covered by tests
_ => {
// no-op, ignore
}
Expand Down
6 changes: 6 additions & 0 deletions ractor_cluster/src/node/node_session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ async fn node_sesison_client_auth_success() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -254,6 +255,7 @@ async fn node_session_client_auth_session_state_failures() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -384,6 +386,7 @@ async fn node_session_server_auth_success() {
// let addr = SocketAddr::
let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -478,6 +481,7 @@ async fn node_session_server_auth_session_state_failures() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -627,6 +631,7 @@ async fn node_session_handle_node_msg() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::Ok([0u8; 32])),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -724,6 +729,7 @@ async fn node_session_handle_control() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::Ok),
ready: ReadyState::Open,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down
6 changes: 6 additions & 0 deletions ractor_cluster/src/protocol/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ message NodeSessions {
repeated auth.NameMessage sessions = 1;
}

// All state for initial sync has been pushed
message Ready {
}

// Control messages between authenticated `node()`s which are dist-connected
message ControlMessage {
// The message payload
Expand All @@ -96,5 +100,7 @@ message ControlMessage {
auth.NameMessage enumerate_node_sessions = 7;
// The list of node sessions on the remote host for transitive connections
NodeSessions node_sessions = 8;
// All state for initial sync has been pushed
Ready ready = 9;
}
}
4 changes: 2 additions & 2 deletions ractor_cluster_integration_tests/src/tests/pg_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 {
{
let is_authenticated = ractor::call_t!(
item.actor,
ractor_cluster::NodeSessionMessage::GetAuthenticationState,
ractor_cluster::NodeSessionMessage::GetReadyState,
200
);
match is_authenticated {
Expand All @@ -167,7 +167,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 {
}
Ok(true) => {
err_code = 0;
tracing::info!("Authentication succeeded. Exiting test");
tracing::info!("Authentication and initial sync succeeded.");
break;
}
}
Expand Down
Loading