Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::mpsc::channel;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use clarity::vm::costs::ExecutionCost;
use clarity::vm::events::{FTEventType, NFTEventType, STXEventType};
Expand Down Expand Up @@ -76,6 +76,8 @@ pub use payloads::{
};
pub use stacker_db::StackerDBChannel;

use crate::event_dispatcher::db::PendingPayload;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -167,6 +169,12 @@ impl EventObserver {
}
}

struct EventRequestData {
pub url: String,
pub payload_bytes: Arc<[u8]>,
pub timeout: Duration,
}

/// Events received from block-processing.
/// Stacks events are structured as JSON, and are grouped by topic. An event observer can
/// subscribe to one or more specific event streams, or the "any" stream to receive all of them.
Expand Down Expand Up @@ -1059,10 +1067,22 @@ impl EventDispatcher {
pending_payloads.len()
);

for (id, url, payload_bytes, _timeout_ms) in pending_payloads {
info!("Event dispatcher: processing pending payload: {url}");
let full_url = Url::parse(url.as_str())
.unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {url} as a URL"));
for PendingPayload {
id,
mut request_data,
..
} in pending_payloads
{
info!(
"Event dispatcher: processing pending payload: {}",
request_data.url
);
let full_url = Url::parse(request_data.url.as_str()).unwrap_or_else(|_| {
panic!(
"Event dispatcher: unable to parse {} as a URL",
request_data.url
)
});
// find the right observer
let observer = self.registered_observers.iter().find(|observer| {
let endpoint_url = Url::parse(format!("http://{}", &observer.endpoint).as_str())
Expand All @@ -1079,7 +1099,7 @@ impl EventDispatcher {
// This observer is no longer registered, skip and delete
info!(
"Event dispatcher: observer {} no longer registered, skipping",
url
request_data.url
);
if let Err(e) = conn.delete_payload(id) {
error!(
Expand All @@ -1090,13 +1110,11 @@ impl EventDispatcher {
continue;
};

self.make_http_request_and_delete_from_db(
&payload_bytes,
full_url.as_str(),
observer.timeout,
observer.disable_retries,
id,
);
// If the timeout configuration for this observer is different from what it was
// originally, the updated config wins.
request_data.timeout = observer.timeout;

self.make_http_request_and_delete_from_db(&request_data, observer.disable_retries, id);
}
}

Expand All @@ -1117,29 +1135,27 @@ impl EventDispatcher {
}
};

let id = self.save_to_db(&full_url, bytes.as_ref(), event_observer.timeout);
let data = EventRequestData {
payload_bytes: bytes,
url: full_url,
timeout: event_observer.timeout,
};

let id = self.save_to_db(&data);

self.make_http_request_and_delete_from_db(
&bytes,
&full_url,
event_observer.timeout,
event_observer.disable_retries,
id,
);
self.make_http_request_and_delete_from_db(&data, event_observer.disable_retries, id);
}

fn make_http_request(
payload_bytes: &Arc<[u8]>,
full_url: &str,
timeout: Duration,
data: &EventRequestData,
disable_retries: bool,
) -> Result<(), EventDispatcherError> {
debug!(
"Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len()
"Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len()
);

let url = Url::parse(full_url)
.unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {full_url} as a URL"));
let url = Url::parse(&data.url)
.unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url));

let host = url.host_str().expect("Invalid URL: missing host");
let port = url.port_or_known_default().unwrap_or(80);
Expand All @@ -1150,18 +1166,18 @@ impl EventDispatcher {
let mut backoff = Duration::from_millis(100);
let mut attempts: i32 = 0;
// Cap the backoff at 3x the timeout
let max_backoff = timeout.saturating_mul(3);
let max_backoff = data.timeout.saturating_mul(3);

loop {
let mut request = StacksHttpRequest::new_for_peer(
peerhost.clone(),
"POST".into(),
url.path().into(),
HttpRequestContents::new().payload_json_bytes(Arc::clone(payload_bytes)),
HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)),
)
.unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request"));
request.add_header("Connection".into(), "close".into());
match send_http_request(host, port, request, timeout) {
match send_http_request(host, port, request, data.timeout) {
Ok(response) => {
if response.preamble().status_code == 200 {
debug!(
Expand Down Expand Up @@ -1218,32 +1234,28 @@ impl EventDispatcher {
format!("http://{url_str}")
}

fn save_to_db(&self, url: &str, payload_bytes: &[u8], timeout: Duration) -> i64 {
fn save_to_db(&self, data: &EventRequestData) -> i64 {
// Because the DB is initialized in the call to process_pending_payloads() during startup,
// it is *probably* ok to skip initialization here. That said, at the time of writing this is the
// only call to new_without_init(), and we might want to revisit the question whether it's
// really worth it.
let conn = EventDispatcherDbConnection::new_without_init(&self.db_path)
.expect("Failed to open database for event observer");

conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout);
conn.last_insert_rowid()
conn.insert_payload_with_retry(data, SystemTime::now())
}

fn make_http_request_and_delete_from_db(
&self,
payload_bytes: &Arc<[u8]>,
full_url: &str,
timeout: Duration,
data: &EventRequestData,
disable_retries: bool,
id: i64,
) {
let http_result =
Self::make_http_request(payload_bytes, full_url, timeout, disable_retries);
let http_result = Self::make_http_request(data, disable_retries);

if let Err(err) = http_result {
// log but continue
error!("EventDispatcher: dispatching failed"; "url" => &full_url, "error" => ?err);
error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err);
}

#[cfg(test)]
Expand Down
Loading