
Lazer exporter fixes #166


Merged
13 commits merged on Jun 23, 2025
3 changes: 2 additions & 1 deletion Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "3.0.1"
version = "3.0.2"
edition = "2024"

[[bin]]
@@ -10,6 +10,7 @@ path = "src/bin/agent.rs"
[dependencies]
anyhow = "1.0.81"
backoff = "0.4.0"
base64 = "0.22.1"
ed25519-dalek = "2.1.1"
serde = { version = "1.0.197", features = ["derive", "rc"] }
async-trait = "0.1.79"
156 changes: 90 additions & 66 deletions src/agent/services/lazer_exporter.rs
@@ -1,6 +1,7 @@
use {
crate::agent::state,
anyhow::{
Context,
Result,
anyhow,
bail,
@@ -9,6 +10,11 @@ use {
ExponentialBackoffBuilder,
backoff::Backoff,
},
base64::{
Engine,
prelude::BASE64_STANDARD,
},
ed25519_dalek::SigningKey,
futures_util::{
SinkExt,
stream::{
@@ -25,6 +31,7 @@ use {
Deserialize,
Serialize,
},
solana_sdk::signature::keypair,
std::{
path::PathBuf,
sync::Arc,
@@ -59,18 +66,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;

#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub authorization_token: String,
pub publish_keypair_path: PathBuf,
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub publish_keypair_path: PathBuf,
#[serde(with = "humantime_serde", default = "default_publish_interval")]
pub publish_interval_duration: Duration,
pub publish_interval_duration: Duration,
#[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")]
pub symbol_fetch_interval_duration: Duration,
}

fn default_publish_interval() -> Duration {
Duration::from_millis(200)
}

fn default_symbol_fetch_interval() -> Duration {
Duration::from_secs(60 * 60)
Collaborator: I recommend using 1 minute. Imagine we have a same-day listing; we want publishers to publish to it as fast as possible.

Contributor (author): Updated!

}

struct RelayerWsSession {
ws_sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
}
@@ -248,17 +260,44 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
Ok(data)
}

fn get_signing_key(config: &Config) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = config.publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error. ");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}

#[instrument(skip(config, state))]
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
let mut handles = vec![];

let signing_key = match get_signing_key(&config) {
Ok(signing_key) => signing_key,
Err(e) => {
// This is fatal as we can't publish without the key.
tracing::error!("failed to get Lazer signing key: {e:?}");
panic!("failed to get Lazer signing key")
}
};

// can safely drop first receiver for ease of iteration
let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);

for url in config.relayer_urls.iter() {
let mut task = RelayerSessionTask {
url: url.clone(),
token: config.authorization_token.to_owned(),
token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()),
Reviewer: Can we separate the signing key from the verifying key really early in the process, and pass them around separately? This is one typo away from accidentally encoding and sending the private key, which makes me nervous. In fact, I wonder if we can put some guards in to prevent the private key from being encoded after it's read.

Reviewer: Also, why are we setting the public key as the token? The plan was to retain access tokens until we can remove them.

Contributor (author): The latter was an ask from Ali.

Contributor: @darunrs we want pythnet publishers to start publishing asap. We will remove this when we implement the changes in the relayer. For now we still need to use tokens to match with publishers.

Contributor: I'm also worried about accidentally exposing private keys, but since it's temporary it might be ok. Let's hear what @ali-behjati thinks.

@darunrs (Jun 16, 2025): Gotcha. Thanks for filling me in. That makes sense, and I don't think it is problematic to make the public key and access key shared in the short term.

@darunrs (Jun 16, 2025): Regarding private key leaking, we can override (or not derive) things like Debug, Serialize, and Encode with something that produces dummy values for the signing key, by wrapping it in a struct or type and making it a private member of the struct. I think it might not be too hard to get something in place. It doesn't necessarily have to be a blocker, but I would feel better about it being implemented before having publishers use it. Once we have the signing key on hand, I believe we only need to do two things: produce the verifying key and sign data. By exposing a limited interface, I think we could protect against private key leakage to some degree.
receiver: relayer_sender.subscribe(),
};
handles.push(tokio::spawn(async move { task.run().await }));
@@ -268,6 +307,7 @@ pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
config.clone(),
state,
relayer_sender,
signing_key,
)));

handles
@@ -284,11 +324,6 @@ mod lazer_exporter {
},
state::local::LocalStore,
},
anyhow::{
Context,
Result,
bail,
},
ed25519_dalek::{
Signer,
SigningKey,
@@ -314,74 +349,33 @@ mod lazer_exporter {
signature_data::Data::Ed25519,
},
},
solana_sdk::signer::keypair,
std::{
collections::HashMap,
sync::Arc,
},
tokio::sync::broadcast::Sender,
url::Url,
};

fn get_signing_key(config: &Config) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = config.publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error. ");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}

pub async fn lazer_exporter<S>(
config: Config,
state: Arc<S>,
relayer_sender: Sender<SignedLazerTransaction>,
signing_key: SigningKey,
) where
S: LocalStore,
S: Send + Sync + 'static,
{
let signing_key = match get_signing_key(&config) {
Ok(signing_key) => signing_key,
Err(e) => {
tracing::error!("lazer_exporter signing key failure: {e:?}");
return;
}
};

// TODO: Re-fetch on an interval?
let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> =
match fetch_symbols(&config.history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
return;
}
};

let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
let mut symbol_fetch_interval =
tokio::time::interval(config.symbol_fetch_interval_duration);

loop {
tokio::select! {
_ = symbol_fetch_interval.tick() => {
lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
},
_ = publish_interval.tick() => {
let publisher_timestamp = MessageField::some(Timestamp::now());
let mut publisher_update = PublisherUpdate {
@@ -452,6 +446,30 @@ mod lazer_exporter {
}
}
}

async fn get_lazer_symbol_map(
history_url: &Url,
) -> HashMap<pyth_sdk::Identifier, SymbolResponse> {
match fetch_symbols(history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
HashMap::new()
}
}
}
}

#[cfg(test)]
@@ -604,15 +622,21 @@ mod tests {
let state = Arc::new(local::Store::new(&mut Registry::default()));
let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
let private_key_file = get_private_key_file();
let private_key = get_private_key();

let config = Config {
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
authorization_token: "token1".to_string(),
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
symbol_fetch_interval_duration: Duration::from_secs(60 * 60),
};
tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender));
tokio::spawn(lazer_exporter(
config,
state.clone(),
relayer_sender,
private_key,
));

tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
match relayer_receiver.try_recv() {