From 7bd6cb88264099ea6dbaeacc530b104f3a96254f Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Mon, 22 Dec 2025 16:12:06 +0100 Subject: [PATCH 01/14] refactor: remove the ability to use event dispatcher without DB This ability was only used during tests; in actual production, there is always a DB. Not having to support the DB-less use is going to make it a lot cleaner to use the DB as a queue (see #6543). This commit also makes the method names and order of operations a little less confusing -- we had `send_payload()`, `send_payload_directly()`, `send_payload_given_db_path()` (ok that one was my fault), and `send_payload_with_bytes()`. Now we have this instead: ``` dispatch_to_observer() -> get_payload_bytes() -> save_to_db() -> make_http_request_and_delete_from_db() -> make_http_request() -> delete_from_db() ``` --- stacks-node/src/event_dispatcher.rs | 316 ++++++++++++---------- stacks-node/src/event_dispatcher/tests.rs | 117 +++----- stacks-node/src/nakamoto_node/miner.rs | 6 +- stacks-node/src/node.rs | 2 +- stacks-node/src/run_loop/nakamoto.rs | 2 +- stacks-node/src/run_loop/neon.rs | 2 +- 6 files changed, 222 insertions(+), 223 deletions(-) diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index e7aaff025db..d043da13d2f 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; +use std::fmt; use std::path::PathBuf; #[cfg(test)] use std::sync::mpsc::channel; @@ -84,6 +85,51 @@ lazy_static! 
{ pub static ref TEST_SKIP_BLOCK_ANNOUNCEMENT: TestFlag = TestFlag::default(); } +#[derive(Debug)] +enum EventDispatcherError { + SerializationError(serde_json::Error), + HttpError(std::io::Error), + DbError(stacks::util_lib::db::Error), +} + +impl fmt::Display for EventDispatcherError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + EventDispatcherError::SerializationError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::HttpError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::DbError(ref e) => fmt::Display::fmt(e, f), + } + } +} + +impl core::error::Error for EventDispatcherError { + fn cause(&self) -> Option<&dyn core::error::Error> { + match *self { + EventDispatcherError::SerializationError(ref e) => Some(e), + EventDispatcherError::HttpError(ref e) => Some(e), + EventDispatcherError::DbError(ref e) => Some(e), + } + } +} + +impl From for EventDispatcherError { + fn from(value: serde_json::Error) -> Self { + EventDispatcherError::SerializationError(value) + } +} + +impl From for EventDispatcherError { + fn from(value: stacks::util_lib::db::Error) -> Self { + EventDispatcherError::DbError(value) + } +} + +impl From for EventDispatcherError { + fn from(value: std::io::Error) -> Self { + EventDispatcherError::HttpError(value) + } +} + #[derive(Debug, Clone)] struct EventObserver { /// URL to which events will be sent @@ -157,16 +203,15 @@ pub struct EventDispatcher { block_proposal_observers_lookup: HashSet, /// Channel for sending StackerDB events to the miner coordinator pub stackerdb_channel: Arc>, - /// Path to the database where pending payloads are stored. If `None`, then - /// the database is not used and events are not recoverable across restarts. - db_path: Option, + /// Path to the database where pending payloads are stored. + db_path: PathBuf, } /// This struct is used specifically for receiving proposal responses. /// It's constructed separately to play nicely with threading. 
struct ProposalCallbackHandler { observers: Vec, - db_path: Option, + dispatcher: EventDispatcher, } impl ProposalCallbackReceiver for ProposalCallbackHandler { @@ -183,13 +228,8 @@ impl ProposalCallbackReceiver for ProposalCallbackHandler { }; for observer in self.observers.iter() { - EventDispatcher::send_payload_given_db_path( - &self.db_path, - observer, - &response, - PATH_PROPOSAL_RESPONSE, - None, - ); + self.dispatcher + .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE); } } } @@ -280,7 +320,7 @@ impl MemPoolEventDispatcher for EventDispatcher { } let handler = ProposalCallbackHandler { observers: callback_receivers, - db_path: self.db_path.clone(), + dispatcher: self.clone(), }; Some(Box::new(handler)) } @@ -361,20 +401,12 @@ impl BlockEventDispatcher for EventDispatcher { } } -impl Default for EventDispatcher { - fn default() -> Self { - EventDispatcher::new(None) - } -} - impl EventDispatcher { - pub fn new(working_dir: Option) -> EventDispatcher { - let db_path = if let Some(mut db_path) = working_dir { - db_path.push("event_observers.sqlite"); - Some(db_path) - } else { - None - }; + pub fn new(working_dir: PathBuf) -> EventDispatcher { + let mut db_path = working_dir; + db_path.push("event_observers.sqlite"); + EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize database"); + EventDispatcher { stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())), registered_observers: vec![], @@ -603,11 +635,10 @@ impl EventDispatcher { ); // Send payload - self.send_payload( + self.dispatch_to_observer( &self.registered_observers[observer_id], &payload, PATH_BLOCK_PROCESSED, - None, ); } } @@ -1010,11 +1041,8 @@ impl EventDispatcher { /// Process any pending payloads in the database. /// This is called when the event dispatcher is first instantiated. 
pub fn process_pending_payloads(&self) { - let Some(db_path) = &self.db_path else { - return; - }; let conn = - EventDispatcherDbConnection::new(db_path).expect("Failed to initialize database"); + EventDispatcherDbConnection::new(&self.db_path).expect("Failed to initialize database"); let pending_payloads = match conn.get_pending_payloads() { Ok(payloads) => payloads, Err(e) => { @@ -1062,35 +1090,50 @@ impl EventDispatcher { continue; }; - Self::send_payload_with_bytes( - &self.db_path, - observer, - payload_bytes, - full_url.path(), - Some(id), + self.make_http_request_and_delete_from_db( + &payload_bytes, + full_url.as_str(), + observer.timeout, + observer.disable_retries, + id, ); + } + } - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: delete_payload"); - return; - } - - if let Err(e) = conn.delete_payload(id) { + fn dispatch_to_observer( + &self, + event_observer: &EventObserver, + payload: &serde_json::Value, + path: &str, + ) { + let full_url = Self::get_full_url(event_observer, path); + let bytes = match Self::get_payload_bytes(payload) { + Ok(bytes) => bytes, + Err(err) => { error!( - "Event observer: failed to delete pending payload from database"; - "error" => ?e + "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err ); + return; } - } + }; + + let id = self.save_to_db(&full_url, bytes.as_ref(), event_observer.timeout); + + self.make_http_request_and_delete_from_db( + &bytes, + &full_url, + event_observer.timeout, + event_observer.disable_retries, + id, + ); } - fn send_payload_directly( + fn make_http_request( payload_bytes: &Arc<[u8]>, full_url: &str, timeout: Duration, disable_retries: bool, - ) -> bool { + ) -> Result<(), EventDispatcherError> { debug!( "Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len() ); @@ -1137,20 +1180,18 @@ impl EventDispatcher { "backoff" => ?backoff, "attempts" => attempts ); + if disable_retries { + warn!("Observer is 
configured in disable_retries mode: skipping retry of payload"); + return Err(err.into()); + } + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping retry of payload"); + return Err(err.into()); + } } } - if disable_retries { - warn!("Observer is configured in disable_retries mode: skipping retry of payload"); - return false; - } - - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: skipping retry of payload"); - return false; - } - sleep(backoff); let jitter: u64 = rand::thread_rng().gen_range(0..100); backoff = std::cmp::min( @@ -1159,105 +1200,86 @@ impl EventDispatcher { ); attempts = attempts.saturating_add(1); } - true - } - fn send_payload( - &self, - event_observer: &EventObserver, - payload: &serde_json::Value, - path: &str, - id: Option, - ) { - Self::send_payload_given_db_path(&self.db_path, event_observer, payload, path, id); + Ok(()) } - fn send_payload_given_db_path( - db_path: &Option, - event_observer: &EventObserver, - payload: &serde_json::Value, - path: &str, - id: Option, - ) { - let payload_bytes = match serde_json::to_vec(payload) { - Ok(bytes) => Arc::<[u8]>::from(bytes), - Err(err) => { - error!( - "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err - ); - return; - } - }; - Self::send_payload_with_bytes(db_path, event_observer, payload_bytes, path, id); + fn get_payload_bytes(payload: &serde_json::Value) -> Result, EventDispatcherError> { + let payload_bytes = serde_json::to_vec(payload)?; + Ok(Arc::<[u8]>::from(payload_bytes)) } - fn send_payload_with_bytes( - db_path: &Option, - event_observer: &EventObserver, - payload_bytes: Arc<[u8]>, - path: &str, - id: Option, - ) { - // Construct the full URL + fn get_full_url(event_observer: &EventObserver, path: &str) -> String { let url_str = if path.starts_with('/') { format!("{}{path}", &event_observer.endpoint) } else { format!("{}/{path}", &event_observer.endpoint) }; - let full_url = 
format!("http://{url_str}"); - - // if the observer is in "disable_retries" mode quickly send the payload without checking for the db - if event_observer.disable_retries { - Self::send_payload_directly(&payload_bytes, &full_url, event_observer.timeout, true); - } else if let Some(db_path) = db_path { - // Because the DB is initialized in the call to process_pending_payloads() during startup, - // it is *probably* ok to skip initialization here. That said, at the time of writing this is the - // only call to new_without_init(), and we might want to revisit the question whether it's - // really worth it. - let conn = EventDispatcherDbConnection::new_without_init(db_path) - .expect("Failed to open database for event observer"); - - let id = match id { - Some(id) => id, - None => { - conn.insert_payload_with_retry( - &full_url, - payload_bytes.as_ref(), - event_observer.timeout, - ); - conn.last_insert_rowid() - } - }; + format!("http://{url_str}") + } - let success = Self::send_payload_directly( - &payload_bytes, - &full_url, - event_observer.timeout, - false, - ); - // This is only `false` when the TestFlag is set to skip retries - if !success { - return; - } + fn save_to_db(&self, url: &str, payload_bytes: &[u8], timeout: Duration) -> i64 { + // Because the DB is initialized in the call to process_pending_payloads() during startup, + // it is *probably* ok to skip initialization here. That said, at the time of writing this is the + // only call to new_without_init(), and we might want to revisit the question whether it's + // really worth it. 
+ let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) + .expect("Failed to open database for event observer"); - if let Err(e) = conn.delete_payload(id) { - error!( - "Event observer: failed to delete pending payload from database"; - "error" => ?e - ); - } - } else { - // No database, just send the payload - Self::send_payload_directly(&payload_bytes, &full_url, event_observer.timeout, false); + conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout); + conn.last_insert_rowid() + } + + fn make_http_request_and_delete_from_db( + &self, + payload_bytes: &Arc<[u8]>, + full_url: &str, + timeout: Duration, + disable_retries: bool, + id: i64, + ) { + let http_result = + Self::make_http_request(payload_bytes, full_url, timeout, disable_retries); + + if let Err(err) = http_result { + // log but continue + error!("EventDispatcher: dispatching failed"; "url" => &full_url, "error" => ?err); } + + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping deletion of payload"); + return; + } + + // We're deleting regardless of result -- if retries are disabled, that means + // we're supposed to forget about it in case of failure. If they're not disabled, + // then we wouldn't be here in case of failure, because `make_http_request` retries + // until it's successful (with the exception of the above fault injection which + // simulates a shutdown). 
+ let deletion_result = self.delete_from_db(id); + + if let Err(e) = deletion_result { + error!( + "Event observer: failed to delete pending payload from database"; + "error" => ?e + ); + } + } + + fn delete_from_db(&self, id: i64) -> Result<(), EventDispatcherError> { + let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) + .expect("Failed to open database for event observer"); + conn.delete_payload(id)?; + Ok(()) } fn send_new_attachments(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_ATTACHMENT_PROCESSED, None); + self.dispatch_to_observer(event_observer, payload, PATH_ATTACHMENT_PROCESSED); } fn send_new_mempool_txs(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT, None); + self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT); } /// Serializes new microblocks data into a JSON payload and sends it off to the correct path @@ -1290,7 +1312,7 @@ impl EventDispatcher { "burn_block_timestamp": burn_block_timestamp, }); - self.send_payload(event_observer, &payload, PATH_MICROBLOCK_SUBMIT, None); + self.dispatch_to_observer(event_observer, &payload, PATH_MICROBLOCK_SUBMIT); } fn send_dropped_mempool_txs( @@ -1298,15 +1320,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.send_payload(event_observer, payload, PATH_MEMPOOL_TX_DROP, None); + self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_DROP); } fn send_mined_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_MINED_BLOCK, None); + self.dispatch_to_observer(event_observer, payload, PATH_MINED_BLOCK); } fn send_mined_microblock(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_MINED_MICROBLOCK, None); + 
self.dispatch_to_observer(event_observer, payload, PATH_MINED_MICROBLOCK); } fn send_mined_nakamoto_block( @@ -1314,15 +1336,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.send_payload(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK, None); + self.dispatch_to_observer(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK); } fn send_stackerdb_chunks(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_STACKERDB_CHUNKS, None); + self.dispatch_to_observer(event_observer, payload, PATH_STACKERDB_CHUNKS); } fn send_new_burn_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_BURN_BLOCK_SUBMIT, None); + self.dispatch_to_observer(event_observer, payload, PATH_BURN_BLOCK_SUBMIT); } } diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs index da24ec8a73e..7908ca364f8 100644 --- a/stacks-node/src/event_dispatcher/tests.rs +++ b/stacks-node/src/event_dispatcher/tests.rs @@ -238,7 +238,7 @@ fn test_process_pending_payloads() { info!("endpoint: {}", endpoint); let timeout = Duration::from_secs(5); - let mut dispatcher = EventDispatcher::new(Some(dir.path().to_path_buf())); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), @@ -290,7 +290,7 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { let mut server = mockito::Server::new(); let endpoint = server.host_with_port(); let timeout = Duration::from_secs(5); - let mut dispatcher = EventDispatcher::new(Some(dir.path().to_path_buf())); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), @@ -343,18 +343,13 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { fn 
test_new_event_dispatcher_with_db() { let dir = tempdir().unwrap(); let working_dir = dir.path().to_path_buf(); - - let dispatcher = EventDispatcher::new(Some(working_dir.clone())); - let expected_db_path = working_dir.join("event_observers.sqlite"); - assert_eq!(dispatcher.db_path, Some(expected_db_path.clone())); - assert!( - !expected_db_path.exists(), - "Database file was created too soon" - ); + assert!(!expected_db_path.exists(), "Database file already exists"); - EventDispatcherDbConnection::new(&expected_db_path).expect("Failed to initialize the database"); + let dispatcher = EventDispatcher::new(working_dir.clone()); + + assert_eq!(dispatcher.db_path, expected_db_path.clone()); // Verify that the database was initialized assert!(expected_db_path.exists(), "Database file was not created"); @@ -373,13 +368,6 @@ fn test_new_event_observer() { assert_eq!(observer.disable_retries, false); } -#[test] -fn test_new_event_dispatcher_without_db() { - let dispatcher = EventDispatcher::new(None); - - assert!(dispatcher.db_path.is_none(), "Expected db_path to be None"); -} - #[test] #[serial] fn test_send_payload_with_db() { @@ -389,9 +377,7 @@ fn test_send_payload_with_db() { let working_dir = dir.path().to_path_buf(); let payload = json!({"key": "value"}); - let dispatcher = EventDispatcher::new(Some(working_dir.clone())); - let db_path = dispatcher.clone().db_path.clone().unwrap(); - EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); + let dispatcher = EventDispatcher::new(working_dir.clone()); // Create a mock server let mut server = mockito::Server::new(); @@ -410,13 +396,13 @@ fn test_send_payload_with_db() { TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Call send_payload - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload, "/test"); // Verify that the payload was sent and database is empty _m.assert(); // Verify that the database is empty - let db_path = 
dispatcher.db_path.unwrap(); + let db_path = dispatcher.db_path; let db_path_str = db_path.to_str().unwrap(); let conn = Connection::open(db_path_str).expect("Failed to open database"); let pending_payloads = EventDispatcherDbConnection::new_from_exisiting_connection(conn) @@ -425,35 +411,6 @@ fn test_send_payload_with_db() { assert_eq!(pending_payloads.len(), 0, "Expected no pending payloads"); } -#[test] -fn test_send_payload_without_db() { - use mockito::Matcher; - - let timeout = Duration::from_secs(5); - let payload = json!({"key": "value"}); - - // Create a mock server - let mut server = mockito::Server::new(); - let _m = server - .mock("POST", "/test") - .match_header("content-type", Matcher::Regex("application/json.*".into())) - .match_body(Matcher::Json(payload.clone())) - .with_status(200) - .create(); - - let endpoint = server.url().strip_prefix("http://").unwrap().to_string(); - - let observer = EventObserver::new(endpoint, timeout, false); - - let dispatcher = EventDispatcher::new(None); - - // Call send_payload - dispatcher.send_payload(&observer, &payload, "/test", None); - - // Verify that the payload was sent - _m.assert(); -} - #[test] fn test_send_payload_success() { let port = get_random_port(); @@ -480,9 +437,11 @@ fn test_send_payload_success() { let payload = json!({"key": "value"}); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) @@ -498,7 +457,7 @@ fn test_send_payload_retry() { // Start a mock server in a separate thread let server = Server::http(format!("127.0.0.1:{port}")).unwrap(); - thread::spawn(move || { + let thread = thread::spawn(move || { let mut attempt = 0; while let Ok(request) = 
server.recv() { attempt += 1; @@ -530,13 +489,17 @@ fn test_send_payload_retry() { let payload = json!({"key": "value"}); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) .expect("Server did not receive request in time"); + + thread.join().unwrap(); } #[test] @@ -550,7 +513,7 @@ fn test_send_payload_timeout() { // Start a mock server in a separate thread let server = Server::http(format!("127.0.0.1:{port}")).unwrap(); - thread::spawn(move || { + let thread = thread::spawn(move || { let mut attempt = 0; // This exists to only keep request from being dropped #[allow(clippy::collection_is_never_read)] @@ -582,10 +545,12 @@ fn test_send_payload_timeout() { // Record the time before sending the payload let start_time = Instant::now(); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); // Call the function being tested - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload, "/test"); // Record the time after the function returns let elapsed_time = start_time.elapsed(); @@ -604,6 +569,8 @@ fn test_send_payload_timeout() { // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) .expect("Server did not receive request in time"); + + thread.join().unwrap(); } #[test] @@ -620,7 +587,7 @@ fn test_send_payload_with_db_force_restart() { info!("Starting mock server on port {port}"); // Start a mock server in a separate thread let server = Server::http(format!("127.0.0.1:{port}")).unwrap(); - thread::spawn(move || { + let 
thread = thread::spawn(move || { let mut attempt = 0; // This exists to only keep request from being dropped #[allow(clippy::collection_is_never_read)] @@ -670,7 +637,7 @@ fn test_send_payload_with_db_force_restart() { } }); - let mut dispatcher = EventDispatcher::new(Some(working_dir.clone())); + let mut dispatcher = EventDispatcher::new(working_dir.clone()); let observer = dispatcher.register_observer_private(&EventObserverConfig { endpoint: format!("127.0.0.1:{port}"), @@ -679,7 +646,7 @@ fn test_send_payload_with_db_force_restart() { disable_retries: false, }); - EventDispatcherDbConnection::new(&dispatcher.clone().db_path.unwrap()).unwrap(); + EventDispatcherDbConnection::new(&dispatcher.clone().db_path).unwrap(); let payload = json!({"key": "value"}); let payload2 = json!({"key": "value2"}); @@ -691,7 +658,7 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 1"); // Send the payload - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload, "/test"); // Re-enable retrying TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); @@ -701,11 +668,13 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 2"); // Send another payload - dispatcher.send_payload(&observer, &payload2, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload2, "/test"); // Wait for the server to process the requests rx.recv_timeout(Duration::from_secs(5)) .expect("Server did not receive request in time"); + + thread.join().unwrap(); } #[test] @@ -721,10 +690,12 @@ fn test_event_dispatcher_disable_retries() { let observer = EventObserver::new(endpoint, timeout, true); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.send_payload(&observer, &payload, "/test", None); + 
dispatcher.dispatch_to_observer(&observer, &payload, "/test"); // Verify that the payload was sent _m.assert(); @@ -739,10 +710,12 @@ fn test_event_dispatcher_disable_retries_invalid_url() { let observer = EventObserver::new(endpoint, timeout, true); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer(&observer, &payload, "/test"); } #[test] @@ -752,7 +725,7 @@ fn block_event_with_disable_retries_observer() { let dir = tempdir().unwrap(); let working_dir = dir.path().to_path_buf(); - let mut event_dispatcher = EventDispatcher::new(Some(working_dir.clone())); + let mut event_dispatcher = EventDispatcher::new(working_dir.clone()); let config = EventObserverConfig { endpoint: String::from("255.255.255.255"), events_keys: vec![EventKeyType::MinedBlocks], @@ -952,8 +925,8 @@ fn test_block_proposal_validation_event() { let mock = server.mock("POST", "/proposal_response").create(); let endpoint = server.url().strip_prefix("http://").unwrap().to_string(); - - let mut dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), events_keys: vec![EventKeyType::BlockProposal], diff --git a/stacks-node/src/nakamoto_node/miner.rs b/stacks-node/src/nakamoto_node/miner.rs index e601ceb2558..40e77490a59 100644 --- a/stacks-node/src/nakamoto_node/miner.rs +++ b/stacks-node/src/nakamoto_node/miner.rs @@ -58,6 +58,8 @@ use stacks_common::types::{PrivateKey, StacksEpochId}; #[cfg(test)] use stacks_common::util::tests::TestFlag; use stacks_common::util::vrf::VRFProof; +#[cfg(test)] +use tempfile::tempdir; use super::miner_db::MinerDB; use 
super::relayer::{MinerStopHandle, RelayerThread}; @@ -2055,6 +2057,8 @@ fn should_read_count_extend_units() { let (relay_sender, _rcv_2) = std::sync::mpsc::sync_channel(1); let (_coord_rcv, coord_comms) = stacks::chainstate::coordinator::comm::CoordinatorCommunication::instantiate(); + let working_dir = tempdir().unwrap(); + let mut miner = BlockMinerThread { config: Config::default(), globals: Globals::new( @@ -2087,7 +2091,7 @@ fn should_read_count_extend_units() { burn_election_block: BlockSnapshot::empty(), burn_block: BlockSnapshot::empty(), parent_tenure_id: StacksBlockId([0; 32]), - event_dispatcher: EventDispatcher::new(None), + event_dispatcher: EventDispatcher::new(working_dir.path().to_path_buf()), reason: MinerReason::Extended { burn_view_consensus_hash: ConsensusHash([0; 20]), }, diff --git a/stacks-node/src/node.rs b/stacks-node/src/node.rs index 1711dc297a7..a150ec64fb1 100644 --- a/stacks-node/src/node.rs +++ b/stacks-node/src/node.rs @@ -338,7 +338,7 @@ impl Node { ) .expect("FATAL: failed to initiate mempool"); - let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir())); + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); for observer in &config.events_observers { event_dispatcher.register_observer(observer); diff --git a/stacks-node/src/run_loop/nakamoto.rs b/stacks-node/src/run_loop/nakamoto.rs index 3f07ecc92d7..6c9e3ea1663 100644 --- a/stacks-node/src/run_loop/nakamoto.rs +++ b/stacks-node/src/run_loop/nakamoto.rs @@ -91,7 +91,7 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir())); + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); for observer in config.events_observers.iter() { event_dispatcher.register_observer(observer); } diff --git a/stacks-node/src/run_loop/neon.rs b/stacks-node/src/run_loop/neon.rs index 6ac6f4d9248..0a33aedc4bf 100644 --- a/stacks-node/src/run_loop/neon.rs +++ 
b/stacks-node/src/run_loop/neon.rs @@ -314,7 +314,7 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir())); + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); for observer in config.events_observers.iter() { event_dispatcher.register_observer(observer); } From a48a100bf2bd0ae4d2beacba1a8b030ceaeca191 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Mon, 22 Dec 2025 17:46:10 +0100 Subject: [PATCH 02/14] chore: return inserted event payload id directly from the insertion --- stacks-node/src/event_dispatcher.rs | 3 +-- stacks-node/src/event_dispatcher/db.rs | 27 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index d043da13d2f..acf3782656b 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -1226,8 +1226,7 @@ impl EventDispatcher { let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) .expect("Failed to open database for event observer"); - conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout); - conn.last_insert_rowid() + conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout) } fn make_http_request_and_delete_from_db( diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs index 122812592c8..e5a5fa742aa 100644 --- a/stacks-node/src/event_dispatcher/db.rs +++ b/stacks-node/src/event_dispatcher/db.rs @@ -59,17 +59,22 @@ impl EventDispatcherDbConnection { EventDispatcherDbConnection { connection } } - /// Insert a payload into the database, retrying on failure. - pub fn insert_payload_with_retry(&self, url: &str, payload_bytes: &[u8], timeout: Duration) { + /// Insert a payload into the database, retrying on failure. Returns the id of the inserted record. 
+ pub fn insert_payload_with_retry( + &self, + url: &str, + payload_bytes: &[u8], + timeout: Duration, + ) -> i64 { let mut attempts = 0i64; let mut backoff = Duration::from_millis(100); // Initial backoff duration let max_backoff = Duration::from_secs(5); // Cap the backoff duration loop { match self.insert_payload(url, payload_bytes, timeout) { - Ok(_) => { + Ok(id) => { // Successful insert, break the loop - return; + return id; } Err(err) => { // Log the error, then retry after a delay @@ -95,18 +100,14 @@ impl EventDispatcherDbConnection { url: &str, payload_bytes: &[u8], timeout: Duration, - ) -> Result<(), db_error> { + ) -> Result { let timeout_ms: u64 = timeout.as_millis().try_into().expect("Timeout too large"); - self.connection.execute( - "INSERT INTO pending_payloads (url, payload, timeout) VALUES (?1, ?2, ?3)", + let id: i64 = self.connection.query_row( + "INSERT INTO pending_payloads (url, payload, timeout) VALUES (?1, ?2, ?3) RETURNING id", params![url, payload_bytes, timeout_ms], + |row| row.get(0), )?; - Ok(()) - } - - // TODO: change this to get the id from the insertion directly, because that's more reliable - pub fn last_insert_rowid(&self) -> i64 { - self.connection.last_insert_rowid() + Ok(id) } pub fn get_pending_payloads(&self) -> Result, u64)>, db_error> { From 41717b4fe2de9811e37ba3fa6f6125b2424aef14 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:19:43 +0100 Subject: [PATCH 03/14] refactor: use a dedicated struct for the event http request data ... instead of using unnamed tuples and long parameter lists. 
--- stacks-node/src/event_dispatcher.rs | 74 ++++++++++----------- stacks-node/src/event_dispatcher/db.rs | 81 ++++++++++++----------- stacks-node/src/event_dispatcher/tests.rs | 20 ++++-- 3 files changed, 97 insertions(+), 78 deletions(-) diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index acf3782656b..b1c1d11e379 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -167,6 +167,12 @@ impl EventObserver { } } +struct EventRequestData { + pub url: String, + pub payload_bytes: Arc<[u8]>, + pub timeout: Duration, +} + /// Events received from block-processing. /// Stacks events are structured as JSON, and are grouped by topic. An event observer can /// subscribe to one or more specific event streams, or the "any" stream to receive all of them. @@ -1059,10 +1065,11 @@ impl EventDispatcher { pending_payloads.len() ); - for (id, url, payload_bytes, _timeout_ms) in pending_payloads { - info!("Event dispatcher: processing pending payload: {url}"); - let full_url = Url::parse(url.as_str()) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {url} as a URL")); + for (id, mut data) in pending_payloads { + info!("Event dispatcher: processing pending payload: {}", data.url); + let full_url = Url::parse(data.url.as_str()).unwrap_or_else(|_| { + panic!("Event dispatcher: unable to parse {} as a URL", data.url) + }); // find the right observer let observer = self.registered_observers.iter().find(|observer| { let endpoint_url = Url::parse(format!("http://{}", &observer.endpoint).as_str()) @@ -1079,7 +1086,7 @@ impl EventDispatcher { // This observer is no longer registered, skip and delete info!( "Event dispatcher: observer {} no longer registered, skipping", - url + data.url ); if let Err(e) = conn.delete_payload(id) { error!( @@ -1090,13 +1097,11 @@ impl EventDispatcher { continue; }; - self.make_http_request_and_delete_from_db( - &payload_bytes, - full_url.as_str(), - observer.timeout, 
- observer.disable_retries, - id, - ); + // If the timeout configuration for this observer is different from what it was + // originally, the updated config wins. + data.timeout = observer.timeout; + + self.make_http_request_and_delete_from_db(&data, observer.disable_retries, id); } } @@ -1117,29 +1122,27 @@ impl EventDispatcher { } }; - let id = self.save_to_db(&full_url, bytes.as_ref(), event_observer.timeout); + let data = EventRequestData { + payload_bytes: bytes, + url: full_url, + timeout: event_observer.timeout, + }; + + let id = self.save_to_db(&data); - self.make_http_request_and_delete_from_db( - &bytes, - &full_url, - event_observer.timeout, - event_observer.disable_retries, - id, - ); + self.make_http_request_and_delete_from_db(&data, event_observer.disable_retries, id); } fn make_http_request( - payload_bytes: &Arc<[u8]>, - full_url: &str, - timeout: Duration, + data: &EventRequestData, disable_retries: bool, ) -> Result<(), EventDispatcherError> { debug!( - "Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len() + "Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len() ); - let url = Url::parse(full_url) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {full_url} as a URL")); + let url = Url::parse(&data.url) + .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url)); let host = url.host_str().expect("Invalid URL: missing host"); let port = url.port_or_known_default().unwrap_or(80); @@ -1150,18 +1153,18 @@ impl EventDispatcher { let mut backoff = Duration::from_millis(100); let mut attempts: i32 = 0; // Cap the backoff at 3x the timeout - let max_backoff = timeout.saturating_mul(3); + let max_backoff = data.timeout.saturating_mul(3); loop { let mut request = StacksHttpRequest::new_for_peer( peerhost.clone(), "POST".into(), url.path().into(), - HttpRequestContents::new().payload_json_bytes(Arc::clone(payload_bytes)), + 
HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)), ) .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); request.add_header("Connection".into(), "close".into()); - match send_http_request(host, port, request, timeout) { + match send_http_request(host, port, request, data.timeout) { Ok(response) => { if response.preamble().status_code == 200 { debug!( @@ -1218,7 +1221,7 @@ impl EventDispatcher { format!("http://{url_str}") } - fn save_to_db(&self, url: &str, payload_bytes: &[u8], timeout: Duration) -> i64 { + fn save_to_db(&self, data: &EventRequestData) -> i64 { // Because the DB is initialized in the call to process_pending_payloads() during startup, // it is *probably* ok to skip initialization here. That said, at the time of writing this is the // only call to new_without_init(), and we might want to revisit the question whether it's @@ -1226,23 +1229,20 @@ impl EventDispatcher { let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) .expect("Failed to open database for event observer"); - conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout) + conn.insert_payload_with_retry(data) } fn make_http_request_and_delete_from_db( &self, - payload_bytes: &Arc<[u8]>, - full_url: &str, - timeout: Duration, + data: &EventRequestData, disable_retries: bool, id: i64, ) { - let http_result = - Self::make_http_request(payload_bytes, full_url, timeout, disable_retries); + let http_result = Self::make_http_request(data, disable_retries); if let Err(err) = http_result { // log but continue - error!("EventDispatcher: dispatching failed"; "url" => &full_url, "error" => ?err); + error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err); } #[cfg(test)] diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs index e5a5fa742aa..fae3a457a73 100644 --- a/stacks-node/src/event_dispatcher/db.rs +++ 
b/stacks-node/src/event_dispatcher/db.rs @@ -22,6 +22,8 @@ use std::time::Duration; use rusqlite::{params, Connection}; use stacks::util_lib::db::Error as db_error; +use crate::event_dispatcher::EventRequestData; + /// Wraps a SQlite connection to the database in which pending event payloads are stored pub struct EventDispatcherDbConnection { connection: Connection, @@ -60,18 +62,13 @@ impl EventDispatcherDbConnection { } /// Insert a payload into the database, retrying on failure. Returns the id of of the inserted record. - pub fn insert_payload_with_retry( - &self, - url: &str, - payload_bytes: &[u8], - timeout: Duration, - ) -> i64 { + pub fn insert_payload_with_retry(&self, data: &EventRequestData) -> i64 { let mut attempts = 0i64; let mut backoff = Duration::from_millis(100); // Initial backoff duration let max_backoff = Duration::from_secs(5); // Cap the backoff duration loop { - match self.insert_payload(url, payload_bytes, timeout) { + match self.insert_payload(data) { Ok(id) => { // Successful insert, break the loop return id; @@ -95,36 +92,38 @@ impl EventDispatcherDbConnection { } } - pub fn insert_payload( - &self, - url: &str, - payload_bytes: &[u8], - timeout: Duration, - ) -> Result { - let timeout_ms: u64 = timeout.as_millis().try_into().expect("Timeout too large"); + pub fn insert_payload(&self, data: &EventRequestData) -> Result { + let timeout_ms: u64 = data + .timeout + .as_millis() + .try_into() + .expect("Timeout too large"); let id: i64 = self.connection.query_row( "INSERT INTO pending_payloads (url, payload, timeout) VALUES (?1, ?2, ?3) RETURNING id", - params![url, payload_bytes, timeout_ms], + params![data.url, data.payload_bytes, timeout_ms], |row| row.get(0), )?; Ok(id) } - pub fn get_pending_payloads(&self) -> Result, u64)>, db_error> { + pub fn get_pending_payloads(&self) -> Result, db_error> { let mut stmt = self .connection .prepare("SELECT id, url, payload, timeout FROM pending_payloads ORDER BY id")?; - let payload_iter = 
stmt.query_and_then( - [], - |row| -> Result<(i64, String, Arc<[u8]>, u64), db_error> { + let payload_iter = + stmt.query_and_then([], |row| -> Result<(i64, EventRequestData), db_error> { let id: i64 = row.get(0)?; let url: String = row.get(1)?; let payload_bytes: Vec = row.get(2)?; let payload_bytes = Arc::<[u8]>::from(payload_bytes); let timeout_ms: u64 = row.get(3)?; - Ok((id, url, payload_bytes, timeout_ms)) - }, - )?; + let data = EventRequestData { + url, + payload_bytes, + timeout: Duration::from_millis(timeout_ms), + }; + Ok((id, data)) + })?; payload_iter.collect() } @@ -258,7 +257,7 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); assert_eq!( - pending_payloads[0].2.as_ref(), + pending_payloads[0].1.payload_bytes.as_ref(), payload_str.as_bytes(), "Payload contents did not survive migration" ); @@ -272,13 +271,19 @@ mod test { let conn = EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); - let url = "http://example.com/api"; + let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; + // Insert payload - let insert_result = conn.insert_payload(url, payload_bytes.as_slice(), timeout); + let insert_result = conn.insert_payload(&data); assert!(insert_result.is_ok(), "Failed to insert payload"); // Get pending payloads @@ -287,18 +292,14 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (_id, retrieved_url, stored_bytes, timeout_ms) = &pending_payloads[0]; - assert_eq!(retrieved_url, url, "URL does not match"); + let (_id, retrieved_data) = &pending_payloads[0]; + assert_eq!(retrieved_data.url, data.url, "URL does not 
match"); assert_eq!( - stored_bytes.as_ref(), - payload_bytes.as_slice(), + retrieved_data.payload_bytes.as_ref(), + data.payload_bytes.as_ref(), "Serialized payload does not match" ); - assert_eq!( - *timeout_ms, - timeout.as_millis() as u64, - "Timeout does not match" - ); + assert_eq!(retrieved_data.timeout, timeout, "Timeout does not match"); } #[test] @@ -309,13 +310,19 @@ mod test { let conn = EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); - let url = "http://example.com/api"; + let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; + // Insert payload - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data) .expect("Failed to insert payload"); // Get pending payloads @@ -324,7 +331,7 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (id, _, _, _) = pending_payloads[0]; + let (id, _) = pending_payloads[0]; // Delete payload let delete_result = conn.delete_payload(id); diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs index 7908ca364f8..4c7654464d5 100644 --- a/stacks-node/src/event_dispatcher/tests.rs +++ b/stacks-node/src/event_dispatcher/tests.rs @@ -261,12 +261,18 @@ fn test_process_pending_payloads() { .with_status(200) .create(); - let url = &format!("{}/api", &server.url()); + let url = format!("{}/api", &server.url()); + + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Insert payload - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data) .expect("Failed to insert payload"); // 
Process pending payloads @@ -318,9 +324,15 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { .create(); // Use a different URL than the observer's endpoint - let url = "http://different-domain.com/api"; + let url = "http://different-domain.com/api".to_string(); + + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data) .expect("Failed to insert payload"); dispatcher.process_pending_payloads(); From 0d62bd487e7374cbff34f7a36cff04ce82e5603a Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Mon, 29 Dec 2025 15:15:32 +0100 Subject: [PATCH 04/14] feat: store timestamp when saving pending payloads to event observer db This will allow us to output warnings if the (non-blocking) delivery gets too far behind, because we can tell how long it took between enqueuing the event and actually sending it. This commit adds another migration to said database, so I slightly refactored the migration code. 
--- stacks-node/src/event_dispatcher.rs | 31 ++- stacks-node/src/event_dispatcher/db.rs | 234 ++++++++++++++++++---- stacks-node/src/event_dispatcher/tests.rs | 6 +- 3 files changed, 216 insertions(+), 55 deletions(-) diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index b1c1d11e379..94f06815aa6 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -24,7 +24,7 @@ use std::sync::mpsc::channel; use std::sync::LazyLock; use std::sync::{Arc, Mutex}; use std::thread::sleep; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use clarity::vm::costs::ExecutionCost; use clarity::vm::events::{FTEventType, NFTEventType, STXEventType}; @@ -76,6 +76,8 @@ pub use payloads::{ }; pub use stacker_db::StackerDBChannel; +use crate::event_dispatcher::db::PendingPayload; + #[cfg(test)] mod tests; @@ -1065,10 +1067,21 @@ impl EventDispatcher { pending_payloads.len() ); - for (id, mut data) in pending_payloads { - info!("Event dispatcher: processing pending payload: {}", data.url); - let full_url = Url::parse(data.url.as_str()).unwrap_or_else(|_| { - panic!("Event dispatcher: unable to parse {} as a URL", data.url) + for PendingPayload { + id, + mut request_data, + .. 
+ } in pending_payloads + { + info!( + "Event dispatcher: processing pending payload: {}", + request_data.url + ); + let full_url = Url::parse(request_data.url.as_str()).unwrap_or_else(|_| { + panic!( + "Event dispatcher: unable to parse {} as a URL", + request_data.url + ) }); // find the right observer let observer = self.registered_observers.iter().find(|observer| { @@ -1086,7 +1099,7 @@ impl EventDispatcher { // This observer is no longer registered, skip and delete info!( "Event dispatcher: observer {} no longer registered, skipping", - data.url + request_data.url ); if let Err(e) = conn.delete_payload(id) { error!( @@ -1099,9 +1112,9 @@ impl EventDispatcher { // If the timeout configuration for this observer is different from what it was // originally, the updated config wins. - data.timeout = observer.timeout; + request_data.timeout = observer.timeout; - self.make_http_request_and_delete_from_db(&data, observer.disable_retries, id); + self.make_http_request_and_delete_from_db(&request_data, observer.disable_retries, id); } } @@ -1229,7 +1242,7 @@ impl EventDispatcher { let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) .expect("Failed to open database for event observer"); - conn.insert_payload_with_retry(data) + conn.insert_payload_with_retry(data, SystemTime::now()) } fn make_http_request_and_delete_from_db( diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs index fae3a457a73..e3f678f54c4 100644 --- a/stacks-node/src/event_dispatcher/db.rs +++ b/stacks-node/src/event_dispatcher/db.rs @@ -17,13 +17,20 @@ use std::path::PathBuf; use std::sync::Arc; use std::thread::sleep; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use rusqlite::{params, Connection}; use stacks::util_lib::db::Error as db_error; use crate::event_dispatcher::EventRequestData; +pub struct PendingPayload { + pub request_data: EventRequestData, + #[allow(dead_code)] // will be used in a follow-up 
commit + pub timestamp: SystemTime, + pub id: i64, +} + /// Wraps a SQlite connection to the database in which pending event payloads are stored pub struct EventDispatcherDbConnection { connection: Connection, @@ -42,17 +49,15 @@ impl EventDispatcherDbConnection { id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT NOT NULL, payload BLOB NOT NULL, - timeout INTEGER NOT NULL + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL )", [], )?; let mut connection = EventDispatcherDbConnection { connection }; - if let Some(col_type) = connection.get_payload_column_type()? { - if col_type.eq_ignore_ascii_case("TEXT") { - info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB"); - connection.migrate_payload_column_to_blob()?; - } - } + + connection.run_necessary_migrations()?; + Ok(connection) } @@ -62,13 +67,13 @@ impl EventDispatcherDbConnection { } /// Insert a payload into the database, retrying on failure. Returns the id of of the inserted record. - pub fn insert_payload_with_retry(&self, data: &EventRequestData) -> i64 { + pub fn insert_payload_with_retry(&self, data: &EventRequestData, timestamp: SystemTime) -> i64 { let mut attempts = 0i64; let mut backoff = Duration::from_millis(100); // Initial backoff duration let max_backoff = Duration::from_secs(5); // Cap the backoff duration loop { - match self.insert_payload(data) { + match self.insert_payload(data, timestamp) { Ok(id) => { // Successful insert, break the loop return id; @@ -92,38 +97,53 @@ impl EventDispatcherDbConnection { } } - pub fn insert_payload(&self, data: &EventRequestData) -> Result { + pub fn insert_payload( + &self, + data: &EventRequestData, + timestamp: SystemTime, + ) -> Result { let timeout_ms: u64 = data .timeout .as_millis() .try_into() .expect("Timeout too large"); + + let timestamp_s = timestamp + .duration_since(UNIX_EPOCH) + .expect("system clock is multiple decades slow") + .as_secs(); + let id: i64 = self.connection.query_row( - "INSERT INTO pending_payloads (url, 
payload, timeout) VALUES (?1, ?2, ?3) RETURNING id", - params![data.url, data.payload_bytes, timeout_ms], + "INSERT INTO pending_payloads (url, payload, timeout, timestamp) VALUES (?1, ?2, ?3, ?4) RETURNING id", + params![data.url, data.payload_bytes, timeout_ms, timestamp_s], |row| row.get(0), )?; Ok(id) } - pub fn get_pending_payloads(&self) -> Result, db_error> { - let mut stmt = self - .connection - .prepare("SELECT id, url, payload, timeout FROM pending_payloads ORDER BY id")?; - let payload_iter = - stmt.query_and_then([], |row| -> Result<(i64, EventRequestData), db_error> { - let id: i64 = row.get(0)?; - let url: String = row.get(1)?; - let payload_bytes: Vec = row.get(2)?; - let payload_bytes = Arc::<[u8]>::from(payload_bytes); - let timeout_ms: u64 = row.get(3)?; - let data = EventRequestData { - url, - payload_bytes, - timeout: Duration::from_millis(timeout_ms), - }; - Ok((id, data)) - })?; + pub fn get_pending_payloads(&self) -> Result, db_error> { + let mut stmt = self.connection.prepare( + "SELECT id, url, payload, timeout, timestamp FROM pending_payloads ORDER BY id", + )?; + let payload_iter = stmt.query_and_then([], |row| -> Result { + let id: i64 = row.get(0)?; + let url: String = row.get(1)?; + let payload_bytes: Vec = row.get(2)?; + let payload_bytes = Arc::<[u8]>::from(payload_bytes); + let timeout_ms: u64 = row.get(3)?; + let timestamp_s: u64 = row.get(4)?; + let request_data = EventRequestData { + url, + payload_bytes, + timeout: Duration::from_millis(timeout_ms), + }; + + Ok(PendingPayload { + id, + request_data, + timestamp: UNIX_EPOCH + Duration::from_secs(timestamp_s), + }) + })?; payload_iter.collect() } @@ -133,25 +153,69 @@ impl EventDispatcherDbConnection { Ok(()) } - fn get_payload_column_type(&self) -> Result, db_error> { + /// The initial schema of the datebase when this code was first created + const INITIAL_SCHEMA: u32 = 0; + /// The `payload`` column type changed from TEXT to BLOB + const PAYLOAD_IS_BLOB: u32 = 1; + /// Column 
`timestamp` added + const TIMESTAMP_COLUMN: u32 = 2; + + fn run_necessary_migrations(&mut self) -> Result<(), db_error> { + let current_schema = self.get_schema_version()?; + + if current_schema < Self::PAYLOAD_IS_BLOB { + info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB"); + self.migrate_payload_column_to_blob()?; + } + + if current_schema < Self::TIMESTAMP_COLUMN { + info!("Event observer: adding timestamp to pending_payloads"); + self.add_timestamp_column()?; + } + + Ok(()) + } + + fn get_schema_version(&self) -> Result { let mut stmt = self .connection .prepare("PRAGMA table_info(pending_payloads)")?; + let name_col = stmt.column_index("name")?; + let type_col = stmt.column_index("type")?; + let rows = stmt.query_map([], |row| { - let name: String = row.get(1)?; - let col_type: String = row.get(2)?; + let name: String = row.get(name_col)?; + let col_type: String = row.get(type_col)?; Ok((name, col_type)) })?; + let mut payload_is_blob = None; + let mut timestamp_exists = false; + for row in rows { let (name, col_type) = row?; if name == "payload" { - return Ok(Some(col_type)); + payload_is_blob = Some(col_type.eq_ignore_ascii_case("BLOB")); + } else if name == "timestamp" { + timestamp_exists = true; } } - Ok(None) + if timestamp_exists && !(payload_is_blob == Some(true)) { + warn!("Pending event payload database schema is inconsistent"); + return Err(db_error::Corruption); + } + + let mut result = Self::INITIAL_SCHEMA; + + if timestamp_exists { + result = Self::TIMESTAMP_COLUMN + } else if payload_is_blob == Some(true) { + result = Self::PAYLOAD_IS_BLOB; + } + + Ok(result) } fn migrate_payload_column_to_blob(&mut self) -> Result<(), db_error> { @@ -178,6 +242,37 @@ impl EventDispatcherDbConnection { tx.commit()?; Ok(()) } + + fn add_timestamp_column(&mut self) -> Result<(), db_error> { + let tx = self.connection.transaction()?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time travel to pre-1970 is not 
supported") + .as_secs(); + + tx.execute( + "ALTER TABLE pending_payloads RENAME TO pending_payloads_old", + [], + )?; + tx.execute( + "CREATE TABLE pending_payloads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + payload BLOB NOT NULL, + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL + )", + [], + )?; + tx.execute( + "INSERT INTO pending_payloads (id, url, payload, timeout, timestamp) + SELECT id, url, CAST(payload AS BLOB), timeout, ?1 FROM pending_payloads_old", + [now], + )?; + tx.execute("DROP TABLE pending_payloads_old", [])?; + tx.commit()?; + Ok(()) + } } #[cfg(test)] @@ -212,7 +307,7 @@ mod test { } #[test] - fn test_migrate_payload_column_to_blob() { + fn test_migration() { let dir = tempdir().unwrap(); let db_path = dir.path().join("test_payload_migration.sqlite"); @@ -252,17 +347,60 @@ mod test { "Payload column was not migrated to BLOB" ); + let insertion_info_col_count: i64 = conn + .connection + .query_row( + "SELECT COUNT(*) FROM pragma_table_info('pending_payloads') WHERE name = 'timestamp'", + [], + |row| row.get(0), + ) + .unwrap(); + assert!( + insertion_info_col_count == 1, + "timestamp column was not added" + ); + let pending_payloads = conn .get_pending_payloads() .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); assert_eq!( - pending_payloads[0].1.payload_bytes.as_ref(), + pending_payloads[0].request_data.payload_bytes.as_ref(), payload_str.as_bytes(), "Payload contents did not survive migration" ); } + #[test] + fn test_migration_errors_on_inconsistent_schema() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test_payload_migration.sqlite"); + + // Simulate the original schema with TEXT payloads, but also with + // a timestamp column. This should not happen. 
+ let conn = Connection::open(&db_path).unwrap(); + conn.execute( + "CREATE TABLE pending_payloads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + payload TEXT NOT NULL, + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL + )", + [], + ) + .unwrap(); + + let mut conn = EventDispatcherDbConnection::new_without_init(&db_path).unwrap(); + + let result = conn.run_necessary_migrations(); + + match result { + Err(db_error::Corruption) => {} + _ => panic!("inconsistent schema should have caused a corruption error result"), + } + } + #[test] fn test_insert_and_get_pending_payloads() { let dir = tempdir().unwrap(); @@ -274,6 +412,7 @@ mod test { let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); + let timestamp_sentinel = UNIX_EPOCH + Duration::from_hours(24 * 20000); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); let data = EventRequestData { @@ -283,8 +422,7 @@ mod test { }; // Insert payload - let insert_result = conn.insert_payload(&data); - assert!(insert_result.is_ok(), "Failed to insert payload"); + let id = conn.insert_payload_with_retry(&data, timestamp_sentinel); // Get pending payloads let pending_payloads = conn @@ -292,7 +430,13 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (_id, retrieved_data) = &pending_payloads[0]; + let PendingPayload { + id: retrieved_id, + timestamp: retrieved_timestamp, + request_data: retrieved_data, + } = &pending_payloads[0]; + + assert_eq!(*retrieved_id, id, "ID does not match"); assert_eq!(retrieved_data.url, data.url, "URL does not match"); assert_eq!( retrieved_data.payload_bytes.as_ref(), @@ -300,6 +444,10 @@ mod test { "Serialized payload does not match" ); assert_eq!(retrieved_data.timeout, timeout, "Timeout does not match"); + assert_eq!( + *retrieved_timestamp, timestamp_sentinel, + "Time stamp does not 
match" + ); } #[test] @@ -322,7 +470,7 @@ mod test { }; // Insert payload - conn.insert_payload(&data) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); // Get pending payloads @@ -331,7 +479,7 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (id, _) = pending_payloads[0]; + let PendingPayload { id, .. } = pending_payloads[0]; // Delete payload let delete_result = conn.delete_payload(id); diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs index 4c7654464d5..52f03062913 100644 --- a/stacks-node/src/event_dispatcher/tests.rs +++ b/stacks-node/src/event_dispatcher/tests.rs @@ -16,7 +16,7 @@ use std::net::TcpListener; use std::thread; -use std::time::Instant; +use std::time::{Instant, SystemTime}; use clarity::boot_util::boot_code_id; use clarity::vm::costs::ExecutionCost; @@ -272,7 +272,7 @@ fn test_process_pending_payloads() { TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Insert payload - conn.insert_payload(&data) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); // Process pending payloads @@ -332,7 +332,7 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { timeout, }; - conn.insert_payload(&data) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); dispatcher.process_pending_payloads(); From ecb215eca9f800bbe2d53b33f4b49ff0cbf4c717 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Tue, 6 Jan 2026 13:57:46 +0100 Subject: [PATCH 05/14] feat: non-blocking event delivery This commit is the main implementation work for #6543. It moves event dispatcher HTTP requests to a separate thread. That way, a slow event observer doesn't block the node from continuing its work. 
Only if your event observers are so slow that the node is continuously producing events faster than they can be delivered, will it eventually start blocking again, because the queue size for pending requests is bounded (at 1,000 right now, but I picked that number out of a hat, happy to change it if anyone has thoughts). Each new event payload is stored in the event observer DB, and its ID is then sent to the subthread, which will make the request and then delete the DB entry. That way, if a node is shut down while there are pending requests, they're in the DB ready to be retried after restart via `process_pending_payloads()` (which blocks until completion). So that's exactly as before (except that previously there couldn't have been more than one or two pending payloads). --- stacks-node/src/event_dispatcher.rs | 210 +++++--------- stacks-node/src/event_dispatcher/db.rs | 180 ++++++++---- stacks-node/src/event_dispatcher/tests.rs | 107 ++++++- stacks-node/src/event_dispatcher/worker.rs | 319 +++++++++++++++++++++ 4 files changed, 616 insertions(+), 200 deletions(-) create mode 100644 stacks-node/src/event_dispatcher/worker.rs diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index 94f06815aa6..887e81c1139 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -23,7 +23,6 @@ use std::sync::mpsc::channel; #[cfg(test)] use std::sync::LazyLock; use std::sync::{Arc, Mutex}; -use std::thread::sleep; use std::time::{Duration, SystemTime}; use clarity::vm::costs::ExecutionCost; @@ -31,7 +30,6 @@ use clarity::vm::events::{FTEventType, NFTEventType, 
STXEventType}; use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier}; #[cfg(any(test, feature = "testing"))] use lazy_static::lazy_static; -use rand::Rng; use serde_json::json; use stacks::burnchains::{PoxConstants, Txid}; use stacks::chainstate::burn::ConsensusHash; @@ -54,19 +52,17 @@ use stacks::net::api::postblock_proposal::{ BlockValidateOk, BlockValidateReject, BlockValidateResponse, }; use stacks::net::atlas::{Attachment, AttachmentInstance}; -use stacks::net::http::HttpRequestContents; -use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; use stacks::net::stackerdb::StackerDBEventDispatcher; #[cfg(any(test, feature = "testing"))] use stacks::util::tests::TestFlag; use stacks_common::bitvec::BitVec; use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId}; -use stacks_common::types::net::PeerHost; use url::Url; mod db; mod payloads; mod stacker_db; +mod worker; use db::EventDispatcherDbConnection; use payloads::*; @@ -77,6 +73,7 @@ pub use payloads::{ pub use stacker_db::StackerDBChannel; use crate::event_dispatcher::db::PendingPayload; +use crate::event_dispatcher::worker::{EventDispatcherResult, EventDispatcherWorker}; #[cfg(test)] mod tests; @@ -92,6 +89,8 @@ enum EventDispatcherError { SerializationError(serde_json::Error), HttpError(std::io::Error), DbError(stacks::util_lib::db::Error), + RecvError(std::sync::mpsc::RecvError), + SendError(String), // not capturing the underlying because it's a generic type } impl fmt::Display for EventDispatcherError { @@ -100,6 +99,8 @@ impl fmt::Display for EventDispatcherError { EventDispatcherError::SerializationError(ref e) => fmt::Display::fmt(e, f), EventDispatcherError::HttpError(ref e) => fmt::Display::fmt(e, f), EventDispatcherError::DbError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::RecvError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::SendError(ref s) => fmt::Display::fmt(s, f), } } } @@ -110,6 +111,8 @@ impl 
core::error::Error for EventDispatcherError { EventDispatcherError::SerializationError(ref e) => Some(e), EventDispatcherError::HttpError(ref e) => Some(e), EventDispatcherError::DbError(ref e) => Some(e), + EventDispatcherError::RecvError(ref e) => Some(e), + EventDispatcherError::SendError(_) => None, } } } @@ -132,6 +135,18 @@ impl From for EventDispatcherError { } } +impl From for EventDispatcherError { + fn from(value: std::sync::mpsc::RecvError) -> Self { + EventDispatcherError::RecvError(value) + } +} + +impl From> for EventDispatcherError { + fn from(value: std::sync::mpsc::SendError) -> Self { + EventDispatcherError::SendError(format!("{value}")) + } +} + #[derive(Debug, Clone)] struct EventObserver { /// URL to which events will be sent @@ -213,6 +228,9 @@ pub struct EventDispatcher { pub stackerdb_channel: Arc>, /// Path to the database where pending payloads are stored. db_path: PathBuf, + /// The worker thread that performs the actuall HTTP requests so that they don't block + /// the main operation of the node. + worker: EventDispatcherWorker, } /// This struct is used specifically for receiving proposal responses. 
@@ -237,7 +255,9 @@ impl ProposalCallbackReceiver for ProposalCallbackHandler { for observer in self.observers.iter() { self.dispatcher - .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE); + .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE) + .unwrap() + .wait_until_complete(); } } } @@ -415,6 +435,9 @@ impl EventDispatcher { db_path.push("event_observers.sqlite"); EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize database"); + let worker = + EventDispatcherWorker::new(db_path.clone()).expect("Failed to start worker thread"); + EventDispatcher { stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())), registered_observers: vec![], @@ -430,6 +453,7 @@ impl EventDispatcher { stackerdb_observers_lookup: HashSet::new(), block_proposal_observers_lookup: HashSet::new(), db_path, + worker, } } @@ -643,7 +667,7 @@ impl EventDispatcher { ); // Send payload - self.dispatch_to_observer( + self.dispatch_to_observer_or_log_error( &self.registered_observers[observer_id], &payload, PATH_BLOCK_PROCESSED, @@ -1046,8 +1070,9 @@ impl EventDispatcher { event_observer } - /// Process any pending payloads in the database. - /// This is called when the event dispatcher is first instantiated. + /// Process any pending payloads in the database. This is meant to be called at startup, in order to + /// handle anything that was enqueued but not sent before shutdown. This method blocks until all + /// requests are made (or, if the observer is no longer registered, removed from the DB). pub fn process_pending_payloads(&self) { let conn = EventDispatcherDbConnection::new(&self.db_path).expect("Failed to initialize database"); @@ -1068,9 +1093,7 @@ impl EventDispatcher { ); for PendingPayload { - id, - mut request_data, - .. + id, request_data, .. 
} in pending_payloads { info!( @@ -1112,18 +1135,22 @@ impl EventDispatcher { // If the timeout configuration for this observer is different from what it was // originally, the updated config wins. - request_data.timeout = observer.timeout; - - self.make_http_request_and_delete_from_db(&request_data, observer.disable_retries, id); + self.worker + .initiate_send(id, observer.disable_retries, Some(observer.timeout)) + .expect("failed to dispatch pending event payload to worker thread") + .wait_until_complete(); } } + /// A successful result from this method only indicates that that payload was successfully + /// enqueued, not that the HTTP request was actually made. If you need to wait until that's + /// the case, call `wait_until_complete()` on the `EventDispatcherResult`. fn dispatch_to_observer( &self, event_observer: &EventObserver, payload: &serde_json::Value, path: &str, - ) { + ) -> Result { let full_url = Self::get_full_url(event_observer, path); let bytes = match Self::get_payload_bytes(payload) { Ok(bytes) => bytes, @@ -1131,7 +1158,7 @@ impl EventDispatcher { error!( "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err ); - return; + return Err(err); } }; @@ -1143,81 +1170,25 @@ impl EventDispatcher { let id = self.save_to_db(&data); - self.make_http_request_and_delete_from_db(&data, event_observer.disable_retries, id); + self.worker + .initiate_send(id, event_observer.disable_retries, None) } - fn make_http_request( - data: &EventRequestData, - disable_retries: bool, - ) -> Result<(), EventDispatcherError> { - debug!( - "Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len() - ); - - let url = Url::parse(&data.url) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url)); - - let host = url.host_str().expect("Invalid URL: missing host"); - let port = url.port_or_known_default().unwrap_or(80); - let peerhost: PeerHost = format!("{host}:{port}") - .parse() - 
.unwrap_or(PeerHost::DNS(host.to_string(), port)); - - let mut backoff = Duration::from_millis(100); - let mut attempts: i32 = 0; - // Cap the backoff at 3x the timeout - let max_backoff = data.timeout.saturating_mul(3); - - loop { - let mut request = StacksHttpRequest::new_for_peer( - peerhost.clone(), - "POST".into(), - url.path().into(), - HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)), - ) - .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); - request.add_header("Connection".into(), "close".into()); - match send_http_request(host, port, request, data.timeout) { - Ok(response) => { - if response.preamble().status_code == 200 { - debug!( - "Event dispatcher: Successful POST"; "url" => %url - ); - break; - } else { - error!( - "Event dispatcher: Failed POST"; "url" => %url, "response" => ?response.preamble() - ); - } - } - Err(err) => { - warn!( - "Event dispatcher: connection or request failed to {host}:{port} - {err:?}"; - "backoff" => ?backoff, - "attempts" => attempts - ); - if disable_retries { - warn!("Observer is configured in disable_retries mode: skipping retry of payload"); - return Err(err.into()); - } - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: skipping retry of payload"); - return Err(err.into()); - } - } - } - - sleep(backoff); - let jitter: u64 = rand::thread_rng().gen_range(0..100); - backoff = std::cmp::min( - backoff.saturating_mul(2) + Duration::from_millis(jitter), - max_backoff, - ); - attempts = attempts.saturating_add(1); + /// This fire-and-forget version of `dispatch_to_observer` logs any error from enqueueing the + /// request, and does not give you a way to wait for blocking until it's sent. If you need + /// more control, use `dispatch_to_observer()` directly and handle the result yourself. + /// + /// This method exists because we generally don't want the event dispatcher to interrupt the node's + /// processing. 
+ fn dispatch_to_observer_or_log_error( + &self, + event_observer: &EventObserver, + payload: &serde_json::Value, + path: &str, + ) { + if let Err(err) = self.dispatch_to_observer(event_observer, payload, path) { + error!("Event dispatcher: Failed to enqueue payload for sending to observer: {err:?}"); } - - Ok(()) } fn get_payload_bytes(payload: &serde_json::Value) -> Result, EventDispatcherError> { @@ -1245,53 +1216,12 @@ impl EventDispatcher { conn.insert_payload_with_retry(data, SystemTime::now()) } - fn make_http_request_and_delete_from_db( - &self, - data: &EventRequestData, - disable_retries: bool, - id: i64, - ) { - let http_result = Self::make_http_request(data, disable_retries); - - if let Err(err) = http_result { - // log but continue - error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err); - } - - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: skipping deletion of payload"); - return; - } - - // We're deleting regardless of result -- if retries are disabled, that means - // we're supposed to forget about it in case of failure. If they're not disabled, - // then we wouldn't be here in case of failue, because `make_http_request` retries - // until it's successful (with the exception of the above fault injection which - // simulates a shutdown). 
- let deletion_result = self.delete_from_db(id); - - if let Err(e) = deletion_result { - error!( - "Event observer: failed to delete pending payload from database"; - "error" => ?e - ); - } - } - - fn delete_from_db(&self, id: i64) -> Result<(), EventDispatcherError> { - let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) - .expect("Failed to open database for event observer"); - conn.delete_payload(id)?; - Ok(()) - } - fn send_new_attachments(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_ATTACHMENT_PROCESSED); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_ATTACHMENT_PROCESSED); } fn send_new_mempool_txs(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT); } /// Serializes new microblocks data into a JSON payload and sends it off to the correct path @@ -1324,7 +1254,7 @@ impl EventDispatcher { "burn_block_timestamp": burn_block_timestamp, }); - self.dispatch_to_observer(event_observer, &payload, PATH_MICROBLOCK_SUBMIT); + self.dispatch_to_observer_or_log_error(event_observer, &payload, PATH_MICROBLOCK_SUBMIT); } fn send_dropped_mempool_txs( @@ -1332,15 +1262,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_DROP); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_DROP); } fn send_mined_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_MINED_BLOCK); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_BLOCK); } fn send_mined_microblock(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - 
self.dispatch_to_observer(event_observer, payload, PATH_MINED_MICROBLOCK); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_MICROBLOCK); } fn send_mined_nakamoto_block( @@ -1348,15 +1278,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.dispatch_to_observer(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK); } fn send_stackerdb_chunks(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_STACKERDB_CHUNKS); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_STACKERDB_CHUNKS); } fn send_new_burn_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_BURN_BLOCK_SUBMIT); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_BURN_BLOCK_SUBMIT); } } diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs index e3f678f54c4..2aa091e2284 100644 --- a/stacks-node/src/event_dispatcher/db.rs +++ b/stacks-node/src/event_dispatcher/db.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -14,12 +14,13 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
+use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; use std::thread::sleep; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use rusqlite::{params, Connection}; +use rusqlite::{params, Connection, Row}; use stacks::util_lib::db::Error as db_error; use crate::event_dispatcher::EventRequestData; @@ -32,6 +33,7 @@ pub struct PendingPayload { } /// Wraps a SQlite connection to the database in which pending event payloads are stored +#[derive(Debug)] pub struct EventDispatcherDbConnection { connection: Connection, } @@ -68,33 +70,17 @@ impl EventDispatcherDbConnection { /// Insert a payload into the database, retrying on failure. Returns the id of of the inserted record. pub fn insert_payload_with_retry(&self, data: &EventRequestData, timestamp: SystemTime) -> i64 { - let mut attempts = 0i64; - let mut backoff = Duration::from_millis(100); // Initial backoff duration - let max_backoff = Duration::from_secs(5); // Cap the backoff duration - - loop { - match self.insert_payload(data, timestamp) { - Ok(id) => { - // Successful insert, break the loop - return id; - } - Err(err) => { - // Log the error, then retry after a delay - warn!("Failed to insert payload into event observer database: {err:?}"; - "backoff" => ?backoff, - "attempts" => attempts - ); - - // Wait for the backoff duration - sleep(backoff); - - // Increase the backoff duration (with exponential backoff) - backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff); - - attempts = attempts.saturating_add(1); - } - } - } + with_retry( + || self.insert_payload(data, timestamp), + "Failed to insert payload into event observer database".to_string(), + ) + } + + pub fn get_payload_with_retry(&self, id: i64) -> PendingPayload { + with_retry( + || self.get_payload(id), + "Failed to retrieve payload {id} from event observer database".to_string(), + ) } pub fn insert_payload( @@ -121,29 +107,19 @@ impl EventDispatcherDbConnection { Ok(id) } + pub fn get_payload(&self, id: i64) -> Result { + 
self.connection.query_row_and_then( + &format!("SELECT {PAYLOAD_FIELDS} FROM pending_payloads WHERE id = ?1"), + [id], + row_to_pending_payload, + ) + } + pub fn get_pending_payloads(&self) -> Result, db_error> { - let mut stmt = self.connection.prepare( - "SELECT id, url, payload, timeout, timestamp FROM pending_payloads ORDER BY id", - )?; - let payload_iter = stmt.query_and_then([], |row| -> Result { - let id: i64 = row.get(0)?; - let url: String = row.get(1)?; - let payload_bytes: Vec = row.get(2)?; - let payload_bytes = Arc::<[u8]>::from(payload_bytes); - let timeout_ms: u64 = row.get(3)?; - let timestamp_s: u64 = row.get(4)?; - let request_data = EventRequestData { - url, - payload_bytes, - timeout: Duration::from_millis(timeout_ms), - }; - - Ok(PendingPayload { - id, - request_data, - timestamp: UNIX_EPOCH + Duration::from_secs(timestamp_s), - }) - })?; + let mut stmt = self.connection.prepare(&format!( + "SELECT {PAYLOAD_FIELDS} FROM pending_payloads ORDER BY id" + ))?; + let payload_iter = stmt.query_and_then([], row_to_pending_payload)?; payload_iter.collect() } @@ -275,8 +251,78 @@ impl EventDispatcherDbConnection { } } +// If you change this, make sure to change `row_to_pending_payload` in sync. +const PAYLOAD_FIELDS: &str = "id, url, payload, timeout, timestamp"; + +/// This function should only be used with rows that were SELECTed using the +/// `PAYLOAD_FIELDS` constant. 
+fn row_to_pending_payload(row: &Row) -> Result { + let id: i64 = row.get(0)?; + let url: String = row.get(1)?; + let payload_bytes: Vec = row.get(2)?; + let payload_bytes = Arc::<[u8]>::from(payload_bytes); + let timeout_ms: u64 = row.get(3)?; + let timestamp_s: u64 = row.get(4)?; + let request_data = EventRequestData { + url, + payload_bytes, + timeout: Duration::from_millis(timeout_ms), + }; + + Ok(PendingPayload { + id, + request_data, + timestamp: UNIX_EPOCH + Duration::from_secs(timestamp_s), + }) +} + +/// Calls the given function, repeatedly if necessary, until it doesn't fail, and then +/// returns the result from the successful call. Initially backs off for 0.1s and increases +/// backoff exponentially up to a max of five seconds. If the function never returns a +/// success result, `with_retry` will block forever. +/// +/// # Example +/// +/// let response = with_retry(|| perform_db_op(42), "database operation 42 failed"); +fn with_retry(f: F, error_log_text: String) -> T +where + F: Fn() -> Result, + E: Debug, +{ + let mut attempts = 0i64; + let mut backoff = Duration::from_millis(100); // Initial backoff duration + let max_backoff = Duration::from_secs(5); // Cap the backoff duration + + loop { + match f() { + Ok(thing) => { + // Successful operation, break the loop + return thing; + } + Err(err) => { + // Log the error, then retry after a delay + warn!("{error_log_text}: {err:?}"; + "backoff" => ?backoff, + "attempts" => attempts + ); + + // Wait for the backoff duration + sleep(backoff); + + // Increase the backoff duration (with exponential backoff) + backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff); + + attempts = attempts.saturating_add(1); + } + } + } +} + #[cfg(test)] mod test { + use std::cell::RefCell; + use std::time::Instant; + use serde_json::json; use tempfile::tempdir; @@ -491,4 +537,36 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 0, "Expected no pending payloads"); } + + 
#[test] + fn test_with_retry_returns_original_result() { + let f = || Result::::Ok(6_7); + let result = with_retry(f, "failed".to_string()); + assert_eq!(result, 67); + } + + #[test] + fn test_with_retry_retries_as_often_as_necessary() { + let call_count = RefCell::new(0); + let f = || { + *call_count.borrow_mut() += 1; + if *call_count.borrow() < 5 { + return Err("keep trying"); + } else { + return Ok("you did it"); + } + }; + let now = Instant::now(); + let result = with_retry(f, "failed".to_string()); + let elapsed_millis = now.elapsed().as_millis(); + assert_eq!(result, "you did it"); + let count = *call_count.borrow(); + assert_eq!( + count, 5, + "inner function was not called the expected number of times" + ); + // We retry 4 times, with delays of 100, 200, 400, and 800 ms, respectively, + // for a total of 1,500. + assert!(1_450 < elapsed_millis && elapsed_millis < 1_550); + } } diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs index 52f03062913..8ba1f123ab0 100644 --- a/stacks-node/src/event_dispatcher/tests.rs +++ b/stacks-node/src/event_dispatcher/tests.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -15,6 +15,7 @@ // along with this program. If not, see . 
use std::net::TcpListener; +use std::sync::atomic::{AtomicU32, Ordering}; use std::thread; use std::time::{Instant, SystemTime}; @@ -37,9 +38,12 @@ use stacks::chainstate::stacks::{ TransactionPayload, TransactionPostConditionMode, TransactionPublicKeyEncoding, TransactionSpendingCondition, TransactionVersion, }; +use stacks::net::http::HttpRequestContents; +use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; use stacks::types::chainstate::{ BlockHeaderHash, StacksAddress, StacksPrivateKey, StacksPublicKey, }; +use stacks::types::net::PeerHost; use stacks::util::hash::{Hash160, Sha512Trunc256Sum}; use stacks::util::secp256k1::MessageSignature; use stacks_common::bitvec::BitVec; @@ -408,7 +412,10 @@ fn test_send_payload_with_db() { TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Call send_payload - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Verify that the payload was sent and database is empty _m.assert(); @@ -453,7 +460,7 @@ fn test_send_payload_success() { let working_dir = dir.path().to_path_buf(); let dispatcher = EventDispatcher::new(working_dir); - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) @@ -505,7 +512,7 @@ fn test_send_payload_retry() { let working_dir = dir.path().to_path_buf(); let dispatcher = EventDispatcher::new(working_dir); - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) @@ -562,7 +569,10 @@ fn test_send_payload_timeout() { let dispatcher = EventDispatcher::new(working_dir); // Call the function being tested - dispatcher.dispatch_to_observer(&observer, 
&payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Record the time after the function returns let elapsed_time = start_time.elapsed(); @@ -670,7 +680,10 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 1"); // Send the payload - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Re-enable retrying TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); @@ -680,7 +693,7 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 2"); // Send another payload - dispatcher.dispatch_to_observer(&observer, &payload2, "/test"); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload2, "/test"); // Wait for the server to process the requests rx.recv_timeout(Duration::from_secs(5)) @@ -707,7 +720,10 @@ fn test_event_dispatcher_disable_retries() { let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Verify that the payload was sent _m.assert(); @@ -727,7 +743,10 @@ fn test_event_dispatcher_disable_retries_invalid_url() { let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); } #[test] @@ -939,6 +958,7 @@ fn test_block_proposal_validation_event() { let endpoint = server.url().strip_prefix("http://").unwrap().to_string(); let dir = tempdir().unwrap(); let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); + dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), 
events_keys: vec![EventKeyType::BlockProposal], @@ -970,3 +990,72 @@ fn test_block_proposal_validation_event() { mock.assert(); } + +#[test] +fn test_http_delivery_non_blocking() { + let mut slow_server = mockito::Server::new(); + + let start_count = Arc::new(AtomicU32::new(0)); + let end_count = Arc::new(AtomicU32::new(0)); + + let start_count2 = start_count.clone(); + let end_count2 = end_count.clone(); + + let mock = slow_server + .mock("POST", "/mined_nakamoto_block") + .with_body_from_request(move |_| { + start_count2.fetch_add(1, Ordering::SeqCst); + thread::sleep(Duration::from_secs(2)); + end_count2.fetch_add(1, Ordering::SeqCst); + "".into() + }) + .create(); + + let endpoint = slow_server + .url() + .strip_prefix("http://") + .unwrap() + .to_string(); + + let dir = tempdir().unwrap(); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); + + dispatcher.register_observer(&EventObserverConfig { + endpoint: endpoint.clone(), + events_keys: vec![EventKeyType::MinedBlocks], + timeout_ms: 3_000, + disable_retries: false, + }); + + let nakamoto_block = NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: vec![], + }; + + let start = Instant::now(); + + dispatcher.process_mined_nakamoto_block_event( + 0, + &nakamoto_block, + 0, + &ExecutionCost::max_value(), + vec![], + ); + + assert!( + start.elapsed() < Duration::from_millis(100), + "dispatcher blocked while sending event" + ); + + thread::sleep(Duration::from_secs(1)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 0); + + thread::sleep(Duration::from_secs(2)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 1); + + mock.assert(); +} diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs new file mode 100644 index 00000000000..5967ff6b64e --- /dev/null +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -0,0 +1,319 @@ +// Copyright (C) 
2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::path::PathBuf; +use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}; +use std::sync::Arc; +use std::thread::{self, sleep}; +use std::time::{Duration, SystemTime}; + +use rand::Rng; +use stacks::net::http::HttpRequestContents; +use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; +use stacks::types::net::PeerHost; +use url::Url; + +use crate::event_dispatcher::db::EventDispatcherDbConnection; +#[cfg(test)] +use crate::event_dispatcher::TEST_EVENT_OBSERVER_SKIP_RETRY; +use crate::event_dispatcher::{EventDispatcherError, EventRequestData}; + +struct WorkerMessage { + /// The id of the payload data in the event observer DB. It must exist. + id: i64, + /// If true, the HTTP request is only attempted once. + disable_retries: bool, + /// A value for the HTTP timeout is stored in the DB, but can optionally be overridden. + timeout_override: Option, + /// The worker thread will send a message on this channel once it's done with this request. + completion: Sender<()>, +} + +/// The return type of `initiate_send()`. If the caller of that method just wishes to move +/// on, they can happily drop this result. This is the behavior for most event deliveries. 
+/// +/// On the other hand, if they wish to block until the HTTP request was successfully sent +/// (or, in the case of `disable_retries`, at least attempted), they can call +/// `.wait_until_complete()`. This is what happens during `process_pending_payloads()` at +/// startup. Note that it's possible that other requests are in the queue, so the blocking +/// may take longer than only the handling of this very request. +pub struct EventDispatcherResult { + /// The worker thread will send a one-time message to this channel to notify of completion. + /// Afterwards, it will drop the sender and thus close the channel. + receiver: Receiver<()>, +} + +impl EventDispatcherResult { + pub fn wait_until_complete(self) { + // There is no codepath that would drop the sender without sending the acknowledgenent + // first. And this method consumes `self`, so it can only be called once. + // So if despite all that, `recv()` returns an error, that means the worker thread panicked. + self.receiver + .recv() + .expect("EventDispatcherWorker thread has terminated mid-operation"); + } +} + +/// This worker is responsible for making the actual HTTP requests that ultimately result +/// from dispatching events to observers. It makes those requests on a dedicated separate +/// thread so that e.g. a slow event observer doesn't block a node from continuing its work. +/// +/// Call `EventDispatcherWorker::new()` to create. +/// +/// Call `initiate_send()` with the id of the payload (in the event oberserver DB) to enqueue. +/// +/// Cloning the `EventDispatcherWorker` does *not* create a new thread -- both the original and +/// the clone will share a single queue and worker thread. +/// +/// Once the `EventDispatcherWorker` (including any clones) is dropped, the worker thread will +/// finish any enqueued work and then shut down. 
+#[derive(Clone)] +pub struct EventDispatcherWorker { + sender: SyncSender, +} + +impl EventDispatcherWorker { + pub fn new(db_path: PathBuf) -> Result { + Self::new_with_custom_queue_size(db_path, 1_000) + } + + pub fn new_with_custom_queue_size( + db_path: PathBuf, + queue_size: usize, + ) -> Result { + let (message_tx, message_rx) = sync_channel(queue_size); + let (ready_tx, ready_rx) = channel(); + + thread::Builder::new() + .name("event-dispatcher".to_string()) + .spawn(move || { + let conn = match EventDispatcherDbConnection::new(&db_path) { + Ok(conn) => conn, + Err(err) => { + error!("Event Dispatcher Worker: Unable to open DB, terminating worker thread: {err}"); + ready_tx.send(Err(err)).unwrap(); + return; + } + }; + + if let Err(err) = ready_tx.send(Ok(())) { + // If the sending fails (i.e. the receiver has been dropped), that means a logic bug + // has been introduced to the code -- at time of writing, the main function is waiting + // for this message a few lines down, outside the thread closure. + // We log this, but we still start the loop. + error!( + "Event Dispatcher Worker: Unable to send ready state. This is a bug. {err}" + ); + } + + // this will run forever until the messaging channel is closed + Self::main_thread_loop(conn, message_rx); + }) + .unwrap(); + + // note double question mark, deals with both the channel RecvError and whatever error + // might be sent across that channel + ready_rx.recv()??; + + Ok(EventDispatcherWorker { sender: message_tx }) + } + + /// Let the worker know that it should send the request that is stored in the DB under the given + /// ID, and delete that DB entry once it's done. + /// + /// A successful result only means that the request was successfully enqueued, not that it was + /// actually made. If you need to wait until the latter has happened, call `wait_until_complete()` + /// on the returned `EventDispatcherResult`. + /// + /// The worker has a limited queue size (1000 by default). 
If the queue is already full, the + /// call to `initiate_send()` will block until space has become available. + pub fn initiate_send( + &self, + id: i64, + disable_retries: bool, + timeout_override: Option, + ) -> Result { + let (sender, receiver) = channel(); + + self.sender.send(WorkerMessage { + id, + disable_retries, + timeout_override, + completion: sender, + })?; + + Ok(EventDispatcherResult { receiver }) + } + + fn main_thread_loop(conn: EventDispatcherDbConnection, message_rx: Receiver) { + // main loop of the thread -- get message from channel, grab data from DB, send request, + // delete from DB, acknowledge + loop { + let Ok(WorkerMessage { + id, + disable_retries, + timeout_override, + completion, + }) = message_rx.recv() + else { + info!("Event Dispatcher Worker: channel closed, terminating worker thread."); + return; + }; + + // This will block forever if we were passed a non-existing ID. Don't do that. + let mut payload = conn.get_payload_with_retry(id); + + // Deliberately not handling the error case of `duration_since()` -- if the `timestamp` + // is *after* `now` (which should be extremely rare), the most likely reason is a *slight* + // adjustment to the the system clock (e.g. NTP sync) that happened between storing the + // entity and retrieving it, and that should be fine. + // If there was a *major* adjustment, all bets are off anyway. You shouldn't mess with your + // clock on a server running a node. 
+ if let Ok(age) = SystemTime::now().duration_since(payload.timestamp) { + if age.as_secs() > 5 * 60 { + warn!( + "Event Dispatcher Worker: Event payload transmitting more than 5 minutes after event"; + "age_ms" => age.as_millis(), + "id"=> id + ); + } + } + + if let Some(timeout_override) = timeout_override { + payload.request_data.timeout = timeout_override; + } + + Self::make_http_request_and_delete_from_db( + &payload.request_data, + disable_retries, + id, + &conn, + ); + + // We're ignoring the result of this call -- if the requester has dropped the receiver + // in the meantime, that's fine. That is the usual case of fire-and-forget calls. + let _ = completion.send(()); + } + } + + fn make_http_request_and_delete_from_db( + data: &EventRequestData, + disable_retries: bool, + id: i64, + conn: &EventDispatcherDbConnection, + ) { + let http_result = Self::make_http_request(data, disable_retries); + + if let Err(err) = http_result { + // log but continue + error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err); + } + + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping deletion of payload"); + return; + } + + // We're deleting regardless of result -- if retries are disabled, that means + // we're supposed to forget about it in case of failure. If they're not disabled, + // then we wouldn't be here in case of failue, because `make_http_request` retries + // until it's successful (with the exception of the above fault injection which + // simulates a shutdown). 
+ let deletion_result = conn.delete_payload(id); + + if let Err(e) = deletion_result { + error!( + "Event observer: failed to delete pending payload from database"; + "error" => ?e + ); + } + } + + fn make_http_request( + data: &EventRequestData, + disable_retries: bool, + ) -> Result<(), EventDispatcherError> { + debug!( + "Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len() + ); + + let url = Url::parse(&data.url) + .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url)); + + let host = url.host_str().expect("Invalid URL: missing host"); + let port = url.port_or_known_default().unwrap_or(80); + let peerhost: PeerHost = format!("{host}:{port}") + .parse() + .unwrap_or(PeerHost::DNS(host.to_string(), port)); + + let mut backoff = Duration::from_millis(100); + let mut attempts: i32 = 0; + // Cap the backoff at 3x the timeout + let max_backoff = data.timeout.saturating_mul(3); + + loop { + let mut request = StacksHttpRequest::new_for_peer( + peerhost.clone(), + "POST".into(), + url.path().into(), + HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)), + ) + .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); + request.add_header("Connection".into(), "close".into()); + match send_http_request(host, port, request, data.timeout) { + Ok(response) => { + if response.preamble().status_code == 200 { + debug!( + "Event dispatcher: Successful POST"; "url" => %url + ); + break; + } else { + error!( + "Event dispatcher: Failed POST"; "url" => %url, "response" => ?response.preamble() + ); + } + } + Err(err) => { + warn!( + "Event dispatcher: connection or request failed to {host}:{port} - {err:?}"; + "backoff" => ?backoff, + "attempts" => attempts + ); + if disable_retries { + warn!("Observer is configured in disable_retries mode: skipping retry of payload"); + return Err(err.into()); + } + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + 
warn!("Fault injection: skipping retry of payload"); + return Err(err.into()); + } + } + } + + sleep(backoff); + let jitter: u64 = rand::thread_rng().gen_range(0..100); + backoff = std::cmp::min( + backoff.saturating_mul(2) + Duration::from_millis(jitter), + max_backoff, + ); + attempts = attempts.saturating_add(1); + } + + Ok(()) + } +} From 72437b24db750af4bb26ddb56361a2c26cd063d1 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Wed, 7 Jan 2026 09:54:12 +0100 Subject: [PATCH 06/14] fix: add a way to block until all events are delivered, to fix a test This fixes [this integration test failure](https://github.com/stacks-network/stacks-core/actions/runs/20749024845/job/59577684952?pr=6762), caused by the fact that event delivery wasn't complete by the time the assertions were made. --- stacks-node/src/event_dispatcher.rs | 9 ++++ stacks-node/src/event_dispatcher/worker.rs | 54 ++++++++++++++++------ stacks-node/src/tests/neon_integrations.rs | 2 + 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index 887e81c1139..33f5109bfb6 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -457,6 +457,15 @@ impl EventDispatcher { } } + /// Sends a noop task to the worker and waits until its completion is acknowledged. + /// This has the effect that all payloads that have been submitted before this point + /// are also done, which is a useful thing to wait for in some tests where you want + /// to assert on certain event deliveries. 
+ #[cfg(test)] + pub fn catch_up(&self) { + self.worker.noop().unwrap().wait_until_complete(); + } + pub fn process_burn_block( &self, burn_block: &BurnchainHeaderHash, diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs index 5967ff6b64e..d17c1478ec1 100644 --- a/stacks-node/src/event_dispatcher/worker.rs +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -31,13 +31,20 @@ use crate::event_dispatcher::db::EventDispatcherDbConnection; use crate::event_dispatcher::TEST_EVENT_OBSERVER_SKIP_RETRY; use crate::event_dispatcher::{EventDispatcherError, EventRequestData}; +#[allow(dead_code)] // NoOp is only used in test configurations +enum WorkerTask { + Payload { + /// The id of the payload data in the event observer DB. It must exist. + id: i64, + /// If true, the HTTP request is only attempted once. + disable_retries: bool, + /// A value for the HTTP timeout is stored in the DB, but can optionally be overridden. + timeout_override: Option, + }, + NoOp, +} struct WorkerMessage { - /// The id of the payload data in the event observer DB. It must exist. - id: i64, - /// If true, the HTTP request is only attempted once. - disable_retries: bool, - /// A value for the HTTP timeout is stored in the DB, but can optionally be overridden. - timeout_override: Option, + task: WorkerTask, /// The worker thread will send a message on this channel once it's done with this request. 
completion: Sender<()>, } @@ -149,9 +156,23 @@ impl EventDispatcherWorker { let (sender, receiver) = channel(); self.sender.send(WorkerMessage { - id, - disable_retries, - timeout_override, + task: WorkerTask::Payload { + id, + disable_retries, + timeout_override, + }, + completion: sender, + })?; + + Ok(EventDispatcherResult { receiver }) + } + + #[cfg(test)] + pub fn noop(&self) -> Result { + let (sender, receiver) = channel(); + + self.sender.send(WorkerMessage { + task: WorkerTask::NoOp, completion: sender, })?; @@ -162,15 +183,20 @@ impl EventDispatcherWorker { // main loop of the thread -- get message from channel, grab data from DB, send request, // delete from DB, acknowledge loop { - let Ok(WorkerMessage { + let Ok(WorkerMessage { task, completion }) = message_rx.recv() else { + info!("Event Dispatcher Worker: channel closed, terminating worker thread."); + return; + }; + + let WorkerTask::Payload { id, disable_retries, timeout_override, - completion, - }) = message_rx.recv() + } = task else { - info!("Event Dispatcher Worker: channel closed, terminating worker thread."); - return; + // no-op -- just ack and move on + let _ = completion.send(()); + continue; }; // This will block forever if we were passed a non-existing ID. Don't do that. 
diff --git a/stacks-node/src/tests/neon_integrations.rs b/stacks-node/src/tests/neon_integrations.rs index cf349fdc30b..644402b81d0 100644 --- a/stacks-node/src/tests/neon_integrations.rs +++ b/stacks-node/src/tests/neon_integrations.rs @@ -8344,6 +8344,8 @@ fn push_boot_receipts() { let mut run_loop = neon::RunLoop::new(conf); let _chainstate = run_loop.boot_chainstate(&burnchain_config); + run_loop.get_event_dispatcher().catch_up(); + // verify that the event observer got its boot receipts let blocks = test_observer::get_blocks(); assert_eq!(blocks.len(), 1); From f368e56f89625a3d8fcb4d7f380c8d575b92b3d0 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:00:10 +0100 Subject: [PATCH 07/14] fix: ensure `process_pending_payloads` is really only called at startup Doing this work in the RunLoop implementations' startup code is *almost* the same thing, but not quite, since the nakamoto run loop might be started later (after an epoch 3 transition), at which point the event DB may already have new items from the current run of the application, which should *not* be touched by `process_pending_payloads`. This used to not be a problem, but now that that DB is used for the actual queue of the (concurrently running) EventDispatcherWorker, it has become one. --- stacks-node/src/main.rs | 13 +++++++++++++ stacks-node/src/node.rs | 1 - stacks-node/src/run_loop/nakamoto.rs | 1 - stacks-node/src/run_loop/neon.rs | 1 - 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/stacks-node/src/main.rs b/stacks-node/src/main.rs index c1b7c9e99ad..6e3da4acafa 100644 --- a/stacks-node/src/main.rs +++ b/stacks-node/src/main.rs @@ -249,6 +249,17 @@ fn cli_get_miner_spend( spend_amount } +/// If the previous session was terminated before all the pending events had been sent, +/// the DB will still contain them. Work through that before doing anything new. 
+/// Pending events for observers that are no longer registered will be discarded. +fn send_pending_event_payloads(conf: &Config) { + let mut event_dispatcher = EventDispatcher::new(conf.get_working_dir()); + for observer in &conf.events_observers { + event_dispatcher.register_observer(observer); + } + event_dispatcher.process_pending_payloads(); +} + fn main() { panic::set_hook(Box::new(|panic_info| { error!("Process abort due to thread panic: {panic_info}"); @@ -411,6 +422,8 @@ fn main() { debug!("burnchain configuration {:?}", &conf.burnchain); debug!("connection configuration {:?}", &conf.connection_options); + send_pending_event_payloads(&conf); + let num_round: u64 = 0; // Infinite number of rounds if conf.burnchain.mode == "helium" || conf.burnchain.mode == "mocknet" { diff --git a/stacks-node/src/node.rs b/stacks-node/src/node.rs index a150ec64fb1..5b54f04d4a3 100644 --- a/stacks-node/src/node.rs +++ b/stacks-node/src/node.rs @@ -343,7 +343,6 @@ impl Node { for observer in &config.events_observers { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); let burnchain_config = config.get_burnchain(); diff --git a/stacks-node/src/run_loop/nakamoto.rs b/stacks-node/src/run_loop/nakamoto.rs index 6c9e3ea1663..b335e83b42b 100644 --- a/stacks-node/src/run_loop/nakamoto.rs +++ b/stacks-node/src/run_loop/nakamoto.rs @@ -95,7 +95,6 @@ impl RunLoop { for observer in config.events_observers.iter() { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); Self { config, diff --git a/stacks-node/src/run_loop/neon.rs b/stacks-node/src/run_loop/neon.rs index 0a33aedc4bf..3efc8181122 100644 --- a/stacks-node/src/run_loop/neon.rs +++ b/stacks-node/src/run_loop/neon.rs @@ -318,7 +318,6 @@ impl RunLoop { for observer in config.events_observers.iter() { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); Self { config, From 
d5fa2fc5d8bb22a8374f3ac24e18a2267eaa4fdf Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Thu, 8 Jan 2026 17:50:25 +0100 Subject: [PATCH 08/14] fix all the integration test issues with asynchronous event dispatching This is like 72437b24db750af4bb26ddb56361a2c26cd063d1, but it works for all the tests instead of only the one. While only that one test very obviously failed, the issue exists for pretty much all of the integration tests, because they rely on the test_observer to capture all relevant data. Things are fast enough, and therefore we've only seen one blatant failure, but 1) it's going to be flaky (I can create a whole lot of test failures by adding a small artificial delay to event delivery), and 2) it might actually be *hiding* test failures (in some cases, like e.g. neon_integrations::deep_contract, we're asserting that certain things are *not* in the data, and if the data is incomplete to begin with, those assertions are moot). --- stacks-node/src/event_dispatcher.rs | 46 ++++++++++++++++++++-- stacks-node/src/tests/neon_integrations.rs | 26 ++++++++++-- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index 33f5109bfb6..1ee47038a46 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -20,9 +20,9 @@ use std::fmt; use std::path::PathBuf; #[cfg(test)] use std::sync::mpsc::channel; -#[cfg(test)] -use std::sync::LazyLock; use std::sync::{Arc, Mutex}; +#[cfg(test)] +use std::sync::{LazyLock, Weak}; use std::time::{Duration, SystemTime}; use clarity::vm::costs::ExecutionCost; @@ -229,8 +229,9 @@ pub struct EventDispatcher { /// Path to the database where pending payloads are stored. db_path: PathBuf, /// The worker thread that performs the actuall HTTP requests so that they don't block - /// the main operation of the node. 
- worker: EventDispatcherWorker, + /// the main operation of the node. It's wrapped in an `Arc` only to make some test helpers + /// work (see `ALL_WORKERS`); in release code it wouldn't be necessary. + worker: Arc, } /// This struct is used specifically for receiving proposal responses. @@ -429,6 +430,36 @@ impl BlockEventDispatcher for EventDispatcher { } } +/// During integration tests, the `test_observer` needs to ensure that all events +/// that were triggered have actually been delivered, before it can pass on the +/// captured data. To make that work, during test we store weak references to +/// all the workers and make it possible to wait for all of them to catch up +/// in a single function call (see `catch_up_all_event_dispatchers`). +#[cfg(test)] +static ALL_WORKERS: Mutex>> = Mutex::new(Vec::new()); + +#[cfg(test)] +pub fn catch_up_all_event_dispatchers() { + let mut results = Vec::new(); + let mut guard = ALL_WORKERS.lock().unwrap(); + + // remove all items that have been dropped; call .noop() on the rest + guard.retain_mut(|w| { + let Some(worker) = w.upgrade() else { + return false; + }; + results.push(worker.noop().unwrap()); + return true; + }); + // unlock the mutex + drop(guard); + + // block until all workers have caught up + for result in results { + result.wait_until_complete(); + } +} + impl EventDispatcher { pub fn new(working_dir: PathBuf) -> EventDispatcher { let mut db_path = working_dir; @@ -438,6 +469,13 @@ impl EventDispatcher { let worker = EventDispatcherWorker::new(db_path.clone()).expect("Failed to start worker thread"); + let worker = Arc::new(worker); + + #[cfg(test)] + { + ALL_WORKERS.lock().unwrap().push(Arc::downgrade(&worker)); + } + EventDispatcher { stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())), registered_observers: vec![], diff --git a/stacks-node/src/tests/neon_integrations.rs b/stacks-node/src/tests/neon_integrations.rs index 644402b81d0..3578fb876a4 100644 ---
a/stacks-node/src/tests/neon_integrations.rs +++ b/stacks-node/src/tests/neon_integrations.rs @@ -292,7 +292,9 @@ pub mod test_observer { use warp::Filter; use {tokio, warp}; - use crate::event_dispatcher::{MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent}; + use crate::event_dispatcher::{ + self, MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent, + }; use crate::Config; pub const EVENT_OBSERVER_PORT: u16 = 50303; @@ -520,51 +522,70 @@ pub mod test_observer { Ok(warp::http::StatusCode::OK) } + /// Waits until all events that have been queued for dispatch until now + /// have actually been delivered. This ensures the tests have all the data + /// they need to assert on. + fn catch_up() { + event_dispatcher::catch_up_all_event_dispatchers(); + } + pub fn get_stacker_sets() -> Vec<(StacksBlockId, u64, RewardSet)> { + catch_up(); STACKER_SETS.lock().unwrap().clone() } pub fn get_memtxs() -> Vec { + catch_up(); MEMTXS.lock().unwrap().clone() } pub fn get_memtx_drops() -> Vec<(String, String)> { + catch_up(); MEMTXS_DROPPED.lock().unwrap().clone() } pub fn get_blocks() -> Vec { + catch_up(); NEW_BLOCKS.lock().unwrap().clone() } pub fn get_microblocks() -> Vec { + catch_up(); NEW_MICROBLOCKS.lock().unwrap().clone() } pub fn get_burn_blocks() -> Vec { + catch_up(); BURN_BLOCKS.lock().unwrap().clone() } pub fn get_attachments() -> Vec { + catch_up(); ATTACHMENTS.lock().unwrap().clone() } pub fn get_mined_blocks() -> Vec { + catch_up(); MINED_BLOCKS.lock().unwrap().clone() } pub fn get_mined_microblocks() -> Vec { + catch_up(); MINED_MICROBLOCKS.lock().unwrap().clone() } pub fn get_mined_nakamoto_blocks() -> Vec { + catch_up(); MINED_NAKAMOTO_BLOCKS.lock().unwrap().clone() } pub fn get_stackerdb_chunks() -> Vec { + catch_up(); NEW_STACKERDB_CHUNKS.lock().unwrap().clone() } pub fn get_proposal_responses() -> Vec { + catch_up(); PROPOSAL_RESPONSES.lock().unwrap().clone() } @@ -655,6 +676,7 @@ pub mod test_observer { } pub fn clear() { + catch_up(); 
NEW_BLOCKS.lock().unwrap().clear(); MINED_BLOCKS.lock().unwrap().clear(); MINED_MICROBLOCKS.lock().unwrap().clear(); @@ -8344,8 +8366,6 @@ fn push_boot_receipts() { let mut run_loop = neon::RunLoop::new(conf); let _chainstate = run_loop.boot_chainstate(&burnchain_config); - run_loop.get_event_dispatcher().catch_up(); - // verify that the event observer got its boot receipts let blocks = test_observer::get_blocks(); assert_eq!(blocks.len(), 1); From 1b4d5f462cd302797e6f99ea39bed7e303b1d7e8 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Fri, 9 Jan 2026 09:32:25 +0100 Subject: [PATCH 09/14] use the same event dispatcher for neon and nakamoto When switching runloops at the epoch 2/3 transition, this ensures that the same event dispatcher worker thread is handling delivery, which in turn ensures that all payloads are delivered in order --- stacks-node/src/run_loop/boot_nakamoto.rs | 3 ++- stacks-node/src/run_loop/mod.rs | 2 +- stacks-node/src/run_loop/nakamoto.rs | 16 ++++++++++++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/stacks-node/src/run_loop/boot_nakamoto.rs b/stacks-node/src/run_loop/boot_nakamoto.rs index e2644f1f5e4..bd73966e013 100644 --- a/stacks-node/src/run_loop/boot_nakamoto.rs +++ b/stacks-node/src/run_loop/boot_nakamoto.rs @@ -83,7 +83,7 @@ impl BootRunLoop { InnerLoops::Epoch2(neon), ) } else { - let naka = NakaRunLoop::new(config.clone(), None, None, None); + let naka = NakaRunLoop::new(config.clone(), None, None, None, None); ( naka.get_coordinator_channel().unwrap(), InnerLoops::Epoch3(naka), @@ -184,6 +184,7 @@ impl BootRunLoop { Some(termination_switch), Some(counters), monitoring_thread, + Some(neon_loop.get_event_dispatcher()), ); let new_coord_channels = naka .get_coordinator_channel() diff --git a/stacks-node/src/run_loop/mod.rs b/stacks-node/src/run_loop/mod.rs index 3fc5195c27c..5a903aec113 100644 --- a/stacks-node/src/run_loop/mod.rs +++ 
b/stacks-node/src/run_loop/mod.rs @@ -171,7 +171,7 @@ pub struct RegisteredKey { } pub fn announce_boot_receipts( - event_dispatcher: &mut EventDispatcher, + event_dispatcher: &EventDispatcher, chainstate: &StacksChainState, pox_constants: &PoxConstants, boot_receipts: &[StacksTransactionReceipt], diff --git a/stacks-node/src/run_loop/nakamoto.rs b/stacks-node/src/run_loop/nakamoto.rs index b335e83b42b..8a6a89fcb86 100644 --- a/stacks-node/src/run_loop/nakamoto.rs +++ b/stacks-node/src/run_loop/nakamoto.rs @@ -77,11 +77,16 @@ pub struct RunLoop { impl RunLoop { /// Sets up a runloop and node, given a config. + /// + /// If no event_dispatcher is passed, a new one is created. Allowing one to be passed in + /// allows the nakamoto runloop to continue using the same event dispatcher as the + /// neon runloop at the epoch 2->3 transition. pub fn new( config: Config, should_keep_running: Option>, counters: Option, monitoring_thread: Option>>, + event_dispatcher: Option, ) -> Self { let channels = CoordinatorCommunication::instantiate(); let should_keep_running = @@ -91,10 +96,13 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); - for observer in config.events_observers.iter() { - event_dispatcher.register_observer(observer); - } + let event_dispatcher = event_dispatcher.unwrap_or_else(|| { + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); + for observer in config.events_observers.iter() { + event_dispatcher.register_observer(observer); + } + event_dispatcher + }); Self { config, From 16d9387313f3cd7bb1bbb71123b768462ec6b488 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Fri, 9 Jan 2026 09:37:11 +0100 Subject: [PATCH 10/14] give event dispatcher threads distinct names, and add some debug logging --- stacks-node/src/event_dispatcher/worker.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs index d17c1478ec1..6b41b198a84 100644 --- a/stacks-node/src/event_dispatcher/worker.rs +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}; use std::sync::Arc; use std::thread::{self, sleep}; @@ -92,6 +93,8 @@ pub struct EventDispatcherWorker { sender: SyncSender, } +static NEXT_THREAD_NUM: AtomicU64 = AtomicU64::new(1); + impl EventDispatcherWorker { pub fn new(db_path: PathBuf) -> Result { Self::new_with_custom_queue_size(db_path, 1_000) @@ -104,8 +107,10 @@ impl EventDispatcherWorker { let (message_tx, message_rx) = sync_channel(queue_size); let (ready_tx, ready_rx) = channel(); + let thread_num = NEXT_THREAD_NUM.fetch_add(1, Ordering::SeqCst); + thread::Builder::new() - .name("event-dispatcher".to_string()) + .name(format!("event-dispatcher-{thread_num}").to_string()) .spawn(move || { let conn = match EventDispatcherDbConnection::new(&db_path) { Ok(conn) => conn, @@ -154,6 +159,7 @@ impl EventDispatcherWorker { timeout_override: Option, ) -> Result { let (sender, receiver) = channel(); + debug!("Event Dispatcher Worker: sending payload {id}"); self.sender.send(WorkerMessage { task: WorkerTask::Payload { @@ -170,6 +176,7 @@ impl EventDispatcherWorker { #[cfg(test)] pub fn noop(&self) -> Result { let (sender, receiver) = channel(); + debug!("Event Dispatcher Worker: sending no-op"); self.sender.send(WorkerMessage { task: WorkerTask::NoOp, @@ -195,10 +202,13 @@ impl EventDispatcherWorker { } = task else { // no-op -- just ack and move on + debug!("Event Dispatcher Worker: doing no-op"); let _ = completion.send(()); continue; }; + debug!("Event Dispatcher Worker: doing payload {id}"); + // This will block forever if we were passed a non-existing ID. Don't do that. 
let mut payload = conn.get_payload_with_retry(id); From 3cd2e254b397525f9c3e44c8ae4e1e9db1937d28 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Fri, 9 Jan 2026 09:46:31 +0100 Subject: [PATCH 11/14] for testing, add a 250ms delay to event dispatcher thread handling This is just to check if CI passes. If it does, we can reasonably assume that d5fa2fc5d8bb22a8374f3ac24e18a2267eaa4fdf was all that's necessary. --- stacks-node/src/event_dispatcher/worker.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs index 6b41b198a84..ef78cf74043 100644 --- a/stacks-node/src/event_dispatcher/worker.rs +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -195,6 +195,8 @@ impl EventDispatcherWorker { return; }; + thread::sleep(Duration::from_millis(250)); + let WorkerTask::Payload { id, disable_retries, From 4b360cc61c4b987602a120f72f231aab9644a394 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Fri, 9 Jan 2026 12:44:21 +0100 Subject: [PATCH 12/14] catch up in mine_nakamoto_block_without_commit Lots of tests failing with state machine update timeouts. It's expected that the tests take longer now, that's not what we're worried about. 
--- stacks-node/src/tests/signer/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stacks-node/src/tests/signer/mod.rs b/stacks-node/src/tests/signer/mod.rs index d8db0e960d0..f374322919d 100644 --- a/stacks-node/src/tests/signer/mod.rs +++ b/stacks-node/src/tests/signer/mod.rs @@ -72,6 +72,7 @@ use super::neon_integrations::{ copy_dir_all, get_account, get_sortition_info_ch, submit_tx_fallible, Account, }; use crate::burnchains::bitcoin::core_controller::BitcoinCoreController; +use crate::event_dispatcher::catch_up_all_event_dispatchers; use crate::nakamoto_node::miner::TEST_MINE_SKIP; use crate::neon::Counters; use crate::run_loop::boot_nakamoto; @@ -1140,6 +1141,7 @@ impl SignerTest { TEST_MINE_SKIP.set(true); let mined_blocks = self.running_nodes.counters.naka_mined_blocks.clone(); let mined_before = mined_blocks.get(); + catch_up_all_event_dispatchers(); self.mine_bitcoin_block(); wait_for_state_machine_update_by_miner_tenure_id( timeout.as_secs(), From 821d08c576c55fa0bce580ce66081da1bf7038b5 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Fri, 9 Jan 2026 14:15:11 +0100 Subject: [PATCH 13/14] reduce to 50ms these tests fire a *lot* of events --- stacks-node/src/event_dispatcher/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs index ef78cf74043..30e82516b09 100644 --- a/stacks-node/src/event_dispatcher/worker.rs +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -195,7 +195,7 @@ impl EventDispatcherWorker { return; }; - thread::sleep(Duration::from_millis(250)); + thread::sleep(Duration::from_millis(50)); let WorkerTask::Payload { id, From 6acb3eccfbb3ce8bf09705fc7fde3b77acdf8060 Mon Sep 17 00:00:00 2001 From: benjamin-stacks <246469650+benjamin-stacks@users.noreply.github.com> Date: Fri, 9 Jan 2026 15:44:28 +0100 Subject: [PATCH 14/14] two tests left -- more catch up! 
--- stacks-node/src/tests/signer/v0.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/stacks-node/src/tests/signer/v0.rs b/stacks-node/src/tests/signer/v0.rs index c3aeaa886a0..52d23909c56 100644 --- a/stacks-node/src/tests/signer/v0.rs +++ b/stacks-node/src/tests/signer/v0.rs @@ -115,7 +115,9 @@ use tracing_subscriber::{fmt, EnvFilter}; use super::SignerTest; use crate::clarity::vm::clarity::ClarityConnection; -use crate::event_dispatcher::{MinedNakamotoBlockEvent, TEST_SKIP_BLOCK_ANNOUNCEMENT}; +use crate::event_dispatcher::{ + catch_up_all_event_dispatchers, MinedNakamotoBlockEvent, TEST_SKIP_BLOCK_ANNOUNCEMENT, +}; use crate::nakamoto_node::miner::{ fault_injection_stall_miner, fault_injection_unstall_miner, TEST_BLOCK_ANNOUNCE_STALL, TEST_BROADCAST_PROPOSAL_STALL, TEST_MINE_SKIP, TEST_P2P_BROADCAST_STALL, @@ -7879,6 +7881,7 @@ fn mock_sign_epoch_25() { < epoch_3_boundary { let mut mock_block_mesage = None; + catch_up_all_event_dispatchers(); let mock_poll_time = Instant::now(); signer_test .running_nodes @@ -11340,6 +11343,8 @@ fn block_validation_pending_table() { // Set the delay to 0 so that the block validation finishes quickly TEST_VALIDATE_DELAY_DURATION_SECS.set(0); + catch_up_all_event_dispatchers(); + wait_for(30, || { let proposal_responses = test_observer::get_proposal_responses(); let found_proposal = proposal_responses