From e2c76dbfdfd0b2a688641f8208c22f376e859391 Mon Sep 17 00:00:00 2001 From: Peter Todd Date: Tue, 18 Feb 2025 17:13:12 +0000 Subject: [PATCH] Run upstream stamp in blocking thread Seems to improve performance. Also improved error logging. --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 2 +- src/aggregator.rs | 21 +++++++++++---------- src/main.rs | 4 +++- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d58921..b913058 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,6 +361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -369,6 +370,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + [[package]] name = "futures-sink" version = "0.3.31" @@ -388,9 +395,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-io", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1058,6 +1069,7 @@ dependencies = [ "base64", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", diff --git a/Cargo.toml b/Cargo.toml index 0058635..5ab29a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ tokio = { version = "1", features = ["full"] } http = { version = "1.2.0", features = [] } http-body-util = "0.1" hyper-util = { version = "0.1", features = ["full"] } -reqwest = "0.12.12" +reqwest = { version = "0.12.12", features = ["blocking"] } rand = "0.9.0" bitcoin_hashes = "0.16.0" diff --git a/src/aggregator.rs b/src/aggregator.rs index 1934a17..920dcb7 100644 --- a/src/aggregator.rs +++ b/src/aggregator.rs @@ -78,26 +78,26 @@ impl StampRequest { } } -pub async fn aggregate_requests(requests: Vec, upstream_url: Url) { +pub fn aggregate_requests(requests: Vec, upstream_url: Url) { let digests: Vec<[u8; 32]> = requests.iter().map(|req| req.digest).collect(); let (ops, tip_digest) = hash_tree(&digests); - let client = reqwest::Client::new(); + let client = reqwest::blocking::Client::new(); - match (async || -> Result<_, StampRequestError> { + match (|| -> Result<_, StampRequestError> { let response = client.post(upstream_url) .body(Vec::from(tip_digest)) - .send() - .await?; + .timeout(std::time::Duration::from_secs(2)) + .send()?; if response.status() == StatusCode::OK { - let proof = response.bytes().await?; - log::debug!("got {} bytes of proof from upstream", proof.len()); + let proof = response.bytes()?; + log::info!("got {} bytes of proof from upstream", proof.len()); Ok(proof) } else { Err(StampRequestError::BadStatus(response.status())) } - })().await { + })() { Ok(proof) => { for (request, ops) in requests.into_iter().zip(ops.into_iter()) { let stamp = LinearTimestamp { @@ -110,7 +110,7 @@ pub async fn aggregate_requests(requests: Vec, upstream_url: Url) } } Err(err) => { - log::error!("{}", &err); + log::error!("{}", std::error::Report::new(&err).pretty(true)); let err = Arc::new(err); for request in requests.into_iter() { let _ = request.reply.send(Err(Arc::clone(&err))); @@ -137,7 +137,8 @@ pub async fn aggregator_task( if requests.len() > 0 { log::info!("got {} requests", requests.len()); - let _ = tokio::spawn(aggregate_requests(requests, upstream_url.clone())); + let upstream_url = upstream_url.clone(); + let _ = tokio::task::spawn_blocking(move || aggregate_requests(requests, upstream_url)); } }; diff --git a/src/main.rs b/src/main.rs index ddb88fc..21209ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +#![feature(error_reporter)] + use std::net::SocketAddr; use std::time::Duration; use std::num::NonZero; @@ -86,7 +88,7 @@ async fn main() -> Result<(), Box> { )) .await { - log::error!("Error serving connection: {:?}", err); + log::error!("Error serving connection: {}", std::error::Report::new(err).pretty(true)); } }); }