diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs
index d043da13d2..94f06815aa 100644
--- a/stacks-node/src/event_dispatcher.rs
+++ b/stacks-node/src/event_dispatcher.rs
@@ -24,7 +24,7 @@ use std::sync::mpsc::channel;
 use std::sync::LazyLock;
 use std::sync::{Arc, Mutex};
 use std::thread::sleep;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 
 use clarity::vm::costs::ExecutionCost;
 use clarity::vm::events::{FTEventType, NFTEventType, STXEventType};
@@ -76,6 +76,8 @@ pub use payloads::{
 };
 pub use stacker_db::StackerDBChannel;
 
+use crate::event_dispatcher::db::PendingPayload;
+
 #[cfg(test)]
 mod tests;
 
@@ -167,6 +169,12 @@ impl EventObserver {
     }
 }
 
+struct EventRequestData {
+    pub url: String,
+    pub payload_bytes: Arc<[u8]>,
+    pub timeout: Duration,
+}
+
 /// Events received from block-processing.
 /// Stacks events are structured as JSON, and are grouped by topic. An event observer can
 /// subscribe to one or more specific event streams, or the "any" stream to receive all of them.
@@ -1059,10 +1067,22 @@ impl EventDispatcher {
             pending_payloads.len()
         );
 
-        for (id, url, payload_bytes, _timeout_ms) in pending_payloads {
-            info!("Event dispatcher: processing pending payload: {url}");
-            let full_url = Url::parse(url.as_str())
-                .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {url} as a URL"));
+        for PendingPayload {
+            id,
+            mut request_data,
+            ..
+        } in pending_payloads
+        {
+            info!(
+                "Event dispatcher: processing pending payload: {}",
+                request_data.url
+            );
+            let full_url = Url::parse(request_data.url.as_str()).unwrap_or_else(|_| {
+                panic!(
+                    "Event dispatcher: unable to parse {} as a URL",
+                    request_data.url
+                )
+            });
             // find the right observer
             let observer = self.registered_observers.iter().find(|observer| {
                 let endpoint_url = Url::parse(format!("http://{}", &observer.endpoint).as_str())
@@ -1079,7 +1099,7 @@
                 // This observer is no longer registered, skip and delete
                 info!(
                     "Event dispatcher: observer {} no longer registered, skipping",
-                    url
+                    request_data.url
                 );
                 if let Err(e) = conn.delete_payload(id) {
                     error!(
@@ -1090,13 +1110,11 @@
                 continue;
            };
 
-            self.make_http_request_and_delete_from_db(
-                &payload_bytes,
-                full_url.as_str(),
-                observer.timeout,
-                observer.disable_retries,
-                id,
-            );
+            // If the timeout configuration for this observer is different from what it was
+            // originally, the updated config wins.
+            request_data.timeout = observer.timeout;
+
+            self.make_http_request_and_delete_from_db(&request_data, observer.disable_retries, id);
         }
     }
 
@@ -1117,29 +1135,27 @@
             }
         };
 
-        let id = self.save_to_db(&full_url, bytes.as_ref(), event_observer.timeout);
+        let data = EventRequestData {
+            payload_bytes: bytes,
+            url: full_url,
+            timeout: event_observer.timeout,
+        };
+
+        let id = self.save_to_db(&data);
 
-        self.make_http_request_and_delete_from_db(
-            &bytes,
-            &full_url,
-            event_observer.timeout,
-            event_observer.disable_retries,
-            id,
-        );
+        self.make_http_request_and_delete_from_db(&data, event_observer.disable_retries, id);
     }
 
     fn make_http_request(
-        payload_bytes: &Arc<[u8]>,
-        full_url: &str,
-        timeout: Duration,
+        data: &EventRequestData,
         disable_retries: bool,
     ) -> Result<(), EventDispatcherError> {
         debug!(
-            "Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len()
+            "Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len()
         );
-        let url = Url::parse(full_url)
-            .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {full_url} as a URL"));
+        let url = Url::parse(&data.url)
+            .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url));
 
         let host = url.host_str().expect("Invalid URL: missing host");
         let port = url.port_or_known_default().unwrap_or(80);
@@ -1150,18 +1166,18 @@
         let mut backoff = Duration::from_millis(100);
         let mut attempts: i32 = 0;
         // Cap the backoff at 3x the timeout
-        let max_backoff = timeout.saturating_mul(3);
+        let max_backoff = data.timeout.saturating_mul(3);
 
         loop {
             let mut request = StacksHttpRequest::new_for_peer(
                 peerhost.clone(),
                 "POST".into(),
                 url.path().into(),
-                HttpRequestContents::new().payload_json_bytes(Arc::clone(payload_bytes)),
+                HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)),
             )
             .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request"));
             request.add_header("Connection".into(), "close".into());
 
-            match send_http_request(host, port, request, timeout) {
+            match send_http_request(host, port, request, data.timeout) {
                 Ok(response) => {
                     if response.preamble().status_code == 200 {
                         debug!(
@@ -1218,7 +1234,7 @@
         format!("http://{url_str}")
     }
 
-    fn save_to_db(&self, url: &str, payload_bytes: &[u8], timeout: Duration) -> i64 {
+    fn save_to_db(&self, data: &EventRequestData) -> i64 {
         // Because the DB is initialized in the call to process_pending_payloads() during startup,
         // it is *probably* ok to skip initialization here. That said, at the time of writing this is the
        // only call to new_without_init(), and we might want to revisit the question whether it's
@@ -1226,24 +1242,20 @@
         let conn = EventDispatcherDbConnection::new_without_init(&self.db_path)
             .expect("Failed to open database for event observer");
 
-        conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout);
-        conn.last_insert_rowid()
+        conn.insert_payload_with_retry(data, SystemTime::now())
     }
 
     fn make_http_request_and_delete_from_db(
         &self,
-        payload_bytes: &Arc<[u8]>,
-        full_url: &str,
-        timeout: Duration,
+        data: &EventRequestData,
         disable_retries: bool,
         id: i64,
     ) {
-        let http_result =
-            Self::make_http_request(payload_bytes, full_url, timeout, disable_retries);
+        let http_result = Self::make_http_request(data, disable_retries);
         if let Err(err) = http_result {
             // log but continue
-            error!("EventDispatcher: dispatching failed"; "url" => &full_url, "error" => ?err);
+            error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err);
         }
 
         #[cfg(test)]
diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs
index 122812592c..6db073f70f 100644
--- a/stacks-node/src/event_dispatcher/db.rs
+++ b/stacks-node/src/event_dispatcher/db.rs
@@ -17,10 +17,19 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::thread::sleep;
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use rusqlite::{params, Connection};
-use stacks::util_lib::db::Error as db_error;
+use stacks::util_lib::db::{table_exists, Error as db_error};
+
+use crate::event_dispatcher::EventRequestData;
+
+pub struct PendingPayload {
+    pub request_data: EventRequestData,
+    #[allow(dead_code)] // will be used in a follow-up commit
+    pub timestamp: SystemTime,
+    pub id: i64,
+}
 
 /// Wraps a SQlite connection to the database in which pending event payloads are stored
 pub struct EventDispatcherDbConnection {
@@ -40,17 +49,15 @@
                 id INTEGER PRIMARY KEY AUTOINCREMENT,
                 url TEXT NOT NULL,
                 payload BLOB NOT NULL,
-                timeout INTEGER NOT NULL
+                timeout INTEGER NOT NULL,
+                timestamp INTEGER NOT NULL
             )",
             [],
         )?;
 
         let mut connection = EventDispatcherDbConnection { connection };
-        if let Some(col_type) = connection.get_payload_column_type()? {
-            if col_type.eq_ignore_ascii_case("TEXT") {
-                info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB");
-                connection.migrate_payload_column_to_blob()?;
-            }
-        }
+
+        connection.run_necessary_migrations()?;
+
         Ok(connection)
     }
 
@@ -59,17 +66,17 @@
         EventDispatcherDbConnection { connection }
     }
 
-    /// Insert a payload into the database, retrying on failure.
-    pub fn insert_payload_with_retry(&self, url: &str, payload_bytes: &[u8], timeout: Duration) {
+    /// Insert a payload into the database, retrying on failure. Returns the id of the inserted record.
+    pub fn insert_payload_with_retry(&self, data: &EventRequestData, timestamp: SystemTime) -> i64 {
         let mut attempts = 0i64;
         let mut backoff = Duration::from_millis(100); // Initial backoff duration
         let max_backoff = Duration::from_secs(5); // Cap the backoff duration
 
         loop {
-            match self.insert_payload(url, payload_bytes, timeout) {
-                Ok(_) => {
+            match self.insert_payload(data, timestamp) {
+                Ok(id) => {
                     // Successful insert, break the loop
-                    return;
+                    return id;
                 }
                 Err(err) => {
                     // Log the error, then retry after a delay
@@ -92,38 +99,51 @@
     pub fn insert_payload(
         &self,
-        url: &str,
-        payload_bytes: &[u8],
-        timeout: Duration,
-    ) -> Result<(), db_error> {
-        let timeout_ms: u64 = timeout.as_millis().try_into().expect("Timeout too large");
-        self.connection.execute(
-            "INSERT INTO pending_payloads (url, payload, timeout) VALUES (?1, ?2, ?3)",
-            params![url, payload_bytes, timeout_ms],
+        data: &EventRequestData,
+        timestamp: SystemTime,
+    ) -> Result<i64, db_error> {
+        let timeout_ms: u64 = data
+            .timeout
+            .as_millis()
+            .try_into()
+            .expect("Timeout too large");
+
+        let timestamp_s = timestamp
+            .duration_since(UNIX_EPOCH)
+            .expect("system clock is multiple decades slow")
+            .as_secs();
+
+        let id: i64 = self.connection.query_row(
+            "INSERT INTO pending_payloads (url, payload, timeout, timestamp) VALUES (?1, ?2, ?3, ?4) RETURNING id",
+            params![data.url, data.payload_bytes, timeout_ms, timestamp_s],
+            |row| row.get(0),
         )?;
-        Ok(())
-    }
-
-    // TODO: change this to get the id from the insertion directly, because that's more reliable
-    pub fn last_insert_rowid(&self) -> i64 {
-        self.connection.last_insert_rowid()
+        Ok(id)
     }
 
-    pub fn get_pending_payloads(&self) -> Result<Vec<(i64, String, Arc<[u8]>, u64)>, db_error> {
-        let mut stmt = self
-            .connection
-            .prepare("SELECT id, url, payload, timeout FROM pending_payloads ORDER BY id")?;
-        let payload_iter = stmt.query_and_then(
-            [],
-            |row| -> Result<(i64, String, Arc<[u8]>, u64), db_error> {
-                let id: i64 = row.get(0)?;
-                let url: String = row.get(1)?;
-                let payload_bytes: Vec<u8> = row.get(2)?;
-                let payload_bytes = Arc::<[u8]>::from(payload_bytes);
-                let timeout_ms: u64 = row.get(3)?;
-                Ok((id, url, payload_bytes, timeout_ms))
-            },
+    pub fn get_pending_payloads(&self) -> Result<Vec<PendingPayload>, db_error> {
+        let mut stmt = self.connection.prepare(
+            "SELECT id, url, payload, timeout, timestamp FROM pending_payloads ORDER BY id",
         )?;
+        let payload_iter = stmt.query_and_then([], |row| -> Result<PendingPayload, db_error> {
+            let id: i64 = row.get(0)?;
+            let url: String = row.get(1)?;
+            let payload_bytes: Vec<u8> = row.get(2)?;
+            let payload_bytes = Arc::<[u8]>::from(payload_bytes);
+            let timeout_ms: u64 = row.get(3)?;
+            let timestamp_s: u64 = row.get(4)?;
+            let request_data = EventRequestData {
+                url,
+                payload_bytes,
+                timeout: Duration::from_millis(timeout_ms),
+            };
+
+            Ok(PendingPayload {
+                id,
+                request_data,
+                timestamp: UNIX_EPOCH + Duration::from_secs(timestamp_s),
+            })
+        })?;
         payload_iter.collect()
     }
 
@@ -133,25 +153,54 @@
         Ok(())
     }
 
-    fn get_payload_column_type(&self) -> Result<Option<String>, db_error> {
-        let mut stmt = self
-            .connection
-            .prepare("PRAGMA table_info(pending_payloads)")?;
+    /// The initial schema of the database when this code was first created
+    const DB_VERSION_INITIAL_SCHEMA: u32 = 0;
+    /// The `payload` column type changed from TEXT to BLOB
+    const DB_VERSION_PAYLOAD_IS_BLOB: u32 = 1;
+    /// Column `timestamp` and table `db_config` added
+    const DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN: u32 = 2;
 
-        let rows = stmt.query_map([], |row| {
-            let name: String = row.get(1)?;
-            let col_type: String = row.get(2)?;
-            Ok((name, col_type))
-        })?;
+    fn run_necessary_migrations(&mut self) -> Result<(), db_error> {
+        let current_schema = self.get_schema_version()?;
 
-        for row in rows {
-            let (name, col_type) = row?;
-            if name == "payload" {
-                return Ok(Some(col_type));
-            }
+        if current_schema < Self::DB_VERSION_PAYLOAD_IS_BLOB {
+            info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB");
+            self.migrate_payload_column_to_blob()?;
         }
 
-        Ok(None)
+        if current_schema < Self::DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN {
+            info!("Event observer: adding timestamp to pending_payloads");
+            self.add_versioning_and_timestamp_column()?;
+        }
+
+        Ok(())
+    }
+
+    fn get_schema_version(&self) -> Result<u32, db_error> {
+        let has_db_config = table_exists(&self.connection, "db_config")?;
+
+        if has_db_config {
+            let version =
+                self.connection
+                    .query_row("SELECT MAX(version) FROM db_config", [], |r| {
+                        r.get::<_, u32>(0)
+                    })?;
+            return Ok(version);
+        }
+
+        let payload_type = self.connection.query_row(
+            "SELECT type FROM pragma_table_info('pending_payloads') WHERE name='payload'",
+            [],
+            |r| r.get::<_, String>(0),
+        )?;
+
+        let payload_is_blob = payload_type.eq_ignore_ascii_case("BLOB");
+
+        if payload_is_blob {
+            Ok(Self::DB_VERSION_PAYLOAD_IS_BLOB)
+        } else {
+            Ok(Self::DB_VERSION_INITIAL_SCHEMA)
+        }
     }
 
     fn migrate_payload_column_to_blob(&mut self) -> Result<(), db_error> {
@@ -178,6 +227,44 @@
         tx.commit()?;
         Ok(())
     }
+
+    fn add_versioning_and_timestamp_column(&mut self) -> Result<(), db_error> {
+        let tx = self.connection.transaction()?;
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("time travel to pre-1970 is not supported")
+            .as_secs();
+
+        tx.execute(
+            "ALTER TABLE pending_payloads RENAME TO pending_payloads_old",
+            [],
+        )?;
+        tx.execute(
+            "CREATE TABLE pending_payloads (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                url TEXT NOT NULL,
+                payload BLOB NOT NULL,
+                timeout INTEGER NOT NULL,
+                timestamp INTEGER NOT NULL
+            )",
+            [],
+        )?;
+        tx.execute(
+            "INSERT INTO pending_payloads (id, url, payload, timeout, timestamp)
+             SELECT id, url, CAST(payload AS BLOB), timeout, ?1 FROM pending_payloads_old",
+            [now],
+        )?;
+        tx.execute("DROP TABLE pending_payloads_old", [])?;
+
+        tx.execute("CREATE TABLE db_config (version INTEGER)", [])?;
+        tx.execute(
+            "INSERT INTO db_config (version) VALUES (?1)",
+            params![Self::DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN],
+        )?;
+
+        tx.commit()?;
+        Ok(())
+    }
 }
 
 #[cfg(test)]
@@ -212,7 +299,7 @@
     }
 
     #[test]
-    fn test_migrate_payload_column_to_blob() {
+    fn test_migration() {
         let dir = tempdir().unwrap();
         let db_path = dir.path().join("test_payload_migration.sqlite");
 
@@ -252,12 +339,35 @@
             "Payload column was not migrated to BLOB"
         );
 
+        let insertion_info_col_count: i64 = conn
+            .connection
+            .query_row(
+                "SELECT COUNT(*) FROM pragma_table_info('pending_payloads') WHERE name = 'timestamp'",
+                [],
+                |row| row.get(0),
+            )
+            .unwrap();
+        assert!(
+            insertion_info_col_count == 1,
+            "timestamp column was not added"
+        );
+
+        let version: u32 = conn
+            .connection
+            .query_row("SELECT MAX(version) FROM db_config", [], |r| r.get(0))
+            .expect("db_config was not added");
+        assert_eq!(
+            version,
+            EventDispatcherDbConnection::DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN,
+            "Unexpected version number. Did you add a migration? Update this test."
+        );
+
         let pending_payloads = conn
             .get_pending_payloads()
             .expect("Failed to get pending payloads");
         assert_eq!(pending_payloads.len(), 1, "Expected one pending payload");
         assert_eq!(
-            pending_payloads[0].2.as_ref(),
+            pending_payloads[0].request_data.payload_bytes.as_ref(),
             payload_str.as_bytes(),
             "Payload contents did not survive migration"
         );
@@ -271,14 +381,20 @@
         let conn =
             EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database");
 
-        let url = "http://example.com/api";
+        let url = "http://example.com/api".to_string();
         let payload = json!({"key": "value"});
         let timeout = Duration::from_secs(5);
+        let timestamp_sentinel = UNIX_EPOCH + Duration::from_hours(24 * 20000);
 
         let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload");
 
+        let data = EventRequestData {
+            url,
+            payload_bytes: payload_bytes.into(),
+            timeout,
+        };
+
         // Insert payload
-        let insert_result = conn.insert_payload(url, payload_bytes.as_slice(), timeout);
-        assert!(insert_result.is_ok(), "Failed to insert payload");
+        let id = conn.insert_payload_with_retry(&data, timestamp_sentinel);
 
         // Get pending payloads
         let pending_payloads = conn
             .get_pending_payloads()
             .expect("Failed to get pending payloads");
         assert_eq!(pending_payloads.len(), 1, "Expected one pending payload");
 
-        let (_id, retrieved_url, stored_bytes, timeout_ms) = &pending_payloads[0];
-        assert_eq!(retrieved_url, url, "URL does not match");
+        let PendingPayload {
+            id: retrieved_id,
+            timestamp: retrieved_timestamp,
+            request_data: retrieved_data,
+        } = &pending_payloads[0];
+
+        assert_eq!(*retrieved_id, id, "ID does not match");
+        assert_eq!(retrieved_data.url, data.url, "URL does not match");
         assert_eq!(
-            stored_bytes.as_ref(),
-            payload_bytes.as_slice(),
+            retrieved_data.payload_bytes.as_ref(),
+            data.payload_bytes.as_ref(),
             "Serialized payload does not match"
         );
+        assert_eq!(retrieved_data.timeout, timeout, "Timeout does not match");
         assert_eq!(
-            *timeout_ms,
-            timeout.as_millis() as u64,
-            "Timeout does not match"
+            *retrieved_timestamp, timestamp_sentinel,
+            "Timestamp does not match"
         );
     }
 
@@ -308,13 +430,19 @@
         let conn =
             EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database");
 
-        let url = "http://example.com/api";
+        let url = "http://example.com/api".to_string();
         let payload = json!({"key": "value"});
         let timeout = Duration::from_secs(5);
 
         let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload");
 
+        let data = EventRequestData {
+            url,
+            payload_bytes: payload_bytes.into(),
+            timeout,
+        };
+
         // Insert payload
-        conn.insert_payload(url, payload_bytes.as_slice(), timeout)
+        conn.insert_payload(&data, SystemTime::now())
             .expect("Failed to insert payload");
 
         // Get pending payloads
@@ -323,7 +451,7 @@
             .expect("Failed to get pending payloads");
         assert_eq!(pending_payloads.len(), 1, "Expected one pending payload");
 
-        let (id, _, _, _) = pending_payloads[0];
+        let PendingPayload { id, .. } = pending_payloads[0];
 
         // Delete payload
         let delete_result = conn.delete_payload(id);
diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs
index 7908ca364f..52f0306291 100644
--- a/stacks-node/src/event_dispatcher/tests.rs
+++ b/stacks-node/src/event_dispatcher/tests.rs
@@ -16,7 +16,7 @@
 use std::net::TcpListener;
 use std::thread;
-use std::time::Instant;
+use std::time::{Instant, SystemTime};
 
 use clarity::boot_util::boot_code_id;
 use clarity::vm::costs::ExecutionCost;
@@ -261,12 +261,18 @@ fn test_process_pending_payloads() {
         .with_status(200)
         .create();
 
-    let url = &format!("{}/api", &server.url());
+    let url = format!("{}/api", &server.url());
+
+    let data = EventRequestData {
+        url,
+        payload_bytes: payload_bytes.into(),
+        timeout,
+    };
 
     TEST_EVENT_OBSERVER_SKIP_RETRY.set(false);
 
     // Insert payload
-    conn.insert_payload(url, payload_bytes.as_slice(), timeout)
+    conn.insert_payload(&data, SystemTime::now())
        .expect("Failed to insert payload");
 
     // Process pending payloads
@@ -318,9 +324,15 @@ fn pending_payloads_are_skipped_if_url_does_not_match() {
         .create();
 
     // Use a different URL than the observer's endpoint
-    let url = "http://different-domain.com/api";
+    let url = "http://different-domain.com/api".to_string();
+
+    let data = EventRequestData {
+        url,
+        payload_bytes: payload_bytes.into(),
+        timeout,
+    };
 
-    conn.insert_payload(url, payload_bytes.as_slice(), timeout)
+    conn.insert_payload(&data, SystemTime::now())
         .expect("Failed to insert payload");
 
     dispatcher.process_pending_payloads();
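
For reference, the version-gated migration pattern that the db.rs changes introduce can be reduced to the following standalone sketch. It is illustrative only: the open_and_migrate helper and its shape are hypothetical, and unlike the patch (which infers the version of a pre-db_config database from the payload column's declared type), it simply treats a missing version row as version 0. Table and column names mirror the patch.

    use rusqlite::{params, Connection};

    const DB_VERSION_LATEST: u32 = 2;

    // Open the payload database and bring its schema up to date, recording the
    // applied version in db_config so each migration step runs exactly once.
    fn open_and_migrate(path: &str) -> rusqlite::Result<Connection> {
        let mut conn = Connection::open(path)?;
        conn.execute("CREATE TABLE IF NOT EXISTS db_config (version INTEGER)", [])?;
        // An empty table means the schema predates versioning; treat it as version 0.
        let version: u32 =
            conn.query_row("SELECT COALESCE(MAX(version), 0) FROM db_config", [], |r| {
                r.get(0)
            })?;
        if version < DB_VERSION_LATEST {
            // Run every migration step newer than `version` inside one transaction,
            // then record the new version, so a crash mid-migration leaves the old
            // schema (and old version number) intact.
            let tx = conn.transaction()?;
            // ... apply each migration step whose version is greater than `version` ...
            tx.execute(
                "INSERT INTO db_config (version) VALUES (?1)",
                params![DB_VERSION_LATEST],
            )?;
            tx.commit()?;
        }
        Ok(conn)
    }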