Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make latency measurement in ping message robust against time drift #320

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions ractor_cluster/src/node/node_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::net::SocketAddr;
use std::time::{Instant, SystemTime};

use ractor::message::SerializedMessage;
use ractor::pg::{get_scoped_local_members, which_scopes_and_groups, GroupChangeMessage};
Expand Down Expand Up @@ -500,10 +501,10 @@
.expect("Timestamp missing in Pong")
.try_into()
.expect("Failed to convert Pong(Timestamp) to SystemTime");
let delta_ms = std::time::SystemTime::now()
.duration_since(ts)
.expect("Time went backwards")
.as_millis();
let inst = ts
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards");
let delta_ms = (state.epoch.elapsed() - inst).as_millis();

Check warning on line 507 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L504-L507

Added lines #L504 - L507 were not covered by tests
tracing::debug!("Ping -> Pong took {delta_ms}ms");
if delta_ms > 50 {
tracing::warn!(
Expand Down Expand Up @@ -755,6 +756,7 @@
tcp: Option<ActorRef<crate::net::session::SessionMessage>>,
peer_addr: SocketAddr,
local_addr: SocketAddr,
epoch: Instant,
name: Option<auth_protocol::NameMessage>,
auth: AuthenticationState,
remote_actors: HashMap<u64, ActorRef<RemoteActorMessage>>,
Expand Down Expand Up @@ -798,14 +800,15 @@
}

fn schedule_tcp_ping(&self) {
let epoch = self.epoch;

Check warning on line 803 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L803

Added line #L803 was not covered by tests
if let Some(tcp) = &self.tcp {
#[allow(clippy::let_underscore_future)]
let _ = tcp.send_after(Self::get_send_delay(), || {
let _ = tcp.send_after(Self::get_send_delay(), move || {

Check warning on line 806 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L806

Added line #L806 was not covered by tests
let ping = control_protocol::ControlMessage {
msg: Some(control_protocol::control_message::Msg::Ping(
control_protocol::Ping {
timestamp: Some(prost_types::Timestamp::from(
std::time::SystemTime::now(),
SystemTime::UNIX_EPOCH + epoch.elapsed(),

Check warning on line 811 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L811

Added line #L811 was not covered by tests
)),
},
)),
Expand Down Expand Up @@ -861,6 +864,7 @@
remote_actors: HashMap::new(),
peer_addr,
local_addr,
epoch: Instant::now(),

Check warning on line 867 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L867

Added line #L867 was not covered by tests
};

// If a client-connection, startup the handshake
Expand Down
7 changes: 7 additions & 0 deletions ractor_cluster/src/node/node_session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};
use std::time::Instant;

use ractor::concurrency::sleep;

Expand Down Expand Up @@ -113,6 +114,7 @@ async fn node_sesison_client_auth_success() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Client sends their name, Server responds with Ok
Expand Down Expand Up @@ -257,6 +259,7 @@ async fn node_session_client_auth_session_state_failures() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Client sends their name, Server responds with Ok
Expand Down Expand Up @@ -386,6 +389,7 @@ async fn node_session_server_auth_success() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Client sends their name
Expand Down Expand Up @@ -479,6 +483,7 @@ async fn node_session_server_auth_session_state_failures() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// Other session continues, this one dies
Expand Down Expand Up @@ -627,6 +632,7 @@ async fn node_session_handle_node_msg() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};
// add the "remote" actor
state
Expand Down Expand Up @@ -723,6 +729,7 @@ async fn node_session_handle_control() {
name: None,
remote_actors: HashMap::new(),
tcp: None,
epoch: Instant::now(),
};

// check spawn creates a remote actor
Expand Down
Loading