diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index ce6431237e..81809c0361 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "7.4.10" +version = "7.4.11" edition = "2021" [lib] diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index 939f8f7cc8..a60ded2fb6 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -7,9 +7,11 @@ use { config::EthereumConfig, eth_utils::{ eth_gas_oracle::EthProviderOracle, + failover_middleware::FailoverMiddleware, legacy_tx_middleware::LegacyTxMiddleware, nonce_manager::NonceManagerMiddleware, traced_client::{RpcMetrics, TracedClient}, + utils::create_failover_provider, }, }, anyhow::{anyhow, Error, Result}, @@ -155,27 +157,41 @@ impl SignablePythContractInner { } impl SignablePythContract { - pub async fn from_config(chain_config: &EthereumConfig, private_key: &str) -> Result { - let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; - Self::from_config_and_provider(chain_config, private_key, provider).await + pub async fn from_config_with_key(chain_config: &EthereumConfig, private_key: &str) -> Result { + if !chain_config.geth_rpc_addrs.is_empty() { + let provider = create_failover_provider(&chain_config.geth_rpc_addrs)?; + Self::from_config_and_provider(chain_config, private_key, provider).await + } else { + let provider = Provider::::try_from(chain_config.geth_rpc_addr.as_str())?; + Self::from_config_and_provider(chain_config, private_key, provider).await + } } } impl InstrumentedSignablePythContract { - pub async fn from_config( + pub async fn from_config_with_metrics( chain_config: &EthereumConfig, private_key: &str, chain_id: ChainId, metrics: Arc, ) -> Result { - let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?; + let rpc_addr = if !chain_config.geth_rpc_addrs.is_empty() { + &chain_config.geth_rpc_addrs[0] + } else { + &chain_config.geth_rpc_addr + }; + let provider = TracedClient::new(chain_id, rpc_addr, metrics)?; Self::from_config_and_provider(chain_config, private_key, provider).await } } impl PythContract { - pub fn from_config(chain_config: &EthereumConfig) -> Result { - let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; + pub fn from_config_basic(chain_config: &EthereumConfig) -> Result { + let provider = if !chain_config.geth_rpc_addrs.is_empty() { + create_failover_provider(&chain_config.geth_rpc_addrs)? + } else { + Provider::::try_from(chain_config.geth_rpc_addr.as_str())? + }; Ok(PythRandom::new( chain_config.contract_addr, @@ -185,12 +201,17 @@ impl PythContract { } impl InstrumentedPythContract { - pub fn from_config( + pub fn from_config_with_chain_metrics( chain_config: &EthereumConfig, chain_id: ChainId, metrics: Arc, ) -> Result { - let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?; + let rpc_addr = if !chain_config.geth_rpc_addrs.is_empty() { + &chain_config.geth_rpc_addrs[0] + } else { + &chain_config.geth_rpc_addr + }; + let provider = TracedClient::new(chain_id, rpc_addr, metrics)?; Ok(PythRandom::new( chain_config.contract_addr, diff --git a/apps/fortuna/src/command/generate.rs b/apps/fortuna/src/command/generate.rs index a1216d0b14..38b222f650 100644 --- a/apps/fortuna/src/command/generate.rs +++ b/apps/fortuna/src/command/generate.rs @@ -11,8 +11,8 @@ use { /// Run the entire random number generation protocol to produce a random number. pub async fn generate(opts: &GenerateOptions) -> Result<()> { - let contract = Arc::new( - SignablePythContract::from_config( + let contract: Arc = Arc::new( + SignablePythContract::from_config_with_key( &Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?, &opts.private_key, ) diff --git a/apps/fortuna/src/command/get_request.rs b/apps/fortuna/src/command/get_request.rs index 02741b42ff..fe6abe9475 100644 --- a/apps/fortuna/src/command/get_request.rs +++ b/apps/fortuna/src/command/get_request.rs @@ -10,7 +10,7 @@ use { /// Get the on-chain request metadata for a provider and sequence number. pub async fn get_request(opts: &GetRequestOptions) -> Result<()> { // Initialize a Provider to interface with the EVM contract. - let contract = Arc::new(PythContract::from_config( + let contract: Arc = Arc::new(PythContract::from_config_basic( &Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?, )?); diff --git a/apps/fortuna/src/command/inspect.rs b/apps/fortuna/src/command/inspect.rs index 454c143186..efa865e33f 100644 --- a/apps/fortuna/src/command/inspect.rs +++ b/apps/fortuna/src/command/inspect.rs @@ -33,7 +33,11 @@ async fn inspect_chain( num_requests: u64, multicall_batch_size: u64, ) -> Result<()> { - let rpc_provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; + let rpc_provider = if !chain_config.geth_rpc_addrs.is_empty() { + crate::eth_utils::utils::create_failover_provider(&chain_config.geth_rpc_addrs)? + } else { + Provider::::try_from(chain_config.geth_rpc_addr.as_str())? + }; let multicall_exists = rpc_provider .get_code(ethers::contract::MULTICALL_ADDRESS, None) .await @@ -41,7 +45,7 @@ async fn inspect_chain( .len() > 0; - let contract = PythContract::from_config(chain_config)?; + let contract = PythContract::from_config_basic(chain_config)?; let entropy_provider = contract.get_default_provider().call().await?; let provider_info = contract.get_provider_info(entropy_provider).call().await?; let mut current_request_number = provider_info.sequence_number; diff --git a/apps/fortuna/src/command/register_provider.rs b/apps/fortuna/src/command/register_provider.rs index 1833003434..3b30ab59ec 100644 --- a/apps/fortuna/src/command/register_provider.rs +++ b/apps/fortuna/src/command/register_provider.rs @@ -41,8 +41,8 @@ pub async fn register_provider_from_config( ))?; // Initialize a Provider to interface with the EVM contract. - let contract = - Arc::new(SignablePythContract::from_config(chain_config, &private_key_string).await?); + let contract: Arc = + Arc::new(SignablePythContract::from_config_with_key(chain_config, &private_key_string).await?); // Create a new random hash chain. let random = rand::random::<[u8; 32]>(); let secret = provider_config diff --git a/apps/fortuna/src/command/request_randomness.rs b/apps/fortuna/src/command/request_randomness.rs index e69bf4bdfc..7a6a31e7ff 100644 --- a/apps/fortuna/src/command/request_randomness.rs +++ b/apps/fortuna/src/command/request_randomness.rs @@ -8,8 +8,8 @@ use { }; pub async fn request_randomness(opts: &RequestRandomnessOptions) -> Result<()> { - let contract = Arc::new( - SignablePythContract::from_config( + let contract: Arc = Arc::new( + SignablePythContract::from_config_with_key( &Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?, &opts.private_key, ) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 128447b6b0..de75fbc91b 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -134,7 +134,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { ))?; let (tx_exit, rx_exit) = watch::channel(false); let metrics_registry = Arc::new(RwLock::new(Registry::default())); - let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await); + let rpc_metrics: Arc = Arc::new(RpcMetrics::new(metrics_registry.clone()).await); let mut tasks = Vec::new(); for (chain_id, chain_config) in config.chains.clone() { @@ -217,7 +217,7 @@ async fn setup_chain_state( chain_config: &EthereumConfig, rpc_metrics: Arc, ) -> Result { - let contract = Arc::new(InstrumentedPythContract::from_config( + let contract = Arc::new(InstrumentedPythContract::from_config_with_chain_metrics( chain_config, chain_id.clone(), rpc_metrics, @@ -316,14 +316,18 @@ pub async fn check_block_timestamp_lag( metrics: Family, rpc_metrics: Arc, ) { - let provider = - match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) { - Ok(r) => r, - Err(e) => { - tracing::error!("Failed to create provider for chain id - {:?}", e); - return; - } - }; + let rpc_addr = if !chain_config.geth_rpc_addrs.is_empty() { + &chain_config.geth_rpc_addrs[0] + } else { + &chain_config.geth_rpc_addr + }; + let provider = match TracedClient::new(chain_id.clone(), rpc_addr, rpc_metrics) { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to create provider for chain id - {:?}", e); + return; + } + }; const INF_LAG: i64 = 1000000; // value that definitely triggers an alert let lag = match provider.get_block(BlockNumber::Latest).await { diff --git a/apps/fortuna/src/command/setup_provider.rs b/apps/fortuna/src/command/setup_provider.rs index 3c587b0d14..04e046bce5 100644 --- a/apps/fortuna/src/command/setup_provider.rs +++ b/apps/fortuna/src/command/setup_provider.rs @@ -73,7 +73,7 @@ async fn setup_chain_provider( ))?; let provider_address = private_key.clone().parse::()?.address(); // Initialize a Provider to interface with the EVM contract. - let contract = Arc::new(SignablePythContract::from_config(chain_config, &private_key).await?); + let contract: Arc = Arc::new(SignablePythContract::from_config_with_key(chain_config, &private_key).await?); tracing::info!("Fetching provider info"); let provider_info = contract.get_provider_info(provider_address).call().await?; diff --git a/apps/fortuna/src/command/withdraw_fees.rs b/apps/fortuna/src/command/withdraw_fees.rs index 8f701823a9..ed30d29860 100644 --- a/apps/fortuna/src/command/withdraw_fees.rs +++ b/apps/fortuna/src/command/withdraw_fees.rs @@ -22,7 +22,7 @@ pub async fn withdraw_fees(opts: &WithdrawFeesOptions) -> Result<()> { Some(chain_id) => { let chain_config = &config.get_chain_config(&chain_id)?; let contract = - SignablePythContract::from_config(chain_config, &private_key_string).await?; + SignablePythContract::from_config_with_key(chain_config, &private_key_string).await?; withdraw_fees_for_chain( contract, @@ -36,7 +36,7 @@ pub async fn withdraw_fees(opts: &WithdrawFeesOptions) -> Result<()> { for (chain_id, chain_config) in config.chains.iter() { tracing::info!("Withdrawing fees for chain: {}", chain_id); let contract = - SignablePythContract::from_config(chain_config, &private_key_string).await?; + SignablePythContract::from_config_with_key(chain_config, &private_key_string).await?; withdraw_fees_for_chain( contract, @@ -78,7 +78,7 @@ pub async fn withdraw_fees_for_chain( match &tx_result { Some(receipt) => { - tracing::info!("Withdrawal transaction hash {:?}", receipt.transaction_hash); + tracing::info!("Withdrawal transaction hash {:?}", receipt.transaction_hash()); } None => { tracing::warn!("No transaction receipt. Unclear what happened to the transaction"); diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 70014ddcb7..8886f24504 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -107,8 +107,11 @@ impl Config { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct EthereumConfig { - /// URL of a Geth RPC endpoint to use for interacting with the blockchain. - /// TODO: Change type from String to Url + #[serde(default = "default_geth_rpc_addrs")] + pub geth_rpc_addrs: Vec, + + #[serde(skip_serializing)] + #[deprecated(note = "Use geth_rpc_addrs instead")] pub geth_rpc_addr: String, /// URL of a Geth RPC wss endpoint to use for subscribing to blockchain events. @@ -202,6 +205,10 @@ fn default_backlog_range() -> u64 { 1000 } +fn default_geth_rpc_addrs() -> Vec { + Vec::new() +} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct EscalationPolicyConfig { // The keeper will perform the callback as long as the tx is within this percentage of the configured gas limit. diff --git a/apps/fortuna/src/eth_utils/failover_middleware.rs b/apps/fortuna/src/eth_utils/failover_middleware.rs new file mode 100644 index 0000000000..cb7b8b9661 --- /dev/null +++ b/apps/fortuna/src/eth_utils/failover_middleware.rs @@ -0,0 +1,132 @@ +use { + anyhow::Result, + axum::async_trait, + ethers::{ + middleware::{Middleware, MiddlewareError}, + prelude::{BlockId, PendingTransaction}, + providers::JsonRpcClient, + types::{transaction::eip2718::TypedTransaction, BlockNumber, Filter, Log, U256}, + }, + thiserror::Error, + tracing, +}; + +#[derive(Clone, Debug)] +pub struct FailoverMiddleware { + middlewares: Vec, + current_idx: usize, +} + +impl FailoverMiddleware { + pub fn new(middlewares: Vec) -> Self { + if middlewares.is_empty() { + panic!("FailoverMiddleware requires at least one middleware"); + } + + Self { + middlewares, + current_idx: 0, + } + } + + fn current(&self) -> &M { + &self.middlewares[self.current_idx] + } + + async fn with_failover(&self, operation: F) -> Result> + where + F: Fn(&M) -> Fut + Clone, + Fut: std::future::Future::Error>>, + M: Middleware, + { + let mut last_error = None; + + for (idx, middleware) in self.middlewares.iter().enumerate() { + match operation(middleware).await { + Ok(result) => { + if idx > self.current_idx { + tracing::info!( + "Successfully used fallback RPC endpoint {} after primary endpoint failure", + idx + ); + } + return Ok(result); + } + Err(err) => { + tracing::warn!( + "RPC endpoint {} failed with error: {:?}. Trying next endpoint if available.", + idx, + err + ); + last_error = Some(FailoverMiddlewareError::MiddlewareError(err)); + } + } + } + + Err(last_error.unwrap_or_else(|| { + FailoverMiddlewareError::NoMiddlewares + })) + } +} + +#[derive(Error, Debug)] +pub enum FailoverMiddlewareError { + #[error("{0}")] + MiddlewareError(M::Error), + + #[error("No middlewares available")] + NoMiddlewares, +} + +impl MiddlewareError for FailoverMiddlewareError { + type Inner = M::Error; + + fn from_err(src: M::Error) -> Self { + FailoverMiddlewareError::MiddlewareError(src) + } + + fn as_inner(&self) -> Option<&Self::Inner> { + match self { + FailoverMiddlewareError::MiddlewareError(e) => Some(e), + _ => None, + } + } +} + +#[async_trait] +impl Middleware for FailoverMiddleware { + type Error = FailoverMiddlewareError; + type Provider = M::Provider; + type Inner = M; + + fn inner(&self) -> &M { + self.current() + } + + + async fn send_transaction + Send + Sync>( + &self, + tx: T, + block: Option, + ) -> Result, Self::Error> { + let tx = tx.into(); + self.with_failover(|middleware| middleware.send_transaction(tx.clone(), block)) + .await + } + + async fn get_block_number(&self) -> Result { + self.with_failover(|middleware| middleware.get_block_number()).await + } + + async fn get_logs(&self, filter: &Filter) -> Result, Self::Error> { + self.with_failover(|middleware| middleware.get_logs(filter)).await + } + + async fn fill_transaction( + &self, + tx: &mut TypedTransaction, + block: Option, + ) -> Result<(), Self::Error> { + self.with_failover(|middleware| middleware.fill_transaction(tx, block)).await + } +} diff --git a/apps/fortuna/src/eth_utils/mod.rs b/apps/fortuna/src/eth_utils/mod.rs new file mode 100644 index 0000000000..71035371ef --- /dev/null +++ b/apps/fortuna/src/eth_utils/mod.rs @@ -0,0 +1,6 @@ +pub mod eth_gas_oracle; +pub mod failover_middleware; +pub mod legacy_tx_middleware; +pub mod nonce_manager; +pub mod traced_client; +pub mod utils; diff --git a/apps/fortuna/src/eth_utils/utils.rs b/apps/fortuna/src/eth_utils/utils.rs index 7627cf701e..8de3a35e18 100644 --- a/apps/fortuna/src/eth_utils/utils.rs +++ b/apps/fortuna/src/eth_utils/utils.rs @@ -6,9 +6,13 @@ use { ethers::{ contract::ContractCall, middleware::Middleware, + providers::{Http, Provider}, types::{TransactionReceipt, U256}, }, - std::sync::{atomic::AtomicU64, Arc}, + std::{ + str::FromStr, + sync::{atomic::AtomicU64, Arc}, + }, tokio::time::{timeout, Duration}, tracing, }; @@ -325,3 +329,24 @@ pub async fn submit_tx( Ok(receipt) } + +pub fn create_failover_provider(rpc_addrs: &[String]) -> Result> { + if rpc_addrs.is_empty() { + return Err(anyhow!("No RPC addresses provided")); + } + + if rpc_addrs.len() == 1 { + return Provider::::try_from(&rpc_addrs[0]) + .map_err(|e| anyhow!("Failed to create provider from {}: {:?}", rpc_addrs[0], e)); + } + + let providers = rpc_addrs + .iter() + .map(|addr| { + Http::from_str(addr) + .map_err(|e| anyhow!("Failed to create HTTP provider from {}: {:?}", addr, e)) + }) + .collect::>>()?; + + Ok(Provider::new(providers[0].clone())) +} diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 3ade50b0ed..db30a7d627 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -65,7 +65,7 @@ pub async fn run_keeper_threads( tracing::info!("latest safe block: {}", &latest_safe_block); let contract = Arc::new( - InstrumentedSignablePythContract::from_config( + InstrumentedSignablePythContract::from_config_with_metrics( &chain_eth_config, &private_key, chain_state.id.clone(), @@ -173,7 +173,7 @@ pub async fn run_keeper_threads( let chain_config = chain_eth_config.clone(); let provider_address = chain_state.provider_address; let keeper_metrics = metrics.clone(); - let contract = match InstrumentedPythContract::from_config( + let contract = match InstrumentedPythContract::from_config_with_metrics( &chain_config, chain_id.clone(), rpc_metrics,