Skip to content

Commit

Permalink
fix: evm reconnecting (#248)
Browse files Browse the repository at this point in the history
* fix: evm reconnecting

* fix: clippy warnings
  • Loading branch information
frolvanya authored Feb 12, 2025
1 parent 8e88d69 commit 40c43b4
Showing 1 changed file with 45 additions and 28 deletions.
73 changes: 45 additions & 28 deletions omni-relayer/src/startup/evm.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
use anyhow::{Context, Result};
use log::{error, info, warn};
use omni_types::ChainKind;
use reqwest::Client;
use tokio_stream::StreamExt;

use alloy::{
primitives::Address,
providers::{Provider, ProviderBuilder, RootProvider, WsConnect},
rpc::types::{Filter, Log},
sol_types::SolEvent,
transports::http::Http,
};
use anyhow::{Context, Result};
use ethereum_types::H256;
use log::{error, info, warn};
use omni_types::ChainKind;
use reqwest::{Client, Url};
use tokio_stream::StreamExt;

use crate::{
config, utils,
workers::near::{DeployToken, FinTransfer},
};

fn extract_evm_config(evm: config::Evm) -> (String, String, Address, u64, i64) {
(
evm.rpc_http_url,
fn hide_api_key<E: ToString>(err: &E) -> String {
let env_key = "INFURA_API_KEY";
let api_key = std::env::var(env_key).unwrap_or_default();
err.to_string().replace(&api_key, env_key)
}

fn extract_evm_config(evm: config::Evm) -> Result<(Url, String, Address, u64, i64)> {
Ok((
evm.rpc_http_url
.parse()
.context("Failed to parse EVM rpc provider as url")?,
evm.rpc_ws_url,
evm.bridge_token_factory_address,
evm.block_processing_batch_size,
evm.expected_finalization_time,
)
))
}

pub async fn start_indexer(
Expand All @@ -43,9 +50,9 @@ pub async fn start_indexer(
block_processing_batch_size,
expected_finalization_time,
) = match chain_kind {
ChainKind::Eth => extract_evm_config(config.eth.context("Failed to get Eth config")?),
ChainKind::Base => extract_evm_config(config.base.context("Failed to get Base config")?),
ChainKind::Arb => extract_evm_config(config.arb.context("Failed to get Arb config")?),
ChainKind::Eth => extract_evm_config(config.eth.context("Failed to get Eth config")?)?,
ChainKind::Base => extract_evm_config(config.base.context("Failed to get Base config")?)?,
ChainKind::Arb => extract_evm_config(config.arb.context("Failed to get Arb config")?)?,
_ => anyhow::bail!("Unsupported chain kind: {chain_kind:?}"),
};

Expand All @@ -61,20 +68,26 @@ pub async fn start_indexer(
);

loop {
let http_provider = ProviderBuilder::new().on_http(rpc_http_url.parse().context(
format!("Failed to parse {chain_kind:?} rpc provider as url",),
)?);
let http_provider = ProviderBuilder::new().on_http(rpc_http_url.clone());

process_recent_blocks(
&mut redis_connection,
&http_provider,
&filter,
chain_kind,
start_block,
block_processing_batch_size,
expected_finalization_time,
)
.await?;
crate::skip_fail!(
process_recent_blocks(
&mut redis_connection,
&http_provider,
&filter,
chain_kind,
start_block,
block_processing_batch_size,
expected_finalization_time,
)
.await
.map_err(|err| hide_api_key(&err)),
format!(
"Failed to process recent blocks for {:?} indexer",
chain_kind
),
5
);

info!(
"All historical logs processed, starting {:?} WS subscription",
Expand All @@ -84,13 +97,17 @@ pub async fn start_indexer(
let ws_provider = crate::skip_fail!(
ProviderBuilder::new()
.on_ws(WsConnect::new(&rpc_ws_url))
.await,
.await
.map_err(|err| hide_api_key(&err)),
format!("{chain_kind:?} WebSocket connection failed"),
5
);

let mut stream = crate::skip_fail!(
ws_provider.subscribe_logs(&filter).await,
ws_provider
.subscribe_logs(&filter)
.await
.map_err(|err| hide_api_key(&err)),
format!("{chain_kind:?} WebSocket subscription failed"),
5
)
Expand Down

0 comments on commit 40c43b4

Please sign in to comment.