diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 7218442..addf35d 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -10,6 +10,7 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryInto; use std::net::SocketAddr; +use std::time::{Instant, SystemTime}; use ractor::message::SerializedMessage; use ractor::pg::{get_scoped_local_members, which_scopes_and_groups, GroupChangeMessage}; @@ -500,10 +501,10 @@ impl NodeSession { .expect("Timestamp missing in Pong") .try_into() .expect("Failed to convert Pong(Timestamp) to SystemTime"); - let delta_ms = std::time::SystemTime::now() - .duration_since(ts) - .expect("Time went backwards") - .as_millis(); + let inst = ts + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time went backwards"); + let delta_ms = (state.epoch.elapsed() - inst).as_millis(); tracing::debug!("Ping -> Pong took {delta_ms}ms"); if delta_ms > 50 { tracing::warn!( @@ -755,6 +756,7 @@ pub struct NodeSessionState { tcp: Option>, peer_addr: SocketAddr, local_addr: SocketAddr, + epoch: Instant, name: Option, auth: AuthenticationState, remote_actors: HashMap>, @@ -798,14 +800,15 @@ impl NodeSessionState { } fn schedule_tcp_ping(&self) { + let epoch = self.epoch; if let Some(tcp) = &self.tcp { #[allow(clippy::let_underscore_future)] - let _ = tcp.send_after(Self::get_send_delay(), || { + let _ = tcp.send_after(Self::get_send_delay(), move || { let ping = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::Ping( control_protocol::Ping { timestamp: Some(prost_types::Timestamp::from( - std::time::SystemTime::now(), + SystemTime::UNIX_EPOCH + epoch.elapsed(), )), }, )), @@ -861,6 +864,7 @@ impl Actor for NodeSession { remote_actors: HashMap::new(), peer_addr, local_addr, + epoch: Instant::now(), }; // If a client-connection, startup the handshake diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 0160b10..30ae7f0 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -9,6 +9,7 @@ use std::sync::{ atomic::{AtomicU8, Ordering}, Arc, }; +use std::time::Instant; use ractor::concurrency::sleep; @@ -113,6 +114,7 @@ async fn node_sesison_client_auth_success() { name: None, remote_actors: HashMap::new(), tcp: None, + epoch: Instant::now(), }; // Client sends their name, Server responds with Ok @@ -257,6 +259,7 @@ async fn node_session_client_auth_session_state_failures() { name: None, remote_actors: HashMap::new(), tcp: None, + epoch: Instant::now(), }; // Client sends their name, Server responds with Ok @@ -386,6 +389,7 @@ async fn node_session_server_auth_success() { name: None, remote_actors: HashMap::new(), tcp: None, + epoch: Instant::now(), }; // Client sends their name @@ -479,6 +483,7 @@ async fn node_session_server_auth_session_state_failures() { name: None, remote_actors: HashMap::new(), tcp: None, + epoch: Instant::now(), }; // Other session continues, this one dies @@ -627,6 +632,7 @@ async fn node_session_handle_node_msg() { name: None, remote_actors: HashMap::new(), tcp: None, + epoch: Instant::now(), }; // add the "remote" actor state @@ -723,6 +729,7 @@ async fn node_session_handle_control() { name: None, remote_actors: HashMap::new(), tcp: None, + epoch: Instant::now(), }; // check spawn creates a remote actor