From 2c3dcb5526a427f30b19d84fb092078218c55cfb Mon Sep 17 00:00:00 2001 From: Tumas Date: Thu, 6 Feb 2025 18:36:51 +0200 Subject: [PATCH] Add content negotiation to builder API, use ssz as a default format --- Cargo.lock | 1 + builder_api/Cargo.toml | 1 + builder_api/src/api.rs | 119 ++++++++++++++++++++++++-------------- builder_api/src/config.rs | 4 +- 4 files changed, 79 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 339aa51b..ab7d8196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1053,6 +1053,7 @@ name = "builder_api" version = "0.0.0" dependencies = [ "anyhow", + "arc-swap", "bls", "clock", "derive_more 1.0.0", diff --git a/builder_api/Cargo.toml b/builder_api/Cargo.toml index 8a710bfe..f09746e8 100644 --- a/builder_api/Cargo.toml +++ b/builder_api/Cargo.toml @@ -7,6 +7,7 @@ authors = ["Grandine "] workspace = true [dependencies] +arc-swap = { workspace = true } anyhow = { workspace = true } bls = { workspace = true } clock = { workspace = true } diff --git a/builder_api/src/api.rs b/builder_api/src/api.rs index 432314dd..03155932 100644 --- a/builder_api/src/api.rs +++ b/builder_api/src/api.rs @@ -2,8 +2,8 @@ use core::time::Duration; use std::sync::Arc; use anyhow::{bail, ensure, Result}; +use arc_swap::ArcSwap; use bls::PublicKeyBytes; -use derive_more::Constructor; use helper_functions::{misc, signing::SignForAllForks}; use http_api_utils::ETH_CONSENSUS_VERSION; use itertools::Itertools as _; @@ -11,10 +11,11 @@ use log::{debug, info}; use mime::{APPLICATION_JSON, APPLICATION_OCTET_STREAM}; use prometheus_metrics::Metrics; use reqwest::{ - header::{ACCEPT, CONTENT_TYPE}, - Client, RequestBuilder, Response, StatusCode, + header::{HeaderValue, ACCEPT, CONTENT_TYPE}, + Client, Response, StatusCode, }; -use ssz::{SszHash as _, SszRead as _, SszWrite as _}; +use serde::de::DeserializeOwned; +use ssz::{SszHash as _, SszRead, SszWrite as _}; use thiserror::Error; use typenum::Unsigned as _; use types::{ @@ -57,6 +58,8 @@ pub enum BuilderApiError { header_root: H256, payload_root: H256, }, + #[error("received response with unsupported content-type: {content_type:?}")] + UnsupportedContentType { content_type: Option }, #[error( "Builder API responded with incorrect version \ (computed: {computed}, response: {in_response})" @@ -64,14 +67,24 @@ pub enum BuilderApiError { VersionMismatch { computed: Phase, in_response: Phase }, } -#[derive(Constructor)] pub struct Api { config: BuilderConfig, client: Client, metrics: Option>, + supports_ssz: ArcSwap>, } impl Api { + #[must_use] + pub fn new(config: BuilderConfig, client: Client, metrics: Option>) -> Self { + Self { + config, + client, + metrics, + supports_ssz: ArcSwap::from_pointee(None), + } + } + #[expect( clippy::unnecessary_min_or_max, reason = "GENESIS_SLOT const might be adjusted independently." @@ -161,11 +174,19 @@ impl Api { debug!("getting execution payload header from {url}"); - let response = self - .request_with_accept_header(self.client.get(url.into_url()).timeout(REQUEST_TIMEOUT)) - .send() - .await?; + let request = self.client.get(url.into_url()).timeout(REQUEST_TIMEOUT); + // See + let request = if self.config.builder_api_format == BuilderApiFormat::Json { + request.header(ACCEPT, APPLICATION_JSON.as_ref()) + } else { + request.header( + ACCEPT, + format!("{APPLICATION_OCTET_STREAM};q=1,{APPLICATION_JSON};q=0.9"), + ) + }; + + let response = request.send().await?; let response = handle_error(response).await?; if response.status() == StatusCode::NO_CONTENT { @@ -173,15 +194,7 @@ impl Api { return Ok(None); } - let builder_bid = match self.config.builder_api_format { - BuilderApiFormat::Json => response.json().await?, - BuilderApiFormat::Ssz => { - let phase = http_api_utils::extract_phase_from_headers(response.headers())?; - let bytes = response.bytes().await?; - - SignedBuilderBid::

::from_ssz(&phase, &bytes)? - } - }; + let builder_bid = self.parse_response::>(response).await?; debug!("get_execution_payload_header response: {builder_bid:?}"); @@ -242,33 +255,30 @@ impl Api { let block_root = block.message().hash_tree_root(); let slot = block.message().slot(); - let request = self.request_with_accept_header( - self.client - .post(url.into_url()) - .timeout(remaining_time) - .header(ETH_CONSENSUS_VERSION, block.phase().as_ref()), - ); + let request = self + .client + .post(url.into_url()) + .timeout(remaining_time) + .header(ETH_CONSENSUS_VERSION, block.phase().as_ref()); - let request = match self.config.builder_api_format { - BuilderApiFormat::Json => request.json(block), - BuilderApiFormat::Ssz => request + let use_json = self.config.builder_api_format == BuilderApiFormat::Json + || self.supports_ssz.load().is_some_and(|supported| !supported); + + let request = if use_json { + request.json(block) + } else { + request + .header(ACCEPT, APPLICATION_OCTET_STREAM.as_ref()) .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM.as_ref()) - .body(block.to_ssz()?), + .body(block.to_ssz()?) }; let response = request.send().await?; let response = handle_error(response).await?; - let response: WithBlobsAndMev, P> = - match self.config.builder_api_format { - BuilderApiFormat::Json => response.json().await?, - BuilderApiFormat::Ssz => { - let phase = http_api_utils::extract_phase_from_headers(response.headers())?; - let bytes = response.bytes().await?; - - ExecutionPayloadAndBlobsBundle::

::from_ssz(&phase, &bytes)? - } - } + let response: WithBlobsAndMev, P> = self + .parse_response::>(response) + .await? .into(); let execution_payload = &response.value; @@ -293,13 +303,34 @@ impl Api { Ok(response) } - fn request_with_accept_header(&self, request_builder: RequestBuilder) -> RequestBuilder { - let accept_header = match self.config.builder_api_format { - BuilderApiFormat::Json => APPLICATION_JSON, - BuilderApiFormat::Ssz => APPLICATION_OCTET_STREAM, - }; + async fn parse_response>( + &self, + response: Response, + ) -> Result { + let content_type = response.headers().get(CONTENT_TYPE); + + if content_type.is_none() + || content_type == Some(&HeaderValue::from_static(APPLICATION_JSON.as_ref())) + { + return response + .json() + .await + .inspect(|_| self.supports_ssz.store(Arc::new(Some(false)))) + .map_err(Into::into); + } + + if content_type == Some(&HeaderValue::from_static(APPLICATION_OCTET_STREAM.as_ref())) { + let phase = http_api_utils::extract_phase_from_headers(response.headers())?; + let bytes = response.bytes().await?; + + return T::from_ssz(&phase, &bytes) + .inspect(|_| self.supports_ssz.store(Arc::new(Some(true)))) + .map_err(Into::into); + } - request_builder.header(ACCEPT, accept_header.as_ref()) + bail!(BuilderApiError::UnsupportedContentType { + content_type: content_type.cloned(), + }) } fn url(&self, path: &str) -> Result { diff --git a/builder_api/src/config.rs b/builder_api/src/config.rs index 8b32fa0f..84a5ae6e 100644 --- a/builder_api/src/config.rs +++ b/builder_api/src/config.rs @@ -4,10 +4,10 @@ use types::redacting_url::RedactingUrl; pub const DEFAULT_BUILDER_MAX_SKIPPED_SLOTS_PER_EPOCH: u64 = 8; pub const DEFAULT_BUILDER_MAX_SKIPPED_SLOTS: u64 = 3; -#[derive(Clone, Debug, Default, Display, FromStr)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Default, Display, FromStr)] pub enum BuilderApiFormat { - #[default] Json, + #[default] Ssz, }