Skip to content

Commit 78d59bc

Browse files
committed
Router: lazy compute prices for cold mints
1 parent e7029e8 commit 78d59bc

6 files changed

Lines changed: 124 additions & 8 deletions

File tree

bin/autobahn-router/src/edge.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct EdgeState {
1717
// TODO: it may be much better to store this centrally, so it's cheap to take a snapshot
1818
pub cached_prices: Vec<(u64, f64, f64)>,
1919
is_valid: bool,
20+
is_dirty: bool,
2021
pub last_update: u64,
2122
pub last_update_slot: u64,
2223

@@ -153,7 +154,7 @@ impl Edge {
153154
})
154155
.collect_vec();
155156

156-
debug!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data");
157+
trace!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data");
157158

158159
let overflow = amounts.iter().any(|x| *x == u64::MAX);
159160
if overflow {
@@ -166,6 +167,7 @@ impl Edge {
166167
state.last_update_slot = chain_data.newest_processed_slot();
167168
state.cached_prices.clear();
168169
state.is_valid = false;
170+
state.is_dirty = false;
169171
return;
170172
}
171173

@@ -196,6 +198,7 @@ impl Edge {
196198
state.last_update_slot = chain_data.newest_processed_slot();
197199
state.cached_prices.clear();
198200
state.is_valid = true;
201+
state.is_dirty = false;
199202

200203
if let Some(timestamp) = state.cooldown_until {
201204
if timestamp < state.last_update {
@@ -229,6 +232,26 @@ impl Edge {
229232
}
230233
}
231234

235+
pub fn mark_as_dirty(&self) {
236+
let mut state = self.state.write().unwrap();
237+
state.is_dirty = true;
238+
}
239+
240+
pub fn update_if_needed(
241+
&self,
242+
chain_data: &AccountProviderView,
243+
token_cache: &TokenCache,
244+
price_cache: &PriceCache,
245+
path_warming_amounts: &Vec<u64>,
246+
) {
247+
if !self.state.read().unwrap().is_dirty {
248+
return;
249+
}
250+
251+
debug!("Lazily updating {}->{}", debug_tools::name(&self.input_mint), debug_tools::name(&self.output_mint));
252+
self.update(chain_data, token_cache, price_cache, path_warming_amounts)
253+
}
254+
232255
pub fn update(
233256
&self,
234257
chain_data: &AccountProviderView,
@@ -259,7 +282,7 @@ impl EdgeState {
259282
/// Returns the price (in native/native) and ln(price) most applicable for the in amount
260283
/// Returns None if invalid
261284
pub fn cached_price_for(&self, in_amount: u64) -> Option<(f64, f64)> {
262-
if !self.is_valid() || self.cached_prices.is_empty() {
285+
if !self.is_valid() || self.cached_prices.is_empty() || self.is_dirty {
263286
return None;
264287
}
265288

@@ -272,7 +295,7 @@ impl EdgeState {
272295
}
273296

274297
pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> {
275-
if !self.is_valid_out() {
298+
if !self.is_valid_out() || self.cached_prices.is_empty() || self.is_dirty {
276299
return None;
277300
}
278301

bin/autobahn-router/src/edge_updater.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::edge::Edge;
2+
use crate::hot_mints::{HotMintUpdate, HotMintsCache};
23
use crate::metrics;
34
use crate::token_cache::TokenCache;
45
use crate::util::tokio_spawn;
@@ -11,7 +12,7 @@ use router_lib::price_feeds::price_cache::PriceCache;
1112
use router_lib::price_feeds::price_feed::PriceUpdate;
1213
use solana_program::pubkey::Pubkey;
1314
use std::collections::{HashMap, HashSet};
14-
use std::sync::Arc;
15+
use std::sync::{Arc, RwLock};
1516
use std::time::{Duration, Instant};
1617
use tokio::sync::broadcast;
1718
use tokio::sync::broadcast::error::RecvError;
@@ -76,6 +77,8 @@ pub fn spawn_updater_job(
7677
token_cache: TokenCache,
7778
price_cache: PriceCache,
7879
path_warming_amounts: Vec<u64>,
80+
hot_mints: Arc<RwLock<HotMintsCache>>,
81+
mut hot_mints_receiver: broadcast::Receiver<HotMintUpdate>,
7982
register_mint_sender: async_channel::Sender<Pubkey>,
8083
ready_sender: async_channel::Sender<()>,
8184
mut slot_updates: broadcast::Receiver<u64>,
@@ -184,6 +187,19 @@ pub fn spawn_updater_job(
184187
},
185188
Ok(price_upd) = price_updates.recv() => {
186189
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&price_upd.mint) {
190+
let reader = hot_mints.read().unwrap();
191+
for edge in impacted_edges {
192+
if reader.is_hot(&edge.input_mint) {
193+
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
194+
}
195+
else {
196+
edge.mark_as_dirty();
197+
}
198+
}
199+
};
200+
},
201+
Ok(hot_mint_upd) = hot_mints_receiver.recv() => {
202+
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&hot_mint_upd.mint) {
187203
for edge in impacted_edges {
188204
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
189205
}

bin/autobahn-router/src/hot_mints.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,52 @@
11
use crate::debug_tools;
22
use router_config_lib::HotMintsConfig;
3+
use serde_derive::{Deserialize, Serialize};
34
use solana_program::pubkey::Pubkey;
45
use std::collections::{HashSet, VecDeque};
56
use std::str::FromStr;
7+
use tokio::sync::broadcast;
68
use tracing::info;
79

810
pub struct HotMintsCache {
911
max_count: usize,
1012
always_hot: HashSet<Pubkey>,
1113
latest_unordered: HashSet<Pubkey>,
1214
latest_ordered: VecDeque<Pubkey>,
15+
sender: Option<broadcast::Sender<HotMintUpdate>>,
16+
}
17+
18+
#[derive(Debug, Clone, Serialize, Deserialize)]
19+
pub struct HotMintUpdate {
20+
pub mint: Pubkey,
1321
}
1422

1523
impl HotMintsCache {
24+
pub fn new_with_watcher(
25+
config: &Option<HotMintsConfig>,
26+
sender: broadcast::Sender<HotMintUpdate>,
27+
) -> Self {
28+
let config = config.clone().unwrap_or(HotMintsConfig {
29+
always_hot_mints: vec![
30+
"So11111111111111111111111111111111111111112".to_string(),
31+
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(),
32+
"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB".to_string(),
33+
],
34+
keep_latest_count: 100,
35+
});
36+
37+
HotMintsCache {
38+
max_count: config.keep_latest_count,
39+
always_hot: config
40+
.always_hot_mints
41+
.iter()
42+
.map(|x| Pubkey::from_str(x).unwrap())
43+
.collect(),
44+
latest_unordered: Default::default(),
45+
latest_ordered: Default::default(),
46+
sender: Some(sender),
47+
}
48+
}
49+
1650
pub fn new(config: &Option<HotMintsConfig>) -> Self {
1751
let config = config.clone().unwrap_or(HotMintsConfig {
1852
always_hot_mints: vec![
@@ -32,6 +66,7 @@ impl HotMintsCache {
3266
.collect(),
3367
latest_unordered: Default::default(),
3468
latest_ordered: Default::default(),
69+
sender: None,
3570
}
3671
}
3772

@@ -54,6 +89,12 @@ impl HotMintsCache {
5489
}
5590

5691
if self.latest_unordered.insert(pubkey) {
92+
if let Some(sender) = &self.sender {
93+
if sender.receiver_count() > 0 {
94+
sender.send(HotMintUpdate { mint: pubkey }).unwrap();
95+
}
96+
}
97+
5798
info!("Adding {} to hot mints", debug_tools::name(&pubkey));
5899
}
59100
self.latest_ordered.push_front(pubkey);
@@ -66,6 +107,10 @@ impl HotMintsCache {
66107
.copied()
67108
.collect()
68109
}
110+
111+
pub fn is_hot(&self, mint: &Pubkey) -> bool {
112+
self.latest_unordered.contains(mint) || self.always_hot.contains(mint)
113+
}
69114
}
70115

71116
#[cfg(test)]

bin/autobahn-router/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::edge_updater::{spawn_updater_job, Dex};
2-
use crate::hot_mints::HotMintsCache;
2+
use crate::hot_mints::{HotMintUpdate, HotMintsCache};
33
use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl};
44
use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider};
55
use crate::path_warmer::spawn_path_warmer_job;
@@ -99,7 +99,11 @@ async fn main() -> anyhow::Result<()> {
9999
let config = Config::load(&args[1])?;
100100
let router_version = RouterVersion::OverestimateAmount;
101101

102-
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints)));
102+
let (hot_mint_sender, hot_mint_receiver) = broadcast::channel::<HotMintUpdate>(20);
103+
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new_with_watcher(
104+
&config.hot_mints,
105+
hot_mint_sender.clone(),
106+
)));
103107

104108
let mango_data = match mango::mango_fetcher::fetch_mango_data().await {
105109
Err(e) => {
@@ -348,6 +352,8 @@ async fn main() -> anyhow::Result<()> {
348352
token_cache.clone(),
349353
price_cache.clone(),
350354
path_warming_amounts.clone(),
355+
hot_mints.clone(),
356+
hot_mint_sender.subscribe(),
351357
price_feed.register_mint_sender(),
352358
ready_channels[i].0.clone(),
353359
rpc_slot_sender.subscribe(),

bin/autobahn-router/src/routing.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,20 @@ use crate::debug_tools;
22
use crate::metrics;
33
use crate::prelude::*;
44
use crate::routing_objectpool::RoutingObjectPools;
5+
use crate::routing_types::*;
6+
use crate::token_cache::TokenCache;
57
use mango_feeds_connector::chain_data::AccountData;
68
use ordered_float::NotNan;
79
use router_config_lib::Config;
810
use router_lib::dex::SwapMode;
911
use router_lib::dex::{AccountProviderView, DexEdge};
12+
use router_lib::price_feeds::price_cache::PriceCache;
1013
use std::collections::hash_map::Entry::{Occupied, Vacant};
1114
use std::time::{Duration, Instant};
1215
use std::u64;
1316
use thiserror::Error;
1417
use tracing::Level;
1518

16-
use crate::routing_types::*;
17-
1819
#[derive(Error, Debug)]
1920
pub enum RoutingError {
2021
#[error("unsupported input mint {0:?}")]
@@ -1889,6 +1890,25 @@ impl Routing {
18891890
" - available edge"
18901891
);
18911892
}
1893+
1894+
pub fn lazy_compute_prices(
1895+
&self,
1896+
chain_data: &AccountProviderView,
1897+
token_cache: &TokenCache,
1898+
price_cache: &PriceCache,
1899+
mint: &Pubkey,
1900+
) {
1901+
for edge in &self.edges {
1902+
if edge.input_mint.eq(mint) {
1903+
edge.update_if_needed(
1904+
chain_data,
1905+
token_cache,
1906+
price_cache,
1907+
&self.path_warming_amounts,
1908+
)
1909+
}
1910+
}
1911+
}
18921912
}
18931913

18941914
#[cfg(test)]

bin/autobahn-router/src/server/route_provider.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ impl RouteProvider for RoutingRouteProvider {
9191
hot_mints_guard.get()
9292
};
9393

94+
// ensure new hot mints are ready (edge cached_price should be available)
95+
self.routing
96+
.lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &from_mint);
97+
self.routing
98+
.lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &to_mint);
99+
94100
let route = self.routing.find_best_route(
95101
&self.chain_data,
96102
&from_mint,

0 commit comments

Comments
 (0)