Skip to content

Commit

Permalink
Make latency measurement in ping message robust against time drift
Browse files Browse the repository at this point in the history
Time drift happens, and the system clock going backwards is common on
NTP-enabled servers. When time drift happens, it is not worth killing
the node session.

Change the timestamp payload to carry the duration elapsed since a
NodeSession-specific Instant epoch that is initialized at creation
time. This ensures the timestamp increases monotonically. It can only
go backwards when the payload is malformed.

Signed-off-by: Kan-Ru Chen <[email protected]>
  • Loading branch information
kanru committed Jan 9, 2025
1 parent b0cd1a0 commit ad44a78
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
16 changes: 10 additions & 6 deletions ractor_cluster/src/node/node_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::net::SocketAddr;
use std::time::{Instant, SystemTime};

use ractor::message::SerializedMessage;
use ractor::pg::{get_scoped_local_members, which_scopes_and_groups, GroupChangeMessage};
Expand Down Expand Up @@ -500,10 +501,10 @@ impl NodeSession {
.expect("Timestamp missing in Pong")
.try_into()
.expect("Failed to convert Pong(Timestamp) to SystemTime");
let delta_ms = std::time::SystemTime::now()
.duration_since(ts)
.expect("Time went backwards")
.as_millis();
let inst = ts
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards");
let delta_ms = (state.epoch.elapsed() - inst).as_millis();

Check warning on line 507 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L504-L507

Added lines #L504 - L507 were not covered by tests
tracing::debug!("Ping -> Pong took {delta_ms}ms");
if delta_ms > 50 {
tracing::warn!(
Expand Down Expand Up @@ -755,6 +756,7 @@ pub struct NodeSessionState {
tcp: Option<ActorRef<crate::net::session::SessionMessage>>,
peer_addr: SocketAddr,
local_addr: SocketAddr,
epoch: Instant,
name: Option<auth_protocol::NameMessage>,
auth: AuthenticationState,
remote_actors: HashMap<u64, ActorRef<RemoteActorMessage>>,
Expand Down Expand Up @@ -798,14 +800,15 @@ impl NodeSessionState {
}

fn schedule_tcp_ping(&self) {
let epoch = self.epoch;

Check warning on line 803 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L803

Added line #L803 was not covered by tests
if let Some(tcp) = &self.tcp {
#[allow(clippy::let_underscore_future)]
let _ = tcp.send_after(Self::get_send_delay(), || {
let _ = tcp.send_after(Self::get_send_delay(), move || {

Check warning on line 806 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L806

Added line #L806 was not covered by tests
let ping = control_protocol::ControlMessage {
msg: Some(control_protocol::control_message::Msg::Ping(
control_protocol::Ping {
timestamp: Some(prost_types::Timestamp::from(
std::time::SystemTime::now(),
SystemTime::UNIX_EPOCH + epoch.elapsed(),

Check warning on line 811 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L811

Added line #L811 was not covered by tests
)),
},
)),
Expand Down Expand Up @@ -861,6 +864,7 @@ impl Actor for NodeSession {
remote_actors: HashMap::new(),
peer_addr,
local_addr,
epoch: Instant::now(),

Check warning on line 867 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L867

Added line #L867 was not covered by tests
};

// If a client-connection, startup the handshake
Expand Down
7 changes: 7 additions & 0 deletions ractor_cluster/src/node/node_session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};
use std::time::Instant;

use ractor::concurrency::sleep;

Expand Down Expand Up @@ -113,6 +114,7 @@ async fn node_sesison_client_auth_success() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Client sends their name, Server responds with Ok
Expand Down Expand Up @@ -257,6 +259,7 @@ async fn node_session_client_auth_session_state_failures() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Client sends their name, Server responds with Ok
Expand Down Expand Up @@ -386,6 +389,7 @@ async fn node_session_server_auth_success() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Client sends their name
Expand Down Expand Up @@ -479,6 +483,7 @@ async fn node_session_server_auth_session_state_failures() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Other session continues, this one dies
Expand Down Expand Up @@ -627,6 +632,7 @@ async fn node_session_handle_node_msg() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};
// add the "remote" actor
state
Expand Down Expand Up @@ -723,6 +729,7 @@ async fn node_session_handle_control() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// check spawn creates a remote actor
Expand Down

0 comments on commit ad44a78

Please sign in to comment.