Skip to content

Commit

Permalink
Run upstream stamp in blocking thread
Browse files Browse the repository at this point in the history
Seems to improve performance.

Also improved error logging.
  • Loading branch information
petertodd committed Feb 18, 2025
1 parent 70ad884 commit e2c76db
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ tokio = { version = "1", features = ["full"] }
http = { version = "1.2.0", features = [] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
reqwest = "0.12.12"
reqwest = { version = "0.12.12", features = ["blocking"] }
rand = "0.9.0"

bitcoin_hashes = "0.16.0"
Expand Down
21 changes: 11 additions & 10 deletions src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,26 @@ impl StampRequest {
}
}

pub async fn aggregate_requests(requests: Vec<StampRequest>, upstream_url: Url) {
pub fn aggregate_requests(requests: Vec<StampRequest>, upstream_url: Url) {
let digests: Vec<[u8; 32]> = requests.iter().map(|req| req.digest).collect();

let (ops, tip_digest) = hash_tree(&digests);

let client = reqwest::Client::new();
let client = reqwest::blocking::Client::new();

match (async || -> Result<_, StampRequestError> {
match (|| -> Result<_, StampRequestError> {
let response = client.post(upstream_url)
.body(Vec::from(tip_digest))
.send()
.await?;
.timeout(std::time::Duration::from_secs(2))
.send()?;
if response.status() == StatusCode::OK {
let proof = response.bytes().await?;
log::debug!("got {} bytes of proof from upstream", proof.len());
let proof = response.bytes()?;
log::info!("got {} bytes of proof from upstream", proof.len());
Ok(proof)
} else {
Err(StampRequestError::BadStatus(response.status()))
}
})().await {
})() {
Ok(proof) => {
for (request, ops) in requests.into_iter().zip(ops.into_iter()) {
let stamp = LinearTimestamp {
Expand All @@ -110,7 +110,7 @@ pub async fn aggregate_requests(requests: Vec<StampRequest>, upstream_url: Url)
}
}
Err(err) => {
log::error!("{}", &err);
log::error!("{}", std::error::Report::new(&err).pretty(true));
let err = Arc::new(err);
for request in requests.into_iter() {
let _ = request.reply.send(Err(Arc::clone(&err)));
Expand All @@ -137,7 +137,8 @@ pub async fn aggregator_task(

if requests.len() > 0 {
log::info!("got {} requests", requests.len());
let _ = tokio::spawn(aggregate_requests(requests, upstream_url.clone()));
let upstream_url = upstream_url.clone();
let _ = tokio::task::spawn_blocking(move || aggregate_requests(requests, upstream_url));
}
};

Expand Down
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(error_reporter)]

use std::net::SocketAddr;
use std::time::Duration;
use std::num::NonZero;
Expand Down Expand Up @@ -86,7 +88,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
))
.await
{
log::error!("Error serving connection: {:?}", err);
log::error!("Error serving connection: {}", std::error::Report::new(err).pretty(true));
}
});
}
Expand Down

0 comments on commit e2c76db

Please sign in to comment.