Skip to content

Commit 5b7187f

Browse files
committed
fix: use monolithic time in zmq last_recv time
1 parent 4c3b219 commit 5b7187f

File tree

3 files changed

+72
-52
lines changed

3 files changed

+72
-52
lines changed

Cargo.lock

Lines changed: 34 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/rs-dapi/src/clients/drive_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ mod tests {
248248
#[tokio::test]
249249
async fn test_drive_client_tracing_integration() {
250250
// Test that DriveClient can be created with tracing interceptor
251-
// Note: This will fail if no server is running, which is expected in unit tests
251+
// Note: This should succeed even if no server is running; connectivity validation logs a warning.
252252
match DriveClient::new("http://localhost:1443").await {
253253
Ok(client) => {
254254
// If connection succeeds, verify the structure

packages/rs-dapi/src/services/streaming_service/zmq_listener.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
//! - subscribers subscribe to events via [`ZmqListener::subscribe`] to receive [`ZmqEvent`]s
1616
//!
1717
use std::future::Future;
18-
use std::sync::Arc;
19-
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
18+
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19+
use std::sync::{Arc, LazyLock};
2020

2121
use crate::error::{DAPIResult, DapiError};
2222
use crate::sync::Workers;
@@ -29,7 +29,7 @@ use tokio::select;
2929
use tokio::sync::Mutex;
3030
use tokio::sync::broadcast;
3131
use tokio::sync::mpsc;
32-
use tokio::time::{Duration, sleep};
32+
use tokio::time::{Duration, Instant, sleep};
3333
use tokio_util::sync::CancellationToken;
3434
use tracing::span;
3535
use tracing::{debug, trace};
@@ -40,6 +40,9 @@ use zeromq::ZmqMessage;
4040
use zeromq::ZmqResult;
4141
use zeromq::prelude::*;
4242

43+
/// Start time for calculating durations
44+
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);
45+
4346
/// ZMQ topics that we subscribe to from Dash Core
4447
4548
#[derive(Debug, Clone)]
@@ -234,7 +237,7 @@ impl ZmqConnection {
234237
zmq_tx: tx,
235238
cancel: cancel.clone(),
236239
connected: self.connected.clone(),
237-
last_recv: Arc::new(AtomicI64::new(0)),
240+
last_recv: Arc::new(AtomicU64::new(0)),
238241
}
239242
.spawn(&self.workers);
240243
}
@@ -551,7 +554,8 @@ struct ZmqDispatcher {
551554
/// Cancellation token to stop all spawned threads; cancelled when the connection is lost
552555
cancel: CancellationToken,
553556
connected: Arc<AtomicBool>,
554-
last_recv: Arc<AtomicI64>,
557+
/// Time of last received message, in seconds since [START_TIME]
558+
last_recv: Arc<AtomicU64>,
555559
}
556560

557561
impl ZmqDispatcher {
@@ -581,7 +585,7 @@ impl ZmqDispatcher {
581585
return Err(DapiError::ClientGone("ZMQ receiver exited".to_string()));
582586
} else {
583587
// update last received timestamp
584-
self.last_recv.store(chrono::Utc::now().timestamp(), Ordering::SeqCst);
588+
self.last_recv_update();
585589
}
586590
},
587591
Err(e) => {
@@ -603,9 +607,8 @@ impl ZmqDispatcher {
603607

604608
/// Event that happens every ten seconds to check connection status
605609
async fn tick_event_10s(&mut self) {
606-
// first, if we received a message within last 10s, we are connected
607-
let last_recv = self.last_recv.load(Ordering::SeqCst);
608-
if last_recv + 10 >= chrono::Utc::now().timestamp() {
610+
// if we have received a message in less than 10s, we are connected
611+
if self.last_recv_elapsed() < Duration::from_secs(10) {
609612
self.connected.store(true, Ordering::SeqCst);
610613
return;
611614
}
@@ -636,16 +639,33 @@ impl ZmqDispatcher {
636639

637640
// if we are connected, we assume last_recv is now
638641
if current_status {
639-
self.last_recv
640-
.compare_exchange(
641-
last_recv,
642-
chrono::Utc::now().timestamp(),
643-
Ordering::AcqRel,
644-
Ordering::Relaxed,
645-
)
646-
.ok();
642+
self.last_recv_update();
647643
}
648644
}
645+
646+
/// Get duration since last received message.
647+
/// Defaults to [START_TIME] on error.
648+
fn last_recv_elapsed(&self) -> Duration {
649+
let now = Instant::now();
650+
let start_time = *START_TIME;
651+
652+
let last_recv_secs = self.last_recv.load(Ordering::Relaxed);
653+
let last_recv = START_TIME
654+
.checked_add(Duration::from_secs(last_recv_secs))
655+
.unwrap_or_else(|| {
656+
tracing::warn!(?start_time, ?now, "zmq last receive time out of bounds");
657+
*START_TIME
658+
});
659+
660+
now.duration_since(last_recv)
661+
}
662+
663+
/// Update the last received timestamp
664+
fn last_recv_update(&self) {
665+
let duration = Instant::now().duration_since(*START_TIME);
666+
667+
self.last_recv.store(duration.as_secs(), Ordering::Relaxed);
668+
}
649669
}
650670

651671
/// Helper function to run a future with cancellation support.

0 commit comments

Comments
 (0)