From 3150e0d0b3756c25869e487fdb5d69d22e3745d3 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 27 Sep 2024 11:29:33 -0300 Subject: [PATCH] use Notify for a more deterministic approach --- core/node/da_dispatcher/src/da_dispatcher.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index 9e8f064fe67d..27b4c7f27929 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -9,7 +9,7 @@ use anyhow::Context; use chrono::Utc; use futures::future::join_all; use rand::Rng; -use tokio::sync::{mpsc, watch::Receiver}; +use tokio::sync::{mpsc, watch::Receiver, Notify}; use zksync_config::DADispatcherConfig; use zksync_da_client::{ types::{DAError, InclusionData}, @@ -122,8 +122,10 @@ impl DataAvailabilityDispatcher { let config = self.config.clone(); let client = self.client.clone(); let request_semaphore = self.request_semaphore.clone(); + let notifier = Arc::new(Notify::new()); let pending_blobs_sender = tokio::spawn(async move { let mut spawned_requests = vec![]; + let notifier = notifier.clone(); loop { if *stop_receiver.borrow() { break; @@ -141,6 +143,7 @@ impl DataAvailabilityDispatcher { let pool = pool.clone(); let config = config.clone(); let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); let request = tokio::spawn(async move { let _permit = permit; // move permit into scope let dispatch_latency = METRICS.blob_dispatch_latency.start(); @@ -165,13 +168,7 @@ impl DataAvailabilityDispatcher { while batch.l1_batch_number.0 as i64 > next_expected_batch.load(std::sync::atomic::Ordering::Relaxed) { - tracing::info!( - "batch_number: {} finished DA dispatch, but the current expected batch is: {}, waiting for the correct order", - batch.l1_batch_number, next_expected_batch.load(std::sync::atomic::Ordering::Relaxed)); - // Wait a base time of 5 seconds plus an additional 1 second per batch number difference - let waiting_time = 5 + (batch.l1_batch_number.0 as i64) - - next_expected_batch.load(std::sync::atomic::Ordering::Relaxed); - tokio::time::sleep(Duration::from_secs(waiting_time as u64)).await; + notifier.clone().notified().await; } let mut conn = pool.connection_tagged("da_dispatcher").await?; @@ -186,6 +183,7 @@ impl DataAvailabilityDispatcher { // Update the next expected batch number next_expected_batch.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + notifier.notify_waiters(); METRICS .last_dispatched_l1_batch