diff --git a/Cargo.lock b/Cargo.lock index 483e1e7..f544279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3705,11 +3705,12 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "3.0.2" +version = "3.0.3" dependencies = [ "anyhow", "async-trait", "backoff", + "base64 0.22.1", "bincode 2.0.1", "bytemuck", "chrono", diff --git a/Cargo.toml b/Cargo.toml index b53e3f1..48900f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "3.0.2" +version = "3.0.3" edition = "2024" [[bin]] @@ -10,6 +10,7 @@ path = "src/bin/agent.rs" [dependencies] anyhow = "1.0.81" backoff = "0.4.0" +base64 = "0.22.1" ed25519-dalek = "2.1.1" serde = { version = "1.0.197", features = ["derive", "rc"] } async-trait = "0.1.79" diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index ab06572..2f2bbaa 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -1,14 +1,19 @@ use { crate::agent::state, anyhow::{ + Context, Result, - anyhow, bail, }, backoff::{ ExponentialBackoffBuilder, backoff::Backoff, }, + base64::{ + Engine, + prelude::BASE64_STANDARD, + }, + ed25519_dalek::SigningKey, futures_util::{ SinkExt, stream::{ @@ -25,6 +30,7 @@ use { Deserialize, Serialize, }, + solana_sdk::signature::keypair, std::{ path::PathBuf, sync::Arc, @@ -59,18 +65,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000; #[derive(Clone, Debug, Deserialize)] pub struct Config { - pub history_url: Url, - pub relayer_urls: Vec, - pub authorization_token: String, - pub publish_keypair_path: PathBuf, + pub history_url: Url, + pub relayer_urls: Vec, + pub publish_keypair_path: PathBuf, #[serde(with = "humantime_serde", default = "default_publish_interval")] - pub publish_interval_duration: Duration, + pub publish_interval_duration: Duration, + #[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")] + pub symbol_fetch_interval_duration: Duration, } fn default_publish_interval() -> Duration { Duration::from_millis(200) } +fn default_symbol_fetch_interval() -> Duration { + Duration::from_secs(60 * 60) +} + struct RelayerWsSession { ws_sender: SplitSink>, TungsteniteMessage>, } @@ -145,8 +156,8 @@ impl RelayerSessionTask { failure_count += 1; let next_backoff = backoff.next_backoff().unwrap_or(max_interval); - tracing::error!( - "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}", + tracing::warn!( + "relayer session ended with error: {:?}, failure_count: {}; retrying in {:?}", e, failure_count, next_backoff @@ -199,7 +210,7 @@ impl RelayerSessionTask { tracing::error!("Error receiving message from at relayer: {e:?}"); } None => { - tracing::error!("relayer connection closed"); + tracing::warn!("relayer connection closed"); bail!("relayer connection closed"); } } @@ -240,7 +251,6 @@ struct SymbolResponse { async fn fetch_symbols(history_url: &Url) -> Result> { let mut url = history_url.clone(); - url.set_scheme("http").map_err(|_| anyhow!("invalid url"))?; url.set_path("/history/v1/symbols"); let client = Client::new(); let response = client.get(url).send().await?.error_for_status()?; @@ -248,17 +258,46 @@ async fn fetch_symbols(history_url: &Url) -> Result> { Ok(data) } +fn get_signing_key(config: &Config) -> Result { + // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher + let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { + Ok(k) => k, + Err(e) => { + tracing::error!( + error = ?e, + publish_keypair_path = config.publish_keypair_path.display().to_string(), + "Reading publish keypair returned an error. ", + ); + bail!("Reading publish keypair returned an error. "); + } + }; + + SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) + .context("Failed to create signing key from keypair") +} + #[instrument(skip(config, state))] pub fn lazer_exporter(config: Config, state: Arc) -> Vec> { let mut handles = vec![]; + let signing_key = match get_signing_key(&config) { + Ok(signing_key) => signing_key, + Err(e) => { + // This is fatal as we can't publish without the key. + tracing::error!("failed to get Lazer signing key: {e:?}"); + panic!("failed to get Lazer signing key") + } + }; + let pubkey_base64 = BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()); + tracing::info!("Loaded Lazer signing key; pubkey in base64: {pubkey_base64}"); + // can safely drop first receiver for ease of iteration let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); for url in config.relayer_urls.iter() { let mut task = RelayerSessionTask { url: url.clone(), - token: config.authorization_token.to_owned(), + token: pubkey_base64.clone(), receiver: relayer_sender.subscribe(), }; handles.push(tokio::spawn(async move { task.run().await })); @@ -268,6 +307,7 @@ pub fn lazer_exporter(config: Config, state: Arc) -> Vec Result { - // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher - let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { - Ok(k) => k, - Err(e) => { - tracing::error!( - error = ?e, - publish_keypair_path = config.publish_keypair_path.display().to_string(), - "Reading publish keypair returned an error. ", - ); - bail!("Reading publish keypair returned an error. "); - } - }; - - SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) - .context("Failed to create signing key from keypair") - } - pub async fn lazer_exporter( config: Config, state: Arc, relayer_sender: Sender, + signing_key: SigningKey, ) where S: LocalStore, S: Send + Sync + 'static, { - let signing_key = match get_signing_key(&config) { - Ok(signing_key) => signing_key, - Err(e) => { - tracing::error!("lazer_exporter signing key failure: {e:?}"); - return; + // We can't publish to Lazer without symbols, so crash the process if it fails. + let mut lazer_symbols = match get_lazer_symbol_map(&config.history_url).await { + Ok(symbol_map) => { + if symbol_map.is_empty() { + panic!("Retrieved zero Lazer symbols from {}", config.history_url); + } + symbol_map + } + Err(_) => { + tracing::error!( + "Failed to retrieve Lazer symbols from {}", + config.history_url + ); + panic!( + "Failed to retrieve Lazer symbols from {}", + config.history_url + ); } }; - // TODO: Re-fetch on an interval? - let lazer_symbols: HashMap = - match fetch_symbols(&config.history_url).await { - Ok(symbols) => symbols - .into_iter() - .filter_map(|symbol| { - let hermes_id = symbol.hermes_id.clone()?; - match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { - Ok(id) => Some((id, symbol)), - Err(e) => { - tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id); - None - } - } - }) - .collect(), - Err(e) => { - tracing::error!("Failed to fetch Lazer symbols: {e:?}"); - return; - } - }; + tracing::info!( + "Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}", + lazer_symbols.len(), + &config.history_url + ); + + let (symbols_sender, mut symbols_receiver) = mpsc::channel(1); + tokio::spawn(get_lazer_symbols_task( + config.history_url.clone(), + config.symbol_fetch_interval_duration.clone(), + symbols_sender, + )); let mut publish_interval = tokio::time::interval(config.publish_interval_duration); @@ -448,9 +473,94 @@ mod lazer_exporter { tracing::error!("Error sending transaction to relayer receivers: {e}"); } } + }, + latest_symbol_map = symbols_receiver.recv() => { + match latest_symbol_map { + Some(symbol_map) => { + tracing::info!("Refreshing Lazer symbol map with {} symbols", symbol_map.len()); + lazer_symbols = symbol_map + } + None => { + // agent can continue but will eventually have a stale symbol set unless the process is cycled. + tracing::error!("Lazer symbol refresh channel closed") + } + } + }, + } + } + } + + async fn get_lazer_symbols_task( + history_url: Url, + fetch_interval_duration: Duration, + sender: mpsc::Sender>, + ) { + let mut symbol_fetch_interval = tokio::time::interval(fetch_interval_duration); + + loop { + tokio::select! { + _ = symbol_fetch_interval.tick() => { + tracing::info!("Refreshing Lazer symbol map from history service..."); + match get_lazer_symbol_map(&history_url).await { + Ok(symbol_map) => { + if symbol_map.is_empty() { + tracing::error!("Retrieved zero Lazer symbols from {}", history_url); + continue; + } + match sender.send(symbol_map).await { + Ok(_) => (), + Err(e) => { + // agent can continue but will eventually have a stale symbol set unless the process is cycled. + tracing::error!("Error sending refreshed symbol map to exporter task: {e}"); + } + } + }, + Err(_) => { + tracing::error!("Failed to retrieve Lazer symbols from {} in refresh task", history_url); + } + } + } + } + } + } + + async fn get_lazer_symbol_map( + history_url: &Url, + ) -> anyhow::Result> { + const NUM_RETRIES: usize = 3; + const RETRY_INTERVAL: Duration = Duration::from_secs(1); + let mut retry_count = 0; + + while retry_count < NUM_RETRIES { + match fetch_symbols(history_url).await { + Ok(symbols) => { + let symbol_map = symbols + .into_iter() + .filter_map(|symbol| { + let hermes_id = symbol.hermes_id.clone()?; + match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { + Ok(id) => Some((id, symbol)), + Err(e) => { + tracing::warn!( + "Failed to parse hermes_id {}: {e:?}", + hermes_id + ); + None + } + } + }) + .collect(); + return Ok(symbol_map); + } + Err(e) => { + tracing::error!("Failed to fetch Lazer symbols: {e:?}"); + + retry_count += 1; + tokio::time::sleep(RETRY_INTERVAL).await; } } } + anyhow::bail!("Lazer symbol map fetch failed after {NUM_RETRIES} attempts"); } } @@ -604,15 +714,21 @@ mod tests { let state = Arc::new(local::Store::new(&mut Registry::default())); let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); let private_key_file = get_private_key_file(); + let private_key = get_private_key(); let config = Config { - history_url: Url::parse("http://127.0.0.1:12345").unwrap(), - relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], - authorization_token: "token1".to_string(), - publish_keypair_path: PathBuf::from(private_key_file.path()), - publish_interval_duration: Duration::from_secs(1), + history_url: Url::parse("http://127.0.0.1:12345").unwrap(), + relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], + publish_keypair_path: PathBuf::from(private_key_file.path()), + publish_interval_duration: Duration::from_secs(1), + symbol_fetch_interval_duration: Duration::from_secs(60 * 60), }; - tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender)); + tokio::spawn(lazer_exporter( + config, + state.clone(), + relayer_sender, + private_key, + )); tokio::time::sleep(std::time::Duration::from_millis(2000)).await; match relayer_receiver.try_recv() {