Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions bin/autobahn-router/src/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct EdgeState {
// TODO: it may be much better to store this centrally, so it's cheap to take a snapshot
pub cached_prices: Vec<(u64, f64, f64)>,
is_valid: bool,
is_dirty: bool,
Comment thread
mschneider marked this conversation as resolved.
pub last_update: u64,
pub last_update_slot: u64,

Expand Down Expand Up @@ -153,7 +154,7 @@ impl Edge {
})
.collect_vec();

debug!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data");
trace!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data");

let overflow = amounts.iter().any(|x| *x == u64::MAX);
if overflow {
Expand All @@ -166,6 +167,7 @@ impl Edge {
state.last_update_slot = chain_data.newest_processed_slot();
state.cached_prices.clear();
state.is_valid = false;
state.is_dirty = false;
return;
}

Expand Down Expand Up @@ -196,6 +198,7 @@ impl Edge {
state.last_update_slot = chain_data.newest_processed_slot();
state.cached_prices.clear();
state.is_valid = true;
state.is_dirty = false;

if let Some(timestamp) = state.cooldown_until {
if timestamp < state.last_update {
Expand Down Expand Up @@ -229,6 +232,26 @@ impl Edge {
}
}

pub fn mark_as_dirty(&self) {
let mut state = self.state.write().unwrap();
state.is_dirty = true;
}

pub fn update_if_needed(
&self,
chain_data: &AccountProviderView,
token_cache: &TokenCache,
price_cache: &PriceCache,
path_warming_amounts: &Vec<u64>,
) {
if !self.state.read().unwrap().is_dirty {
return;
}

debug!("Lazily updating {}->{}", debug_tools::name(&self.input_mint), debug_tools::name(&self.output_mint));
self.update(chain_data, token_cache, price_cache, path_warming_amounts)
}

pub fn update(
&self,
chain_data: &AccountProviderView,
Expand Down Expand Up @@ -259,7 +282,7 @@ impl EdgeState {
/// Returns the price (in native/native) and ln(price) most applicable for the in amount
/// Returns None if invalid
pub fn cached_price_for(&self, in_amount: u64) -> Option<(f64, f64)> {
if !self.is_valid() || self.cached_prices.is_empty() {
if !self.is_valid() || self.cached_prices.is_empty() || self.is_dirty {
return None;
}

Expand All @@ -272,7 +295,7 @@ impl EdgeState {
}

pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> {
if !self.is_valid_out() {
if !self.is_valid_out() || self.cached_prices.is_empty() || self.is_dirty {
return None;
}

Expand Down
18 changes: 17 additions & 1 deletion bin/autobahn-router/src/edge_updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::edge::Edge;
use crate::hot_mints::{HotMintUpdate, HotMintsCache};
use crate::metrics;
use crate::token_cache::TokenCache;
use crate::util::tokio_spawn;
Expand All @@ -11,7 +12,7 @@ use router_lib::price_feeds::price_cache::PriceCache;
use router_lib::price_feeds::price_feed::PriceUpdate;
use solana_program::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
Expand Down Expand Up @@ -76,6 +77,8 @@ pub fn spawn_updater_job(
token_cache: TokenCache,
price_cache: PriceCache,
path_warming_amounts: Vec<u64>,
hot_mints: Arc<RwLock<HotMintsCache>>,
mut hot_mints_receiver: broadcast::Receiver<HotMintUpdate>,
register_mint_sender: async_channel::Sender<Pubkey>,
ready_sender: async_channel::Sender<()>,
mut slot_updates: broadcast::Receiver<u64>,
Expand Down Expand Up @@ -184,6 +187,19 @@ pub fn spawn_updater_job(
},
Ok(price_upd) = price_updates.recv() => {
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&price_upd.mint) {
let reader = hot_mints.read().unwrap();
for edge in impacted_edges {
if reader.is_hot(&edge.input_mint) {
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
}
else {
edge.mark_as_dirty();
}
}
};
},
Ok(hot_mint_upd) = hot_mints_receiver.recv() => {
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&hot_mint_upd.mint) {
for edge in impacted_edges {
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
}
Expand Down
45 changes: 45 additions & 0 deletions bin/autobahn-router/src/hot_mints.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
use crate::debug_tools;
use router_config_lib::HotMintsConfig;
use serde_derive::{Deserialize, Serialize};
use solana_program::pubkey::Pubkey;
use std::collections::{HashSet, VecDeque};
use std::str::FromStr;
use tokio::sync::broadcast;
use tracing::info;

pub struct HotMintsCache {
max_count: usize,
always_hot: HashSet<Pubkey>,
latest_unordered: HashSet<Pubkey>,
latest_ordered: VecDeque<Pubkey>,
sender: Option<broadcast::Sender<HotMintUpdate>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HotMintUpdate {
pub mint: Pubkey,
}

impl HotMintsCache {
pub fn new_with_watcher(
config: &Option<HotMintsConfig>,
sender: broadcast::Sender<HotMintUpdate>,
) -> Self {
let config = config.clone().unwrap_or(HotMintsConfig {
always_hot_mints: vec![
"So11111111111111111111111111111111111111112".to_string(),
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(),
"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB".to_string(),
],
keep_latest_count: 100,
});

HotMintsCache {
max_count: config.keep_latest_count,
always_hot: config
.always_hot_mints
.iter()
.map(|x| Pubkey::from_str(x).unwrap())
.collect(),
latest_unordered: Default::default(),
latest_ordered: Default::default(),
sender: Some(sender),
}
}

pub fn new(config: &Option<HotMintsConfig>) -> Self {
let config = config.clone().unwrap_or(HotMintsConfig {
always_hot_mints: vec![
Expand All @@ -32,6 +66,7 @@ impl HotMintsCache {
.collect(),
latest_unordered: Default::default(),
latest_ordered: Default::default(),
sender: None,
}
}

Expand All @@ -54,6 +89,12 @@ impl HotMintsCache {
}

if self.latest_unordered.insert(pubkey) {
if let Some(sender) = &self.sender {
if sender.receiver_count() > 0 {
sender.send(HotMintUpdate { mint: pubkey }).unwrap();
}
}

info!("Adding {} to hot mints", debug_tools::name(&pubkey));
}
self.latest_ordered.push_front(pubkey);
Expand All @@ -66,6 +107,10 @@ impl HotMintsCache {
.copied()
.collect()
}

pub fn is_hot(&self, mint: &Pubkey) -> bool {
self.latest_unordered.contains(mint) || self.always_hot.contains(mint)
}
}

#[cfg(test)]
Expand Down
10 changes: 8 additions & 2 deletions bin/autobahn-router/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::edge_updater::{spawn_updater_job, Dex};
use crate::hot_mints::HotMintsCache;
use crate::hot_mints::{HotMintUpdate, HotMintsCache};
use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl};
use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider};
use crate::path_warmer::spawn_path_warmer_job;
Expand Down Expand Up @@ -99,7 +99,11 @@ async fn main() -> anyhow::Result<()> {
let config = Config::load(&args[1])?;
let router_version = RouterVersion::OverestimateAmount;

let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints)));
let (hot_mint_sender, hot_mint_receiver) = broadcast::channel::<HotMintUpdate>(20);
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new_with_watcher(
&config.hot_mints,
hot_mint_sender.clone(),
)));

let mango_data = match mango::mango_fetcher::fetch_mango_data().await {
Err(e) => {
Expand Down Expand Up @@ -348,6 +352,8 @@ async fn main() -> anyhow::Result<()> {
token_cache.clone(),
price_cache.clone(),
path_warming_amounts.clone(),
hot_mints.clone(),
hot_mint_sender.subscribe(),
price_feed.register_mint_sender(),
ready_channels[i].0.clone(),
rpc_slot_sender.subscribe(),
Expand Down
24 changes: 22 additions & 2 deletions bin/autobahn-router/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ use crate::debug_tools;
use crate::metrics;
use crate::prelude::*;
use crate::routing_objectpool::RoutingObjectPools;
use crate::routing_types::*;
use crate::token_cache::TokenCache;
use mango_feeds_connector::chain_data::AccountData;
use ordered_float::NotNan;
use router_config_lib::Config;
use router_lib::dex::SwapMode;
use router_lib::dex::{AccountProviderView, DexEdge};
use router_lib::price_feeds::price_cache::PriceCache;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::time::{Duration, Instant};
use std::u64;
use thiserror::Error;
use tracing::Level;

use crate::routing_types::*;

#[derive(Error, Debug)]
pub enum RoutingError {
#[error("unsupported input mint {0:?}")]
Expand Down Expand Up @@ -1889,6 +1890,25 @@ impl Routing {
" - available edge"
);
}

pub fn lazy_compute_prices(
&self,
chain_data: &AccountProviderView,
token_cache: &TokenCache,
price_cache: &PriceCache,
mint: &Pubkey,
) {
for edge in &self.edges {
if edge.input_mint.eq(mint) {
edge.update_if_needed(
chain_data,
token_cache,
price_cache,
&self.path_warming_amounts,
)
}
}
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions bin/autobahn-router/src/server/route_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ impl RouteProvider for RoutingRouteProvider {
hot_mints_guard.get()
};

// ensure new hot mints are ready (edge cached_price should be available)
self.routing
.lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &from_mint);
self.routing
.lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &to_mint);

let route = self.routing.find_best_route(
&self.chain_data,
&from_mint,
Expand Down
1 change: 1 addition & 0 deletions bin/autobahn-router/template-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ min_quote_out_to_in_amount_ratio = 0.65
always_hot_mints = [
"So11111111111111111111111111111111111111112", # SOL
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", # USDC
"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", # USDT
]
keep_latest_count = 50

Expand Down