diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index e7aaff025db..1ee47038a46 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -16,21 +16,20 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; +use std::fmt; use std::path::PathBuf; #[cfg(test)] use std::sync::mpsc::channel; -#[cfg(test)] -use std::sync::LazyLock; use std::sync::{Arc, Mutex}; -use std::thread::sleep; -use std::time::Duration; +#[cfg(test)] +use std::sync::{LazyLock, Weak}; +use std::time::{Duration, SystemTime}; use clarity::vm::costs::ExecutionCost; use clarity::vm::events::{FTEventType, NFTEventType, STXEventType}; use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier}; #[cfg(any(test, feature = "testing"))] use lazy_static::lazy_static; -use rand::Rng; use serde_json::json; use stacks::burnchains::{PoxConstants, Txid}; use stacks::chainstate::burn::ConsensusHash; @@ -53,19 +52,17 @@ use stacks::net::api::postblock_proposal::{ BlockValidateOk, BlockValidateReject, BlockValidateResponse, }; use stacks::net::atlas::{Attachment, AttachmentInstance}; -use stacks::net::http::HttpRequestContents; -use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; use stacks::net::stackerdb::StackerDBEventDispatcher; #[cfg(any(test, feature = "testing"))] use stacks::util::tests::TestFlag; use stacks_common::bitvec::BitVec; use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId}; -use stacks_common::types::net::PeerHost; use url::Url; mod db; mod payloads; mod stacker_db; +mod worker; use db::EventDispatcherDbConnection; use payloads::*; @@ -75,6 +72,9 @@ pub use payloads::{ }; pub use stacker_db::StackerDBChannel; +use crate::event_dispatcher::db::PendingPayload; +use crate::event_dispatcher::worker::{EventDispatcherResult, EventDispatcherWorker}; + #[cfg(test)] mod tests; @@ -84,6 +84,69 @@ lazy_static! { pub static ref TEST_SKIP_BLOCK_ANNOUNCEMENT: TestFlag = TestFlag::default(); } +#[derive(Debug)] +enum EventDispatcherError { + SerializationError(serde_json::Error), + HttpError(std::io::Error), + DbError(stacks::util_lib::db::Error), + RecvError(std::sync::mpsc::RecvError), + SendError(String), // not capturing the underlying because it's a generic type +} + +impl fmt::Display for EventDispatcherError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + EventDispatcherError::SerializationError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::HttpError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::DbError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::RecvError(ref e) => fmt::Display::fmt(e, f), + EventDispatcherError::SendError(ref s) => fmt::Display::fmt(s, f), + } + } +} + +impl core::error::Error for EventDispatcherError { + fn cause(&self) -> Option<&dyn core::error::Error> { + match *self { + EventDispatcherError::SerializationError(ref e) => Some(e), + EventDispatcherError::HttpError(ref e) => Some(e), + EventDispatcherError::DbError(ref e) => Some(e), + EventDispatcherError::RecvError(ref e) => Some(e), + EventDispatcherError::SendError(_) => None, + } + } +} + +impl From for EventDispatcherError { + fn from(value: serde_json::Error) -> Self { + EventDispatcherError::SerializationError(value) + } +} + +impl From for EventDispatcherError { + fn from(value: stacks::util_lib::db::Error) -> Self { + EventDispatcherError::DbError(value) + } +} + +impl From for EventDispatcherError { + fn from(value: std::io::Error) -> Self { + EventDispatcherError::HttpError(value) + } +} + +impl From for EventDispatcherError { + fn from(value: std::sync::mpsc::RecvError) -> Self { + EventDispatcherError::RecvError(value) + } +} + +impl From> for EventDispatcherError { + fn from(value: std::sync::mpsc::SendError) -> Self { + EventDispatcherError::SendError(format!("{value}")) + } +} + #[derive(Debug, Clone)] struct EventObserver { /// URL to which events will be sent @@ -121,6 +184,12 @@ impl EventObserver { } } +struct EventRequestData { + pub url: String, + pub payload_bytes: Arc<[u8]>, + pub timeout: Duration, +} + /// Events received from block-processing. /// Stacks events are structured as JSON, and are grouped by topic. An event observer can /// subscribe to one or more specific event streams, or the "any" stream to receive all of them. @@ -157,16 +226,19 @@ pub struct EventDispatcher { block_proposal_observers_lookup: HashSet, /// Channel for sending StackerDB events to the miner coordinator pub stackerdb_channel: Arc>, - /// Path to the database where pending payloads are stored. If `None`, then - /// the database is not used and events are not recoverable across restarts. - db_path: Option, + /// Path to the database where pending payloads are stored. + db_path: PathBuf, + /// The worker thread that performs the actuall HTTP requests so that they don't block + /// the main operation of the node. It's wrapped in an `Arc` only to make some test helpers + /// work (see `ALL_WORKERS`); in release code it wouldn't be necessary. + worker: Arc, } /// This struct is used specifically for receiving proposal responses. /// It's constructed separately to play nicely with threading. struct ProposalCallbackHandler { observers: Vec, - db_path: Option, + dispatcher: EventDispatcher, } impl ProposalCallbackReceiver for ProposalCallbackHandler { @@ -183,13 +255,10 @@ impl ProposalCallbackReceiver for ProposalCallbackHandler { }; for observer in self.observers.iter() { - EventDispatcher::send_payload_given_db_path( - &self.db_path, - observer, - &response, - PATH_PROPOSAL_RESPONSE, - None, - ); + self.dispatcher + .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE) + .unwrap() + .wait_until_complete(); } } } @@ -280,7 +349,7 @@ impl MemPoolEventDispatcher for EventDispatcher { } let handler = ProposalCallbackHandler { observers: callback_receivers, - db_path: self.db_path.clone(), + dispatcher: self.clone(), }; Some(Box::new(handler)) } @@ -361,20 +430,52 @@ impl BlockEventDispatcher for EventDispatcher { } } -impl Default for EventDispatcher { - fn default() -> Self { - EventDispatcher::new(None) +/// During integration tests, the `test_observer` needs to ensure that all events +/// that were triggered have actually been delivered, before it can pass on the +/// captured data. To make that work, during test we store weak references to +/// all the workers and make it possible to wait for all of them to catch up +/// in a single function call (see `catch_up_all_event_dispatchers`). +#[cfg(test)] +static ALL_WORKERS: Mutex>> = Mutex::new(Vec::new()); + +#[cfg(test)] +pub fn catch_up_all_event_dispatchers() { + let mut results = Vec::new(); + let mut guard = ALL_WORKERS.lock().unwrap(); + + // remove all items that have been dropped; call .noop() the rest + guard.retain_mut(|w| { + let Some(worker) = w.upgrade() else { + return false; + }; + results.push(worker.noop().unwrap()); + return true; + }); + // unlock the mutex + drop(guard); + + // block until all workers have caught up + for result in results { + result.wait_until_complete(); } } impl EventDispatcher { - pub fn new(working_dir: Option) -> EventDispatcher { - let db_path = if let Some(mut db_path) = working_dir { - db_path.push("event_observers.sqlite"); - Some(db_path) - } else { - None - }; + pub fn new(working_dir: PathBuf) -> EventDispatcher { + let mut db_path = working_dir; + db_path.push("event_observers.sqlite"); + EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize database"); + + let worker = + EventDispatcherWorker::new(db_path.clone()).expect("Failed to start worker thread"); + + let worker = Arc::new(worker); + + #[cfg(test)] + { + ALL_WORKERS.lock().unwrap().push(Arc::downgrade(&worker)); + } + EventDispatcher { stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())), registered_observers: vec![], @@ -390,9 +491,19 @@ impl EventDispatcher { stackerdb_observers_lookup: HashSet::new(), block_proposal_observers_lookup: HashSet::new(), db_path, + worker, } } + /// Sends a noop task to the worker and waits until its completion is acknowledged. + /// This has the effect that all payloads that have been submitted before this point + /// are also done, which is a useful thing to wait for in some tests where you want + /// to assert on certain event deliveries. + #[cfg(test)] + pub fn catch_up(&self) { + self.worker.noop().unwrap().wait_until_complete(); + } + pub fn process_burn_block( &self, burn_block: &BurnchainHeaderHash, @@ -603,11 +714,10 @@ impl EventDispatcher { ); // Send payload - self.send_payload( + self.dispatch_to_observer_or_log_error( &self.registered_observers[observer_id], &payload, PATH_BLOCK_PROCESSED, - None, ); } } @@ -1007,14 +1117,12 @@ impl EventDispatcher { event_observer } - /// Process any pending payloads in the database. - /// This is called when the event dispatcher is first instantiated. + /// Process any pending payloads in the database. This is meant to be called at startup, in order to + /// handle anything that was enqueued but not sent before shutdown. This method blocks until all + /// requests are made (or, if the observer is no longer registered, removed from the DB). pub fn process_pending_payloads(&self) { - let Some(db_path) = &self.db_path else { - return; - }; let conn = - EventDispatcherDbConnection::new(db_path).expect("Failed to initialize database"); + EventDispatcherDbConnection::new(&self.db_path).expect("Failed to initialize database"); let pending_payloads = match conn.get_pending_payloads() { Ok(payloads) => payloads, Err(e) => { @@ -1031,10 +1139,20 @@ impl EventDispatcher { pending_payloads.len() ); - for (id, url, payload_bytes, _timeout_ms) in pending_payloads { - info!("Event dispatcher: processing pending payload: {url}"); - let full_url = Url::parse(url.as_str()) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {url} as a URL")); + for PendingPayload { + id, request_data, .. + } in pending_payloads + { + info!( + "Event dispatcher: processing pending payload: {}", + request_data.url + ); + let full_url = Url::parse(request_data.url.as_str()).unwrap_or_else(|_| { + panic!( + "Event dispatcher: unable to parse {} as a URL", + request_data.url + ) + }); // find the right observer let observer = self.registered_observers.iter().find(|observer| { let endpoint_url = Url::parse(format!("http://{}", &observer.endpoint).as_str()) @@ -1051,7 +1169,7 @@ impl EventDispatcher { // This observer is no longer registered, skip and delete info!( "Event dispatcher: observer {} no longer registered, skipping", - url + request_data.url ); if let Err(e) = conn.delete_payload(id) { error!( @@ -1062,202 +1180,95 @@ impl EventDispatcher { continue; }; - Self::send_payload_with_bytes( - &self.db_path, - observer, - payload_bytes, - full_url.path(), - Some(id), - ); - - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: delete_payload"); - return; - } - - if let Err(e) = conn.delete_payload(id) { - error!( - "Event observer: failed to delete pending payload from database"; - "error" => ?e - ); - } + // If the timeout configuration for this observer is different from what it was + // originally, the updated config wins. + self.worker + .initiate_send(id, observer.disable_retries, Some(observer.timeout)) + .expect("failed to dispatch pending event payload to worker thread") + .wait_until_complete(); } } - fn send_payload_directly( - payload_bytes: &Arc<[u8]>, - full_url: &str, - timeout: Duration, - disable_retries: bool, - ) -> bool { - debug!( - "Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len() - ); - - let url = Url::parse(full_url) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {full_url} as a URL")); - - let host = url.host_str().expect("Invalid URL: missing host"); - let port = url.port_or_known_default().unwrap_or(80); - let peerhost: PeerHost = format!("{host}:{port}") - .parse() - .unwrap_or(PeerHost::DNS(host.to_string(), port)); - - let mut backoff = Duration::from_millis(100); - let mut attempts: i32 = 0; - // Cap the backoff at 3x the timeout - let max_backoff = timeout.saturating_mul(3); - - loop { - let mut request = StacksHttpRequest::new_for_peer( - peerhost.clone(), - "POST".into(), - url.path().into(), - HttpRequestContents::new().payload_json_bytes(Arc::clone(payload_bytes)), - ) - .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); - request.add_header("Connection".into(), "close".into()); - match send_http_request(host, port, request, timeout) { - Ok(response) => { - if response.preamble().status_code == 200 { - debug!( - "Event dispatcher: Successful POST"; "url" => %url - ); - break; - } else { - error!( - "Event dispatcher: Failed POST"; "url" => %url, "response" => ?response.preamble() - ); - } - } - Err(err) => { - warn!( - "Event dispatcher: connection or request failed to {host}:{port} - {err:?}"; - "backoff" => ?backoff, - "attempts" => attempts - ); - } - } - - if disable_retries { - warn!("Observer is configured in disable_retries mode: skipping retry of payload"); - return false; - } - - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: skipping retry of payload"); - return false; - } - - sleep(backoff); - let jitter: u64 = rand::thread_rng().gen_range(0..100); - backoff = std::cmp::min( - backoff.saturating_mul(2) + Duration::from_millis(jitter), - max_backoff, - ); - attempts = attempts.saturating_add(1); - } - true - } - - fn send_payload( + /// A successful result from this method only indicates that that payload was successfully + /// enqueued, not that the HTTP request was actually made. If you need to wait until that's + /// the case, call `wait_until_complete()` on the `EventDispatcherResult`. + fn dispatch_to_observer( &self, event_observer: &EventObserver, payload: &serde_json::Value, path: &str, - id: Option, - ) { - Self::send_payload_given_db_path(&self.db_path, event_observer, payload, path, id); - } - - fn send_payload_given_db_path( - db_path: &Option, - event_observer: &EventObserver, - payload: &serde_json::Value, - path: &str, - id: Option, - ) { - let payload_bytes = match serde_json::to_vec(payload) { - Ok(bytes) => Arc::<[u8]>::from(bytes), + ) -> Result { + let full_url = Self::get_full_url(event_observer, path); + let bytes = match Self::get_payload_bytes(payload) { + Ok(bytes) => bytes, Err(err) => { error!( "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err ); - return; + return Err(err); } }; - Self::send_payload_with_bytes(db_path, event_observer, payload_bytes, path, id); + + let data = EventRequestData { + payload_bytes: bytes, + url: full_url, + timeout: event_observer.timeout, + }; + + let id = self.save_to_db(&data); + + self.worker + .initiate_send(id, event_observer.disable_retries, None) } - fn send_payload_with_bytes( - db_path: &Option, + /// This fire-and-forget version of `dispatch_to_observer` logs any error from enqueueing the + /// request, and does not give you a way to wait for blocking until it's sent. If you need + /// more control, use `dispatch_to_observer()` directly and handle the result yourself. + /// + /// This method exists because we generally don't want the event dispatcher to interrupt the node's + /// processing. + fn dispatch_to_observer_or_log_error( + &self, event_observer: &EventObserver, - payload_bytes: Arc<[u8]>, + payload: &serde_json::Value, path: &str, - id: Option, ) { - // Construct the full URL + if let Err(err) = self.dispatch_to_observer(event_observer, payload, path) { + error!("Event dispatcher: Failed to enqueue payload for sending to observer: {err:?}"); + } + } + + fn get_payload_bytes(payload: &serde_json::Value) -> Result, EventDispatcherError> { + let payload_bytes = serde_json::to_vec(payload)?; + Ok(Arc::<[u8]>::from(payload_bytes)) + } + + fn get_full_url(event_observer: &EventObserver, path: &str) -> String { let url_str = if path.starts_with('/') { format!("{}{path}", &event_observer.endpoint) } else { format!("{}/{path}", &event_observer.endpoint) }; - let full_url = format!("http://{url_str}"); - - // if the observer is in "disable_retries" mode quickly send the payload without checking for the db - if event_observer.disable_retries { - Self::send_payload_directly(&payload_bytes, &full_url, event_observer.timeout, true); - } else if let Some(db_path) = db_path { - // Because the DB is initialized in the call to process_pending_payloads() during startup, - // it is *probably* ok to skip initialization here. That said, at the time of writing this is the - // only call to new_without_init(), and we might want to revisit the question whether it's - // really worth it. - let conn = EventDispatcherDbConnection::new_without_init(db_path) - .expect("Failed to open database for event observer"); - - let id = match id { - Some(id) => id, - None => { - conn.insert_payload_with_retry( - &full_url, - payload_bytes.as_ref(), - event_observer.timeout, - ); - conn.last_insert_rowid() - } - }; + format!("http://{url_str}") + } - let success = Self::send_payload_directly( - &payload_bytes, - &full_url, - event_observer.timeout, - false, - ); - // This is only `false` when the TestFlag is set to skip retries - if !success { - return; - } + fn save_to_db(&self, data: &EventRequestData) -> i64 { + // Because the DB is initialized in the call to process_pending_payloads() during startup, + // it is *probably* ok to skip initialization here. That said, at the time of writing this is the + // only call to new_without_init(), and we might want to revisit the question whether it's + // really worth it. + let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) + .expect("Failed to open database for event observer"); - if let Err(e) = conn.delete_payload(id) { - error!( - "Event observer: failed to delete pending payload from database"; - "error" => ?e - ); - } - } else { - // No database, just send the payload - Self::send_payload_directly(&payload_bytes, &full_url, event_observer.timeout, false); - } + conn.insert_payload_with_retry(data, SystemTime::now()) } fn send_new_attachments(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_ATTACHMENT_PROCESSED, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_ATTACHMENT_PROCESSED); } fn send_new_mempool_txs(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT); } /// Serializes new microblocks data into a JSON payload and sends it off to the correct path @@ -1290,7 +1301,7 @@ impl EventDispatcher { "burn_block_timestamp": burn_block_timestamp, }); - self.send_payload(event_observer, &payload, PATH_MICROBLOCK_SUBMIT, None); + self.dispatch_to_observer_or_log_error(event_observer, &payload, PATH_MICROBLOCK_SUBMIT); } fn send_dropped_mempool_txs( @@ -1298,15 +1309,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.send_payload(event_observer, payload, PATH_MEMPOOL_TX_DROP, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_DROP); } fn send_mined_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_MINED_BLOCK, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_BLOCK); } fn send_mined_microblock(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_MINED_MICROBLOCK, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_MICROBLOCK); } fn send_mined_nakamoto_block( @@ -1314,15 +1325,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.send_payload(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK); } fn send_stackerdb_chunks(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_STACKERDB_CHUNKS, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_STACKERDB_CHUNKS); } fn send_new_burn_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.send_payload(event_observer, payload, PATH_BURN_BLOCK_SUBMIT, None); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_BURN_BLOCK_SUBMIT); } } diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs index 122812592c8..2aa091e2284 100644 --- a/stacks-node/src/event_dispatcher/db.rs +++ b/stacks-node/src/event_dispatcher/db.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -14,15 +14,26 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; use std::thread::sleep; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use rusqlite::{params, Connection}; +use rusqlite::{params, Connection, Row}; use stacks::util_lib::db::Error as db_error; +use crate::event_dispatcher::EventRequestData; + +pub struct PendingPayload { + pub request_data: EventRequestData, + #[allow(dead_code)] // will be used in a follow-up commit + pub timestamp: SystemTime, + pub id: i64, +} + /// Wraps a SQlite connection to the database in which pending event payloads are stored +#[derive(Debug)] pub struct EventDispatcherDbConnection { connection: Connection, } @@ -40,17 +51,15 @@ impl EventDispatcherDbConnection { id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT NOT NULL, payload BLOB NOT NULL, - timeout INTEGER NOT NULL + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL )", [], )?; let mut connection = EventDispatcherDbConnection { connection }; - if let Some(col_type) = connection.get_payload_column_type()? { - if col_type.eq_ignore_ascii_case("TEXT") { - info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB"); - connection.migrate_payload_column_to_blob()?; - } - } + + connection.run_necessary_migrations()?; + Ok(connection) } @@ -59,71 +68,58 @@ impl EventDispatcherDbConnection { EventDispatcherDbConnection { connection } } - /// Insert a payload into the database, retrying on failure. - pub fn insert_payload_with_retry(&self, url: &str, payload_bytes: &[u8], timeout: Duration) { - let mut attempts = 0i64; - let mut backoff = Duration::from_millis(100); // Initial backoff duration - let max_backoff = Duration::from_secs(5); // Cap the backoff duration - - loop { - match self.insert_payload(url, payload_bytes, timeout) { - Ok(_) => { - // Successful insert, break the loop - return; - } - Err(err) => { - // Log the error, then retry after a delay - warn!("Failed to insert payload into event observer database: {err:?}"; - "backoff" => ?backoff, - "attempts" => attempts - ); - - // Wait for the backoff duration - sleep(backoff); - - // Increase the backoff duration (with exponential backoff) - backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff); - - attempts = attempts.saturating_add(1); - } - } - } + /// Insert a payload into the database, retrying on failure. Returns the id of of the inserted record. + pub fn insert_payload_with_retry(&self, data: &EventRequestData, timestamp: SystemTime) -> i64 { + with_retry( + || self.insert_payload(data, timestamp), + "Failed to insert payload into event observer database".to_string(), + ) + } + + pub fn get_payload_with_retry(&self, id: i64) -> PendingPayload { + with_retry( + || self.get_payload(id), + "Failed to retrieve payload {id} from event observer database".to_string(), + ) } pub fn insert_payload( &self, - url: &str, - payload_bytes: &[u8], - timeout: Duration, - ) -> Result<(), db_error> { - let timeout_ms: u64 = timeout.as_millis().try_into().expect("Timeout too large"); - self.connection.execute( - "INSERT INTO pending_payloads (url, payload, timeout) VALUES (?1, ?2, ?3)", - params![url, payload_bytes, timeout_ms], + data: &EventRequestData, + timestamp: SystemTime, + ) -> Result { + let timeout_ms: u64 = data + .timeout + .as_millis() + .try_into() + .expect("Timeout too large"); + + let timestamp_s = timestamp + .duration_since(UNIX_EPOCH) + .expect("system clock is multiple decades slow") + .as_secs(); + + let id: i64 = self.connection.query_row( + "INSERT INTO pending_payloads (url, payload, timeout, timestamp) VALUES (?1, ?2, ?3, ?4) RETURNING id", + params![data.url, data.payload_bytes, timeout_ms, timestamp_s], + |row| row.get(0), )?; - Ok(()) + Ok(id) } - // TODO: change this to get the id from the insertion directly, because that's more reliable - pub fn last_insert_rowid(&self) -> i64 { - self.connection.last_insert_rowid() + pub fn get_payload(&self, id: i64) -> Result { + self.connection.query_row_and_then( + &format!("SELECT {PAYLOAD_FIELDS} FROM pending_payloads WHERE id = ?1"), + [id], + row_to_pending_payload, + ) } - pub fn get_pending_payloads(&self) -> Result, u64)>, db_error> { - let mut stmt = self - .connection - .prepare("SELECT id, url, payload, timeout FROM pending_payloads ORDER BY id")?; - let payload_iter = stmt.query_and_then( - [], - |row| -> Result<(i64, String, Arc<[u8]>, u64), db_error> { - let id: i64 = row.get(0)?; - let url: String = row.get(1)?; - let payload_bytes: Vec = row.get(2)?; - let payload_bytes = Arc::<[u8]>::from(payload_bytes); - let timeout_ms: u64 = row.get(3)?; - Ok((id, url, payload_bytes, timeout_ms)) - }, - )?; + pub fn get_pending_payloads(&self) -> Result, db_error> { + let mut stmt = self.connection.prepare(&format!( + "SELECT {PAYLOAD_FIELDS} FROM pending_payloads ORDER BY id" + ))?; + let payload_iter = stmt.query_and_then([], row_to_pending_payload)?; payload_iter.collect() } @@ -133,25 +129,69 @@ impl EventDispatcherDbConnection { Ok(()) } - fn get_payload_column_type(&self) -> Result, db_error> { + /// The initial schema of the datebase when this code was first created + const INITIAL_SCHEMA: u32 = 0; + /// The `payload`` column type changed from TEXT to BLOB + const PAYLOAD_IS_BLOB: u32 = 1; + /// Column `timestamp` added + const TIMESTAMP_COLUMN: u32 = 2; + + fn run_necessary_migrations(&mut self) -> Result<(), db_error> { + let current_schema = self.get_schema_version()?; + + if current_schema < Self::PAYLOAD_IS_BLOB { + info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB"); + self.migrate_payload_column_to_blob()?; + } + + if current_schema < Self::TIMESTAMP_COLUMN { + info!("Event observer: adding timestamp to pending_payloads"); + self.add_timestamp_column()?; + } + + Ok(()) + } + + fn get_schema_version(&self) -> Result { let mut stmt = self .connection .prepare("PRAGMA table_info(pending_payloads)")?; + let name_col = stmt.column_index("name")?; + let type_col = stmt.column_index("type")?; + let rows = stmt.query_map([], |row| { - let name: String = row.get(1)?; - let col_type: String = row.get(2)?; + let name: String = row.get(name_col)?; + let col_type: String = row.get(type_col)?; Ok((name, col_type)) })?; + let mut payload_is_blob = None; + let mut timestamp_exists = false; + for row in rows { let (name, col_type) = row?; if name == "payload" { - return Ok(Some(col_type)); + payload_is_blob = Some(col_type.eq_ignore_ascii_case("BLOB")); + } else if name == "timestamp" { + timestamp_exists = true; } } - Ok(None) + if timestamp_exists && !(payload_is_blob == Some(true)) { + warn!("Pending event payload database schema is inconsistent"); + return Err(db_error::Corruption); + } + + let mut result = Self::INITIAL_SCHEMA; + + if timestamp_exists { + result = Self::TIMESTAMP_COLUMN + } else if payload_is_blob == Some(true) { + result = Self::PAYLOAD_IS_BLOB; + } + + Ok(result) } fn migrate_payload_column_to_blob(&mut self) -> Result<(), db_error> { @@ -178,10 +218,111 @@ impl EventDispatcherDbConnection { tx.commit()?; Ok(()) } + + fn add_timestamp_column(&mut self) -> Result<(), db_error> { + let tx = self.connection.transaction()?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time travel to pre-1970 is not supported") + .as_secs(); + + tx.execute( + "ALTER TABLE pending_payloads RENAME TO pending_payloads_old", + [], + )?; + tx.execute( + "CREATE TABLE pending_payloads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + payload BLOB NOT NULL, + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL + )", + [], + )?; + tx.execute( + "INSERT INTO pending_payloads (id, url, payload, timeout, timestamp) + SELECT id, url, CAST(payload AS BLOB), timeout, ?1 FROM pending_payloads_old", + [now], + )?; + tx.execute("DROP TABLE pending_payloads_old", [])?; + tx.commit()?; + Ok(()) + } +} + +// If you change this, make sure to change `row_to_pending_payload` in sync. +const PAYLOAD_FIELDS: &str = "id, url, payload, timeout, timestamp"; + +/// This function should only be used with rows that were SELECTed using the +/// `PAYLOAD_FIELDS` constant. +fn row_to_pending_payload(row: &Row) -> Result { + let id: i64 = row.get(0)?; + let url: String = row.get(1)?; + let payload_bytes: Vec = row.get(2)?; + let payload_bytes = Arc::<[u8]>::from(payload_bytes); + let timeout_ms: u64 = row.get(3)?; + let timestamp_s: u64 = row.get(4)?; + let request_data = EventRequestData { + url, + payload_bytes, + timeout: Duration::from_millis(timeout_ms), + }; + + Ok(PendingPayload { + id, + request_data, + timestamp: UNIX_EPOCH + Duration::from_secs(timestamp_s), + }) +} + +/// Calls the given function, repeatedly if necessary, until it doesn't fail, and then +/// returns the result from the successful call. Initially backs off for 0.1s and increases +/// backoff exponentially up to a max of five seconds. If the function never returns a +/// success result, `with_retry` will block forever. +/// +/// # Example +/// +/// let response = with_retry(|| perform_db_op(42), "database operation 42 failed"); +fn with_retry(f: F, error_log_text: String) -> T +where + F: Fn() -> Result, + E: Debug, +{ + let mut attempts = 0i64; + let mut backoff = Duration::from_millis(100); // Initial backoff duration + let max_backoff = Duration::from_secs(5); // Cap the backoff duration + + loop { + match f() { + Ok(thing) => { + // Successful operation, break the loop + return thing; + } + Err(err) => { + // Log the error, then retry after a delay + warn!("{error_log_text}: {err:?}"; + "backoff" => ?backoff, + "attempts" => attempts + ); + + // Wait for the backoff duration + sleep(backoff); + + // Increase the backoff duration (with exponential backoff) + backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff); + + attempts = attempts.saturating_add(1); + } + } + } } #[cfg(test)] mod test { + use std::cell::RefCell; + use std::time::Instant; + use serde_json::json; use tempfile::tempdir; @@ -212,7 +353,7 @@ mod test { } #[test] - fn test_migrate_payload_column_to_blob() { + fn test_migration() { let dir = tempdir().unwrap(); let db_path = dir.path().join("test_payload_migration.sqlite"); @@ -252,17 +393,60 @@ mod test { "Payload column was not migrated to BLOB" ); + let insertion_info_col_count: i64 = conn + .connection + .query_row( + "SELECT COUNT(*) FROM pragma_table_info('pending_payloads') WHERE name = 'timestamp'", + [], + |row| row.get(0), + ) + .unwrap(); + assert!( + insertion_info_col_count == 1, + "timestamp column was not added" + ); + let pending_payloads = conn .get_pending_payloads() .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); assert_eq!( - pending_payloads[0].2.as_ref(), + pending_payloads[0].request_data.payload_bytes.as_ref(), payload_str.as_bytes(), "Payload contents did not survive migration" ); } + #[test] + fn test_migration_errors_on_inconsistent_schema() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test_payload_migration.sqlite"); + + // Simulate the original schema with TEXT payloads, but also with + // a timestamp column. This should not happen. + let conn = Connection::open(&db_path).unwrap(); + conn.execute( + "CREATE TABLE pending_payloads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + payload TEXT NOT NULL, + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL + )", + [], + ) + .unwrap(); + + let mut conn = EventDispatcherDbConnection::new_without_init(&db_path).unwrap(); + + let result = conn.run_necessary_migrations(); + + match result { + Err(db_error::Corruption) => {} + _ => panic!("inconsistent schema should have caused a corruption error result"), + } + } + #[test] fn test_insert_and_get_pending_payloads() { let dir = tempdir().unwrap(); @@ -271,14 +455,20 @@ mod test { let conn = EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); - let url = "http://example.com/api"; + let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); + let timestamp_sentinel = UNIX_EPOCH + Duration::from_hours(24 * 20000); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; + // Insert payload - let insert_result = conn.insert_payload(url, payload_bytes.as_slice(), timeout); - assert!(insert_result.is_ok(), "Failed to insert payload"); + let id = conn.insert_payload_with_retry(&data, timestamp_sentinel); // Get pending payloads let pending_payloads = conn @@ -286,17 +476,23 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (_id, retrieved_url, stored_bytes, timeout_ms) = &pending_payloads[0]; - assert_eq!(retrieved_url, url, "URL does not match"); + let PendingPayload { + id: retrieved_id, + timestamp: retrieved_timestamp, + request_data: retrieved_data, + } = &pending_payloads[0]; + + assert_eq!(*retrieved_id, id, "ID does not match"); + assert_eq!(retrieved_data.url, data.url, "URL does not match"); assert_eq!( - stored_bytes.as_ref(), - payload_bytes.as_slice(), + retrieved_data.payload_bytes.as_ref(), + data.payload_bytes.as_ref(), "Serialized payload does not match" ); + assert_eq!(retrieved_data.timeout, timeout, "Timeout does not match"); assert_eq!( - *timeout_ms, - timeout.as_millis() as u64, - "Timeout does not match" + *retrieved_timestamp, timestamp_sentinel, + "Time stamp does not match" ); } @@ -308,13 +504,19 @@ mod test { let conn = EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); - let url = "http://example.com/api"; + let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; + // Insert payload - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); // Get pending payloads @@ -323,7 +525,7 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (id, _, _, _) = pending_payloads[0]; + let PendingPayload { id, .. } = pending_payloads[0]; // Delete payload let delete_result = conn.delete_payload(id); @@ -335,4 +537,36 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 0, "Expected no pending payloads"); } + + #[test] + fn test_with_retry_returns_original_result() { + let f = || Result::::Ok(6_7); + let result = with_retry(f, "failed".to_string()); + assert_eq!(result, 67); + } + + #[test] + fn test_with_retry_retries_as_often_as_necessary() { + let call_count = RefCell::new(0); + let f = || { + *call_count.borrow_mut() += 1; + if *call_count.borrow() < 5 { + return Err("keep trying"); + } else { + return Ok("you did it"); + } + }; + let now = Instant::now(); + let result = with_retry(f, "failed".to_string()); + let elapsed_millis = now.elapsed().as_millis(); + assert_eq!(result, "you did it"); + let count = *call_count.borrow(); + assert_eq!( + count, 5, + "inner function was not called the expected number of times" + ); + // We retry 4 times, with delays of 100, 200, 400, and 800 ms, respectively, + // for a total of 1,500. + assert!(1_450 < elapsed_millis && elapsed_millis < 1_550); + } } diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs index da24ec8a73e..8ba1f123ab0 100644 --- a/stacks-node/src/event_dispatcher/tests.rs +++ b/stacks-node/src/event_dispatcher/tests.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -15,8 +15,9 @@ // along with this program. If not, see . use std::net::TcpListener; +use std::sync::atomic::{AtomicU32, Ordering}; use std::thread; -use std::time::Instant; +use std::time::{Instant, SystemTime}; use clarity::boot_util::boot_code_id; use clarity::vm::costs::ExecutionCost; @@ -37,9 +38,12 @@ use stacks::chainstate::stacks::{ TransactionPayload, TransactionPostConditionMode, TransactionPublicKeyEncoding, TransactionSpendingCondition, TransactionVersion, }; +use stacks::net::http::HttpRequestContents; +use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; use stacks::types::chainstate::{ BlockHeaderHash, StacksAddress, StacksPrivateKey, StacksPublicKey, }; +use stacks::types::net::PeerHost; use stacks::util::hash::{Hash160, Sha512Trunc256Sum}; use stacks::util::secp256k1::MessageSignature; use stacks_common::bitvec::BitVec; @@ -238,7 +242,7 @@ fn test_process_pending_payloads() { info!("endpoint: {}", endpoint); let timeout = Duration::from_secs(5); - let mut dispatcher = EventDispatcher::new(Some(dir.path().to_path_buf())); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), @@ -261,12 +265,18 @@ fn test_process_pending_payloads() { .with_status(200) .create(); - let url = &format!("{}/api", &server.url()); + let url = format!("{}/api", &server.url()); + + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Insert payload - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); // Process pending payloads @@ -290,7 +300,7 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { let mut server = mockito::Server::new(); let endpoint = server.host_with_port(); let timeout = Duration::from_secs(5); - let mut dispatcher = EventDispatcher::new(Some(dir.path().to_path_buf())); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), @@ -318,9 +328,15 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { .create(); // Use a different URL than the observer's endpoint - let url = "http://different-domain.com/api"; + let url = "http://different-domain.com/api".to_string(); + + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); dispatcher.process_pending_payloads(); @@ -343,18 +359,13 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { fn test_new_event_dispatcher_with_db() { let dir = tempdir().unwrap(); let working_dir = dir.path().to_path_buf(); - - let dispatcher = EventDispatcher::new(Some(working_dir.clone())); - let expected_db_path = working_dir.join("event_observers.sqlite"); - assert_eq!(dispatcher.db_path, Some(expected_db_path.clone())); - assert!( - !expected_db_path.exists(), - "Database file was created too soon" - ); + assert!(!expected_db_path.exists(), "Database file already exists"); + + let dispatcher = EventDispatcher::new(working_dir.clone()); - EventDispatcherDbConnection::new(&expected_db_path).expect("Failed to initialize the database"); + assert_eq!(dispatcher.db_path, expected_db_path.clone()); // Verify that the database was initialized assert!(expected_db_path.exists(), "Database file was not created"); @@ -373,13 +384,6 @@ fn test_new_event_observer() { assert_eq!(observer.disable_retries, false); } -#[test] -fn test_new_event_dispatcher_without_db() { - let dispatcher = EventDispatcher::new(None); - - assert!(dispatcher.db_path.is_none(), "Expected db_path to be None"); -} - #[test] #[serial] fn test_send_payload_with_db() { @@ -389,9 +393,7 @@ fn test_send_payload_with_db() { let working_dir = dir.path().to_path_buf(); let payload = json!({"key": "value"}); - let dispatcher = EventDispatcher::new(Some(working_dir.clone())); - let db_path = dispatcher.clone().db_path.clone().unwrap(); - EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); + let dispatcher = EventDispatcher::new(working_dir.clone()); // Create a mock server let mut server = mockito::Server::new(); @@ -410,13 +412,16 @@ fn test_send_payload_with_db() { TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Call send_payload - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Verify that the payload was sent and database is empty _m.assert(); // Verify that the database is empty - let db_path = dispatcher.db_path.unwrap(); + let db_path = dispatcher.db_path; let db_path_str = db_path.to_str().unwrap(); let conn = Connection::open(db_path_str).expect("Failed to open database"); let pending_payloads = EventDispatcherDbConnection::new_from_exisiting_connection(conn) @@ -425,35 +430,6 @@ fn test_send_payload_with_db() { assert_eq!(pending_payloads.len(), 0, "Expected no pending payloads"); } -#[test] -fn test_send_payload_without_db() { - use mockito::Matcher; - - let timeout = Duration::from_secs(5); - let payload = json!({"key": "value"}); - - // Create a mock server - let mut server = mockito::Server::new(); - let _m = server - .mock("POST", "/test") - .match_header("content-type", Matcher::Regex("application/json.*".into())) - .match_body(Matcher::Json(payload.clone())) - .with_status(200) - .create(); - - let endpoint = server.url().strip_prefix("http://").unwrap().to_string(); - - let observer = EventObserver::new(endpoint, timeout, false); - - let dispatcher = EventDispatcher::new(None); - - // Call send_payload - dispatcher.send_payload(&observer, &payload, "/test", None); - - // Verify that the payload was sent - _m.assert(); -} - #[test] fn test_send_payload_success() { let port = get_random_port(); @@ -480,9 +456,11 @@ fn test_send_payload_success() { let payload = json!({"key": "value"}); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) @@ -498,7 +476,7 @@ fn test_send_payload_retry() { // Start a mock server in a separate thread let server = Server::http(format!("127.0.0.1:{port}")).unwrap(); - thread::spawn(move || { + let thread = thread::spawn(move || { let mut attempt = 0; while let Ok(request) = server.recv() { attempt += 1; @@ -530,13 +508,17 @@ fn test_send_payload_retry() { let payload = json!({"key": "value"}); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) .expect("Server did not receive request in time"); + + thread.join().unwrap(); } #[test] @@ -550,7 +532,7 @@ fn test_send_payload_timeout() { // Start a mock server in a separate thread let server = Server::http(format!("127.0.0.1:{port}")).unwrap(); - thread::spawn(move || { + let thread = thread::spawn(move || { let mut attempt = 0; // This exists to only keep request from being dropped #[allow(clippy::collection_is_never_read)] @@ -582,10 +564,15 @@ fn test_send_payload_timeout() { // Record the time before sending the payload let start_time = Instant::now(); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); // Call the function being tested - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Record the time after the function returns let elapsed_time = start_time.elapsed(); @@ -604,6 +591,8 @@ fn test_send_payload_timeout() { // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) .expect("Server did not receive request in time"); + + thread.join().unwrap(); } #[test] @@ -620,7 +609,7 @@ fn test_send_payload_with_db_force_restart() { info!("Starting mock server on port {port}"); // Start a mock server in a separate thread let server = Server::http(format!("127.0.0.1:{port}")).unwrap(); - thread::spawn(move || { + let thread = thread::spawn(move || { let mut attempt = 0; // This exists to only keep request from being dropped #[allow(clippy::collection_is_never_read)] @@ -670,7 +659,7 @@ fn test_send_payload_with_db_force_restart() { } }); - let mut dispatcher = EventDispatcher::new(Some(working_dir.clone())); + let mut dispatcher = EventDispatcher::new(working_dir.clone()); let observer = dispatcher.register_observer_private(&EventObserverConfig { endpoint: format!("127.0.0.1:{port}"), @@ -679,7 +668,7 @@ fn test_send_payload_with_db_force_restart() { disable_retries: false, }); - EventDispatcherDbConnection::new(&dispatcher.clone().db_path.unwrap()).unwrap(); + EventDispatcherDbConnection::new(&dispatcher.clone().db_path).unwrap(); let payload = json!({"key": "value"}); let payload2 = json!({"key": "value2"}); @@ -691,7 +680,10 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 1"); // Send the payload - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Re-enable retrying TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); @@ -701,11 +693,13 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 2"); // Send another payload - dispatcher.send_payload(&observer, &payload2, "/test", None); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload2, "/test"); // Wait for the server to process the requests rx.recv_timeout(Duration::from_secs(5)) .expect("Server did not receive request in time"); + + thread.join().unwrap(); } #[test] @@ -721,10 +715,15 @@ fn test_event_dispatcher_disable_retries() { let observer = EventObserver::new(endpoint, timeout, true); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Verify that the payload was sent _m.assert(); @@ -739,10 +738,15 @@ fn test_event_dispatcher_disable_retries_invalid_url() { let observer = EventObserver::new(endpoint, timeout, true); - let dispatcher = EventDispatcher::new(None); + let dir = tempdir().unwrap(); + let working_dir = dir.path().to_path_buf(); + let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.send_payload(&observer, &payload, "/test", None); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); } #[test] @@ -752,7 +756,7 @@ fn block_event_with_disable_retries_observer() { let dir = tempdir().unwrap(); let working_dir = dir.path().to_path_buf(); - let mut event_dispatcher = EventDispatcher::new(Some(working_dir.clone())); + let mut event_dispatcher = EventDispatcher::new(working_dir.clone()); let config = EventObserverConfig { endpoint: String::from("255.255.255.255"), events_keys: vec![EventKeyType::MinedBlocks], @@ -952,8 +956,9 @@ fn test_block_proposal_validation_event() { let mock = server.mock("POST", "/proposal_response").create(); let endpoint = server.url().strip_prefix("http://").unwrap().to_string(); + let dir = tempdir().unwrap(); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); - let mut dispatcher = EventDispatcher::new(None); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), events_keys: vec![EventKeyType::BlockProposal], @@ -985,3 +990,72 @@ fn test_block_proposal_validation_event() { mock.assert(); } + +#[test] +fn test_http_delivery_non_blocking() { + let mut slow_server = mockito::Server::new(); + + let start_count = Arc::new(AtomicU32::new(0)); + let end_count = Arc::new(AtomicU32::new(0)); + + let start_count2 = start_count.clone(); + let end_count2 = end_count.clone(); + + let mock = slow_server + .mock("POST", "/mined_nakamoto_block") + .with_body_from_request(move |_| { + start_count2.fetch_add(1, Ordering::SeqCst); + thread::sleep(Duration::from_secs(2)); + end_count2.fetch_add(1, Ordering::SeqCst); + "".into() + }) + .create(); + + let endpoint = slow_server + .url() + .strip_prefix("http://") + .unwrap() + .to_string(); + + let dir = tempdir().unwrap(); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); + + dispatcher.register_observer(&EventObserverConfig { + endpoint: endpoint.clone(), + events_keys: vec![EventKeyType::MinedBlocks], + timeout_ms: 3_000, + disable_retries: false, + }); + + let nakamoto_block = NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: vec![], + }; + + let start = Instant::now(); + + dispatcher.process_mined_nakamoto_block_event( + 0, + &nakamoto_block, + 0, + &ExecutionCost::max_value(), + vec![], + ); + + assert!( + start.elapsed() < Duration::from_millis(100), + "dispatcher blocked while sending event" + ); + + thread::sleep(Duration::from_secs(1)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 0); + + thread::sleep(Duration::from_secs(2)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 1); + + mock.assert(); +} diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs new file mode 100644 index 00000000000..30e82516b09 --- /dev/null +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -0,0 +1,357 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}; +use std::sync::Arc; +use std::thread::{self, sleep}; +use std::time::{Duration, SystemTime}; + +use rand::Rng; +use stacks::net::http::HttpRequestContents; +use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; +use stacks::types::net::PeerHost; +use url::Url; + +use crate::event_dispatcher::db::EventDispatcherDbConnection; +#[cfg(test)] +use crate::event_dispatcher::TEST_EVENT_OBSERVER_SKIP_RETRY; +use crate::event_dispatcher::{EventDispatcherError, EventRequestData}; + +#[allow(dead_code)] // NoOp is only used in test configurations +enum WorkerTask { + Payload { + /// The id of the payload data in the event observer DB. It must exist. + id: i64, + /// If true, the HTTP request is only attempted once. + disable_retries: bool, + /// A value for the HTTP timeout is stored in the DB, but can optionally be overridden. + timeout_override: Option, + }, + NoOp, +} +struct WorkerMessage { + task: WorkerTask, + /// The worker thread will send a message on this channel once it's done with this request. + completion: Sender<()>, +} + +/// The return type of `initiate_send()`. If the caller of that method just wishes to move +/// on, they can happily drop this result. This is the behavior for most event deliveries. +/// +/// On the other hand, if they wish to block until the HTTP request was successfully sent +/// (or, in the case of `disable_retries`, at least attempted), they can call +/// `.wait_until_complete()`. This is what happens during `process_pending_payloads()` at +/// startup. Note that it's possible that other requests are in the queue, so the blocking +/// may take longer than only the handling of this very request. +pub struct EventDispatcherResult { + /// The worker thread will send a one-time message to this channel to notify of completion. + /// Afterwards, it will drop the sender and thus close the channel. + receiver: Receiver<()>, +} + +impl EventDispatcherResult { + pub fn wait_until_complete(self) { + // There is no codepath that would drop the sender without sending the acknowledgenent + // first. And this method consumes `self`, so it can only be called once. + // So if despite all that, `recv()` returns an error, that means the worker thread panicked. + self.receiver + .recv() + .expect("EventDispatcherWorker thread has terminated mid-operation"); + } +} + +/// This worker is responsible for making the actual HTTP requests that ultimately result +/// from dispatching events to observers. It makes those requests on a dedicated separate +/// thread so that e.g. a slow event observer doesn't block a node from continuing its work. +/// +/// Call `EventDispatcherWorker::new()` to create. +/// +/// Call `initiate_send()` with the id of the payload (in the event oberserver DB) to enqueue. +/// +/// Cloning the `EventDispatcherWorker` does *not* create a new thread -- both the original and +/// the clone will share a single queue and worker thread. +/// +/// Once the `EventDispatcherWorker` (including any clones) is dropped, the worker thread will +/// finish any enqueued work and then shut down. +#[derive(Clone)] +pub struct EventDispatcherWorker { + sender: SyncSender, +} + +static NEXT_THREAD_NUM: AtomicU64 = AtomicU64::new(1); + +impl EventDispatcherWorker { + pub fn new(db_path: PathBuf) -> Result { + Self::new_with_custom_queue_size(db_path, 1_000) + } + + pub fn new_with_custom_queue_size( + db_path: PathBuf, + queue_size: usize, + ) -> Result { + let (message_tx, message_rx) = sync_channel(queue_size); + let (ready_tx, ready_rx) = channel(); + + let thread_num = NEXT_THREAD_NUM.fetch_add(1, Ordering::SeqCst); + + thread::Builder::new() + .name(format!("event-dispatcher-{thread_num}").to_string()) + .spawn(move || { + let conn = match EventDispatcherDbConnection::new(&db_path) { + Ok(conn) => conn, + Err(err) => { + error!("Event Dispatcher Worker: Unable to open DB, terminating worker thread: {err}"); + ready_tx.send(Err(err)).unwrap(); + return; + } + }; + + if let Err(err) = ready_tx.send(Ok(())) { + // If the sending fails (i.e. the receiver has been dropped), that means a logic bug + // has been introduced to the code -- at time of writing, the main function is waiting + // for this message a few lines down, outside the thread closure. + // We log this, but we still start the loop. + error!( + "Event Dispatcher Worker: Unable to send ready state. This is a bug. {err}" + ); + } + + // this will run forever until the messaging channel is closed + Self::main_thread_loop(conn, message_rx); + }) + .unwrap(); + + // note double question mark, deals with both the channel RecvError and whatever error + // might be sent across that channel + ready_rx.recv()??; + + Ok(EventDispatcherWorker { sender: message_tx }) + } + + /// Let the worker know that it should send the request that is stored in the DB under the given + /// ID, and delete that DB entry once it's done. + /// + /// A successful result only means that the request was successfully enqueued, not that it was + /// actually made. If you need to wait until the latter has happened, call `wait_until_complete()` + /// on the returned `EventDispatcherResult`. + /// + /// The worker has a limited queue size (1000 by default). If the queue is already full, the + /// call to `initiate_send()` will block until space has become available. + pub fn initiate_send( + &self, + id: i64, + disable_retries: bool, + timeout_override: Option, + ) -> Result { + let (sender, receiver) = channel(); + debug!("Event Dispatcher Worker: sending payload {id}"); + + self.sender.send(WorkerMessage { + task: WorkerTask::Payload { + id, + disable_retries, + timeout_override, + }, + completion: sender, + })?; + + Ok(EventDispatcherResult { receiver }) + } + + #[cfg(test)] + pub fn noop(&self) -> Result { + let (sender, receiver) = channel(); + debug!("Event Dispatcher Worker: sending no-op"); + + self.sender.send(WorkerMessage { + task: WorkerTask::NoOp, + completion: sender, + })?; + + Ok(EventDispatcherResult { receiver }) + } + + fn main_thread_loop(conn: EventDispatcherDbConnection, message_rx: Receiver) { + // main loop of the thread -- get message from channel, grab data from DB, send request, + // delete from DB, acknowledge + loop { + let Ok(WorkerMessage { task, completion }) = message_rx.recv() else { + info!("Event Dispatcher Worker: channel closed, terminating worker thread."); + return; + }; + + thread::sleep(Duration::from_millis(50)); + + let WorkerTask::Payload { + id, + disable_retries, + timeout_override, + } = task + else { + // no-op -- just ack and move on + debug!("Event Dispatcher Worker: doing no-op"); + let _ = completion.send(()); + continue; + }; + + debug!("Event Dispatcher Worker: doing payload {id}"); + + // This will block forever if we were passed a non-existing ID. Don't do that. + let mut payload = conn.get_payload_with_retry(id); + + // Deliberately not handling the error case of `duration_since()` -- if the `timestamp` + // is *after* `now` (which should be extremely rare), the most likely reason is a *slight* + // adjustment to the the system clock (e.g. NTP sync) that happened between storing the + // entity and retrieving it, and that should be fine. + // If there was a *major* adjustment, all bets are off anyway. You shouldn't mess with your + // clock on a server running a node. + if let Ok(age) = SystemTime::now().duration_since(payload.timestamp) { + if age.as_secs() > 5 * 60 { + warn!( + "Event Dispatcher Worker: Event payload transmitting more than 5 minutes after event"; + "age_ms" => age.as_millis(), + "id"=> id + ); + } + } + + if let Some(timeout_override) = timeout_override { + payload.request_data.timeout = timeout_override; + } + + Self::make_http_request_and_delete_from_db( + &payload.request_data, + disable_retries, + id, + &conn, + ); + + // We're ignoring the result of this call -- if the requester has dropped the receiver + // in the meantime, that's fine. That is the usual case of fire-and-forget calls. + let _ = completion.send(()); + } + } + + fn make_http_request_and_delete_from_db( + data: &EventRequestData, + disable_retries: bool, + id: i64, + conn: &EventDispatcherDbConnection, + ) { + let http_result = Self::make_http_request(data, disable_retries); + + if let Err(err) = http_result { + // log but continue + error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err); + } + + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping deletion of payload"); + return; + } + + // We're deleting regardless of result -- if retries are disabled, that means + // we're supposed to forget about it in case of failure. If they're not disabled, + // then we wouldn't be here in case of failue, because `make_http_request` retries + // until it's successful (with the exception of the above fault injection which + // simulates a shutdown). + let deletion_result = conn.delete_payload(id); + + if let Err(e) = deletion_result { + error!( + "Event observer: failed to delete pending payload from database"; + "error" => ?e + ); + } + } + + fn make_http_request( + data: &EventRequestData, + disable_retries: bool, + ) -> Result<(), EventDispatcherError> { + debug!( + "Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len() + ); + + let url = Url::parse(&data.url) + .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url)); + + let host = url.host_str().expect("Invalid URL: missing host"); + let port = url.port_or_known_default().unwrap_or(80); + let peerhost: PeerHost = format!("{host}:{port}") + .parse() + .unwrap_or(PeerHost::DNS(host.to_string(), port)); + + let mut backoff = Duration::from_millis(100); + let mut attempts: i32 = 0; + // Cap the backoff at 3x the timeout + let max_backoff = data.timeout.saturating_mul(3); + + loop { + let mut request = StacksHttpRequest::new_for_peer( + peerhost.clone(), + "POST".into(), + url.path().into(), + HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)), + ) + .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); + request.add_header("Connection".into(), "close".into()); + match send_http_request(host, port, request, data.timeout) { + Ok(response) => { + if response.preamble().status_code == 200 { + debug!( + "Event dispatcher: Successful POST"; "url" => %url + ); + break; + } else { + error!( + "Event dispatcher: Failed POST"; "url" => %url, "response" => ?response.preamble() + ); + } + } + Err(err) => { + warn!( + "Event dispatcher: connection or request failed to {host}:{port} - {err:?}"; + "backoff" => ?backoff, + "attempts" => attempts + ); + if disable_retries { + warn!("Observer is configured in disable_retries mode: skipping retry of payload"); + return Err(err.into()); + } + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping retry of payload"); + return Err(err.into()); + } + } + } + + sleep(backoff); + let jitter: u64 = rand::thread_rng().gen_range(0..100); + backoff = std::cmp::min( + backoff.saturating_mul(2) + Duration::from_millis(jitter), + max_backoff, + ); + attempts = attempts.saturating_add(1); + } + + Ok(()) + } +} diff --git a/stacks-node/src/main.rs b/stacks-node/src/main.rs index c1b7c9e99ad..6e3da4acafa 100644 --- a/stacks-node/src/main.rs +++ b/stacks-node/src/main.rs @@ -249,6 +249,17 @@ fn cli_get_miner_spend( spend_amount } +/// If the previous session was terminated before all the pending events had been sent, +/// the DB will still contain them. Work through that before doing anything new. +/// Pending events for observers that are no longer registered will be discarded. +fn send_pending_event_payloads(conf: &Config) { + let mut event_dispatcher = EventDispatcher::new(conf.get_working_dir()); + for observer in &conf.events_observers { + event_dispatcher.register_observer(observer); + } + event_dispatcher.process_pending_payloads(); +} + fn main() { panic::set_hook(Box::new(|panic_info| { error!("Process abort due to thread panic: {panic_info}"); @@ -411,6 +422,8 @@ fn main() { debug!("burnchain configuration {:?}", &conf.burnchain); debug!("connection configuration {:?}", &conf.connection_options); + send_pending_event_payloads(&conf); + let num_round: u64 = 0; // Infinite number of rounds if conf.burnchain.mode == "helium" || conf.burnchain.mode == "mocknet" { diff --git a/stacks-node/src/nakamoto_node/miner.rs b/stacks-node/src/nakamoto_node/miner.rs index 1fd7dd4fc78..988bcdb4781 100644 --- a/stacks-node/src/nakamoto_node/miner.rs +++ b/stacks-node/src/nakamoto_node/miner.rs @@ -58,6 +58,8 @@ use stacks_common::types::{PrivateKey, StacksEpochId}; #[cfg(test)] use stacks_common::util::tests::TestFlag; use stacks_common::util::vrf::VRFProof; +#[cfg(test)] +use tempfile::tempdir; use super::miner_db::MinerDB; use super::relayer::{MinerStopHandle, RelayerThread}; @@ -2062,6 +2064,8 @@ fn should_read_count_extend_units() { let (relay_sender, _rcv_2) = std::sync::mpsc::sync_channel(1); let (_coord_rcv, coord_comms) = stacks::chainstate::coordinator::comm::CoordinatorCommunication::instantiate(); + let working_dir = tempdir().unwrap(); + let mut miner = BlockMinerThread { config: Config::default(), globals: Globals::new( @@ -2094,7 +2098,7 @@ fn should_read_count_extend_units() { burn_election_block: BlockSnapshot::empty(), burn_block: BlockSnapshot::empty(), parent_tenure_id: StacksBlockId([0; 32]), - event_dispatcher: EventDispatcher::new(None), + event_dispatcher: EventDispatcher::new(working_dir.path().to_path_buf()), reason: MinerReason::Extended { burn_view_consensus_hash: ConsensusHash([0; 20]), }, diff --git a/stacks-node/src/node.rs b/stacks-node/src/node.rs index 1711dc297a7..5b54f04d4a3 100644 --- a/stacks-node/src/node.rs +++ b/stacks-node/src/node.rs @@ -338,12 +338,11 @@ impl Node { ) .expect("FATAL: failed to initiate mempool"); - let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir())); + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); for observer in &config.events_observers { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); let burnchain_config = config.get_burnchain(); diff --git a/stacks-node/src/run_loop/boot_nakamoto.rs b/stacks-node/src/run_loop/boot_nakamoto.rs index e2644f1f5e4..bd73966e013 100644 --- a/stacks-node/src/run_loop/boot_nakamoto.rs +++ b/stacks-node/src/run_loop/boot_nakamoto.rs @@ -83,7 +83,7 @@ impl BootRunLoop { InnerLoops::Epoch2(neon), ) } else { - let naka = NakaRunLoop::new(config.clone(), None, None, None); + let naka = NakaRunLoop::new(config.clone(), None, None, None, None); ( naka.get_coordinator_channel().unwrap(), InnerLoops::Epoch3(naka), @@ -184,6 +184,7 @@ impl BootRunLoop { Some(termination_switch), Some(counters), monitoring_thread, + Some(neon_loop.get_event_dispatcher()), ); let new_coord_channels = naka .get_coordinator_channel() diff --git a/stacks-node/src/run_loop/mod.rs b/stacks-node/src/run_loop/mod.rs index 3fc5195c27c..5a903aec113 100644 --- a/stacks-node/src/run_loop/mod.rs +++ b/stacks-node/src/run_loop/mod.rs @@ -171,7 +171,7 @@ pub struct RegisteredKey { } pub fn announce_boot_receipts( - event_dispatcher: &mut EventDispatcher, + event_dispatcher: &EventDispatcher, chainstate: &StacksChainState, pox_constants: &PoxConstants, boot_receipts: &[StacksTransactionReceipt], diff --git a/stacks-node/src/run_loop/nakamoto.rs b/stacks-node/src/run_loop/nakamoto.rs index 3f07ecc92d7..8a6a89fcb86 100644 --- a/stacks-node/src/run_loop/nakamoto.rs +++ b/stacks-node/src/run_loop/nakamoto.rs @@ -77,11 +77,16 @@ pub struct RunLoop { impl RunLoop { /// Sets up a runloop and node, given a config. + /// + /// If no event_dispatcher is passed, a new one is created. Allowing one to be passed in + /// allows the nakamoto runloop to continue using the same event dispatcher as the + /// neon runloop at the epoch 2->3 transition. pub fn new( config: Config, should_keep_running: Option>, counters: Option, monitoring_thread: Option>>, + event_dispatcher: Option, ) -> Self { let channels = CoordinatorCommunication::instantiate(); let should_keep_running = @@ -91,11 +96,13 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir())); - for observer in config.events_observers.iter() { - event_dispatcher.register_observer(observer); - } - event_dispatcher.process_pending_payloads(); + let event_dispatcher = event_dispatcher.unwrap_or_else(|| { + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); + for observer in config.events_observers.iter() { + event_dispatcher.register_observer(observer); + } + event_dispatcher + }); Self { config, diff --git a/stacks-node/src/run_loop/neon.rs b/stacks-node/src/run_loop/neon.rs index 6ac6f4d9248..3efc8181122 100644 --- a/stacks-node/src/run_loop/neon.rs +++ b/stacks-node/src/run_loop/neon.rs @@ -314,11 +314,10 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir())); + let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); for observer in config.events_observers.iter() { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); Self { config, diff --git a/stacks-node/src/tests/neon_integrations.rs b/stacks-node/src/tests/neon_integrations.rs index cf349fdc30b..3578fb876a4 100644 --- a/stacks-node/src/tests/neon_integrations.rs +++ b/stacks-node/src/tests/neon_integrations.rs @@ -292,7 +292,9 @@ pub mod test_observer { use warp::Filter; use {tokio, warp}; - use crate::event_dispatcher::{MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent}; + use crate::event_dispatcher::{ + self, MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent, + }; use crate::Config; pub const EVENT_OBSERVER_PORT: u16 = 50303; @@ -520,51 +522,70 @@ pub mod test_observer { Ok(warp::http::StatusCode::OK) } + /// Waits until all events that have been queued for dispatch until now + /// have actually been delivered. This ensures the tests have all the data + /// they need to assert on. + fn catch_up() { + event_dispatcher::catch_up_all_event_dispatchers(); + } + pub fn get_stacker_sets() -> Vec<(StacksBlockId, u64, RewardSet)> { + catch_up(); STACKER_SETS.lock().unwrap().clone() } pub fn get_memtxs() -> Vec { + catch_up(); MEMTXS.lock().unwrap().clone() } pub fn get_memtx_drops() -> Vec<(String, String)> { + catch_up(); MEMTXS_DROPPED.lock().unwrap().clone() } pub fn get_blocks() -> Vec { + catch_up(); NEW_BLOCKS.lock().unwrap().clone() } pub fn get_microblocks() -> Vec { + catch_up(); NEW_MICROBLOCKS.lock().unwrap().clone() } pub fn get_burn_blocks() -> Vec { + catch_up(); BURN_BLOCKS.lock().unwrap().clone() } pub fn get_attachments() -> Vec { + catch_up(); ATTACHMENTS.lock().unwrap().clone() } pub fn get_mined_blocks() -> Vec { + catch_up(); MINED_BLOCKS.lock().unwrap().clone() } pub fn get_mined_microblocks() -> Vec { + catch_up(); MINED_MICROBLOCKS.lock().unwrap().clone() } pub fn get_mined_nakamoto_blocks() -> Vec { + catch_up(); MINED_NAKAMOTO_BLOCKS.lock().unwrap().clone() } pub fn get_stackerdb_chunks() -> Vec { + catch_up(); NEW_STACKERDB_CHUNKS.lock().unwrap().clone() } pub fn get_proposal_responses() -> Vec { + catch_up(); PROPOSAL_RESPONSES.lock().unwrap().clone() } @@ -655,6 +676,7 @@ pub mod test_observer { } pub fn clear() { + catch_up(); NEW_BLOCKS.lock().unwrap().clear(); MINED_BLOCKS.lock().unwrap().clear(); MINED_MICROBLOCKS.lock().unwrap().clear(); diff --git a/stacks-node/src/tests/signer/mod.rs b/stacks-node/src/tests/signer/mod.rs index d8db0e960d0..f374322919d 100644 --- a/stacks-node/src/tests/signer/mod.rs +++ b/stacks-node/src/tests/signer/mod.rs @@ -72,6 +72,7 @@ use super::neon_integrations::{ copy_dir_all, get_account, get_sortition_info_ch, submit_tx_fallible, Account, }; use crate::burnchains::bitcoin::core_controller::BitcoinCoreController; +use crate::event_dispatcher::catch_up_all_event_dispatchers; use crate::nakamoto_node::miner::TEST_MINE_SKIP; use crate::neon::Counters; use crate::run_loop::boot_nakamoto; @@ -1140,6 +1141,7 @@ impl SignerTest { TEST_MINE_SKIP.set(true); let mined_blocks = self.running_nodes.counters.naka_mined_blocks.clone(); let mined_before = mined_blocks.get(); + catch_up_all_event_dispatchers(); self.mine_bitcoin_block(); wait_for_state_machine_update_by_miner_tenure_id( timeout.as_secs(), diff --git a/stacks-node/src/tests/signer/v0.rs b/stacks-node/src/tests/signer/v0.rs index c3aeaa886a0..52d23909c56 100644 --- a/stacks-node/src/tests/signer/v0.rs +++ b/stacks-node/src/tests/signer/v0.rs @@ -115,7 +115,9 @@ use tracing_subscriber::{fmt, EnvFilter}; use super::SignerTest; use crate::clarity::vm::clarity::ClarityConnection; -use crate::event_dispatcher::{MinedNakamotoBlockEvent, TEST_SKIP_BLOCK_ANNOUNCEMENT}; +use crate::event_dispatcher::{ + catch_up_all_event_dispatchers, MinedNakamotoBlockEvent, TEST_SKIP_BLOCK_ANNOUNCEMENT, +}; use crate::nakamoto_node::miner::{ fault_injection_stall_miner, fault_injection_unstall_miner, TEST_BLOCK_ANNOUNCE_STALL, TEST_BROADCAST_PROPOSAL_STALL, TEST_MINE_SKIP, TEST_P2P_BROADCAST_STALL, @@ -7879,6 +7881,7 @@ fn mock_sign_epoch_25() { < epoch_3_boundary { let mut mock_block_mesage = None; + catch_up_all_event_dispatchers(); let mock_poll_time = Instant::now(); signer_test .running_nodes @@ -11340,6 +11343,8 @@ fn block_validation_pending_table() { // Set the delay to 0 so that the block validation finishes quickly TEST_VALIDATE_DELAY_DURATION_SECS.set(0); + catch_up_all_event_dispatchers(); + wait_for(30, || { let proposal_responses = test_observer::get_proposal_responses(); let found_proposal = proposal_responses