diff --git a/Cargo.lock b/Cargo.lock index 2323e18..6e1786a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ dependencies = [ "const_panic", "core_extensions", "generational-arena", - "libloading", + "libloading 0.7.4", "lock_api", "parking_lot", "paste", @@ -611,6 +611,29 @@ dependencies = [ "cc", ] +[[package]] +name = "aws-lc-rs" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5932a7d9d28b0d2ea34c6b3779d35e3dd6f6345317c34e73438c4f1f29144151" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1826f2e4cfc2cd19ee53c42fbf68e2f81ec21108e0b7ecf6a71cf062137360fc" +dependencies = [ + "bindgen", + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.8.4" @@ -707,6 +730,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.106", +] + [[package]] name = "bitflags" version = "2.9.4" @@ -940,6 +983,15 @@ dependencies = [ "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.3" @@ -982,6 +1034,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading 0.8.9", +] + [[package]] name = "clap" version = "4.5.48" @@ -1022,6 +1085,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -1488,6 +1560,12 @@ dependencies = [ "yellowstone-grpc-proto", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "ed25519" version = "1.5.3" @@ -1699,6 +1777,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "funty" version = "2.0.0" @@ -1874,6 +1958,12 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43503cc176394dd30a6525f5f36e838339b8b5619be33ed9a7783841580a97b6" +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "h2" version = "0.4.12" @@ -2086,7 +2176,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.3", ] [[package]] @@ -2470,6 +2560,8 @@ name = "keeprs" version = "0.1.0" dependencies = [ "anchor-lang", + "anyhow", + "async-trait", "axum", "clap", "crossbeam", @@ -2477,11 +2569,15 @@ dependencies = [ "drift-rs", "env_logger 0.11.8", "futures-util", + "http 0.2.12", "log", "mimalloc", "prometheus", "pyth-lazer-client", "pyth-lazer-protocol", + "rustls", + "serde", + "serde_json", "solana-account-decoder-client-types", "solana-rpc-client", "solana-rpc-client-api", @@ -2489,6 +2585,10 @@ dependencies = [ "solana-transaction-status", "solana-transaction-status-client-types", "tokio", + "tokio-rustls", + "tokio-tungstenite 0.28.0", + "url", + "webpki-roots 0.26.11", ] [[package]] @@ -2513,6 +2613,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link 0.2.1", +] + [[package]] name = "libmimalloc-sys" version = "0.1.44" @@ -2661,6 +2771,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2705,6 +2821,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num" version = "0.4.3" @@ -3541,7 +3667,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 1.0.3", ] [[package]] @@ -3658,6 +3784,7 @@ version = "0.23.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd3c25631629d034ce7cd9940adc9d45762d46de2b0f57193c4443b92c6d4d40" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -3695,6 +3822,7 @@ version = "0.103.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e10b3f4191e8a80e6b43eebabfac91e5dcecebb27a71f04e820c47ec41d314bf" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -6379,6 +6507,22 @@ dependencies = [ "tungstenite 0.26.2", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite 0.28.0", + "webpki-roots 0.26.11", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -6656,6 +6800,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.9.2", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror 2.0.17", + "utf-8", +] + [[package]] name = "typed-arena" version = "2.0.2" @@ -6907,6 +7070,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.3", +] + [[package]] name = "webpki-roots" version = "1.0.3" diff --git a/Cargo.toml b/Cargo.toml index 2aee9a2..3459c16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "keeprs" +default-run = "keeprs" version = "0.1.0" edition = "2021" description = "Rust keeper bots for Drift protocol" @@ -9,6 +10,7 @@ keywords = ["drift", "defi", "solana", "trading-bot", "liquidity"] categories = ["financial", "network-programming"] [dependencies] +async-trait = "0.1" anchor-lang = "*" axum = "0.8" clap = { version = "4.5.40", "features" = ["env", "derive"] } @@ -30,6 +32,15 @@ solana-sdk = "2.3" solana-transaction-status = "2.3" solana-transaction-status-client-types = "2.3" tokio = "*" +url = "2.5" +tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-webpki-roots"] } +anyhow = "1.0.100" +http = "0.2" +tokio-rustls = "0.26" +rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } +webpki-roots = "0.26" +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" [[bin]] name = "tx_history" diff --git a/src/exchange_connectors/binance.rs b/src/exchange_connectors/binance.rs new file mode 100644 index 0000000..9811dc9 --- /dev/null +++ b/src/exchange_connectors/binance.rs @@ -0,0 +1,166 @@ +use super::{Exchange, ExchangeCommand, PriceUpdate}; +use anyhow::Result; +use async_trait::async_trait; +use futures_util::{SinkExt, StreamExt}; +use log::{error, info, warn}; +use serde::Deserialize; +use serde_json::json; +use std::time::Duration; +use tokio::sync::mpsc::{self, Sender}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use url::Url; + +pub struct BinanceExchange { + command_tx: Option>, +} + +impl BinanceExchange { + pub fn new() -> Self { + Self { command_tx: None } + } +} + +#[derive(Deserialize, Debug)] +struct BinanceBookTicker { + s: String, // symbol + b: String, // best bid + a: String, // best ask +} + +#[async_trait] +impl Exchange for BinanceExchange { + async fn connect(&mut self, sender: Sender) -> Result<()> { + let (cmd_tx, mut cmd_rx) = mpsc::channel(100); + self.command_tx = Some(cmd_tx); + + tokio::spawn(async move { + let mut active_symbols: Vec = Vec::new(); + + loop { + let url = Url::parse("wss://fstream.binance.com/ws").unwrap(); + info!("Connecting to Binance Futures WS: {}", url); + + match connect_async(url.to_string()).await { + Ok((mut ws_stream, _)) => { + info!("Connected to Binance"); + + // Resubscribe if we have active symbols + if !active_symbols.is_empty() { + let params: Vec = active_symbols + .iter() + .map(|s| format!("{}@bookTicker", s.to_lowercase())) + .collect(); + let subscribe_msg = json!({ + "method": "SUBSCRIBE", + "params": params, + "id": 1 + }); + if let Err(e) = ws_stream + .send(Message::Text(subscribe_msg.to_string().into())) + .await + { + error!("Failed to resubscribe: {}", e); + } else { + info!("Resubscribed to Binance symbols: {:?}", active_symbols); + } + } + + loop { + tokio::select! { + Some(cmd) = cmd_rx.recv() => { + match cmd { + ExchangeCommand::Subscribe(symbols) => { + active_symbols.extend(symbols.clone()); + // Dedup logic could go here + + let params: Vec = symbols.iter().map(|s| format!("{}@bookTicker", s.to_lowercase())).collect(); + let subscribe_msg = json!({ + "method": "SUBSCRIBE", + "params": params, + "id": 1 + }); + if let Err(e) = ws_stream.send(Message::Text(subscribe_msg.to_string().into())).await { + error!("Failed to subscribe: {}", e); + break; + } + info!("Subscribed to Binance symbols: {:?}", symbols); + } + ExchangeCommand::Unsubscribe(symbols) => { + active_symbols.retain(|s| !symbols.contains(s)); + + let params: Vec = symbols.iter().map(|s| format!("{}@bookTicker", s.to_lowercase())).collect(); + let unsubscribe_msg = json!({ + "method": "UNSUBSCRIBE", + "params": params, + "id": 1 + }); + if let Err(e) = ws_stream.send(Message::Text(unsubscribe_msg.to_string().into())).await { + error!("Failed to unsubscribe: {}", e); + break; + } + info!("Unsubscribed from Binance symbols: {:?}", symbols); + } + } + } + Some(msg) = ws_stream.next() => { + match msg { + Ok(Message::Text(text)) => { + if let Ok(ticker) = serde_json::from_str::(&text) { + let bid: f64 = ticker.b.parse().unwrap_or(0.0); + let ask: f64 = ticker.a.parse().unwrap_or(0.0); + if bid > 0.0 && ask > 0.0 { + let mid_price = (bid + ask) / 2.0; + let update = PriceUpdate { + symbol: ticker.s.to_uppercase(), + mid_price, + exchange: "binance".to_string(), + }; + if let Err(_) = sender.send(update).await { + break; + } + } + } + } + Ok(Message::Ping(payload)) => { + if let Err(_) = ws_stream.send(Message::Pong(payload)).await { + break; + } + } + Err(e) => { + error!("Binance WS error: {}", e); + break; + } + _ => {} + } + } + else => break, // Stream ended + } + } + } + Err(e) => { + error!("Failed to connect to Binance: {}", e); + } + } + warn!("Binance connection lost, reconnecting in 5s..."); + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + Ok(()) + } + + async fn subscribe(&self, symbols: &[String]) -> Result<()> { + if let Some(tx) = &self.command_tx { + tx.send(ExchangeCommand::Subscribe(symbols.to_vec())) + .await?; + } + Ok(()) + } + + async fn unsubscribe(&self, symbols: &[String]) -> Result<()> { + if let Some(tx) = &self.command_tx { + tx.send(ExchangeCommand::Unsubscribe(symbols.to_vec())) + .await?; + } + Ok(()) + } +} diff --git a/src/exchange_connectors/coinbase.rs b/src/exchange_connectors/coinbase.rs new file mode 100644 index 0000000..521f769 --- /dev/null +++ b/src/exchange_connectors/coinbase.rs @@ -0,0 +1,174 @@ +use super::{Exchange, ExchangeCommand, PriceUpdate}; +use anyhow::Result; +use async_trait::async_trait; +use futures_util::{SinkExt, StreamExt}; +use log::{error, info, warn}; +use serde::Deserialize; +use serde_json::json; +use std::time::Duration; +use tokio::sync::mpsc::{self, Sender}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use url::Url; + +pub struct CoinbaseExchange { + command_tx: Option>, +} + +impl CoinbaseExchange { + pub fn new() -> Self { + Self { command_tx: None } + } +} + +#[derive(Deserialize, Debug)] +struct CoinbaseTickerMessage { + channel: String, + events: Vec, +} + +#[derive(Deserialize, Debug)] +struct CoinbaseEvent { + tickers: Vec, +} + +#[derive(Deserialize, Debug)] +struct CoinbaseTicker { + product_id: String, + best_bid: String, + best_ask: String, +} + +#[async_trait] +impl Exchange for CoinbaseExchange { + async fn connect(&mut self, sender: Sender) -> Result<()> { + let (cmd_tx, mut cmd_rx) = mpsc::channel(100); + self.command_tx = Some(cmd_tx); + + tokio::spawn(async move { + let mut active_symbols: Vec = Vec::new(); + + loop { + let url = Url::parse("wss://advanced-trade-ws.coinbase.com").unwrap(); + info!("Connecting to Coinbase Advanced Trade WS: {}", url); + + match connect_async(url.to_string()).await { + Ok((mut ws_stream, _)) => { + info!("Connected to Coinbase"); + + // Resubscribe + if !active_symbols.is_empty() { + let subscribe_msg = json!({ + "type": "subscribe", + "product_ids": active_symbols, + "channel": "ticker" + }); + if let Err(e) = ws_stream + .send(Message::Text(subscribe_msg.to_string().into())) + .await + { + error!("Failed to resubscribe: {}", e); + } else { + info!("Resubscribed to Coinbase symbols: {:?}", active_symbols); + } + } + + loop { + tokio::select! { + Some(cmd) = cmd_rx.recv() => { + match cmd { + ExchangeCommand::Subscribe(symbols) => { + active_symbols.extend(symbols.clone()); + let subscribe_msg = json!({ + "type": "subscribe", + "product_ids": symbols, + "channel": "ticker" + }); + if let Err(e) = ws_stream.send(Message::Text(subscribe_msg.to_string().into())).await { + error!("Failed to subscribe: {}", e); + break; + } + info!("Subscribed to Coinbase symbols: {:?}", symbols); + } + ExchangeCommand::Unsubscribe(symbols) => { + active_symbols.retain(|s| !symbols.contains(s)); + let unsubscribe_msg = json!({ + "type": "unsubscribe", + "product_ids": symbols, + "channel": "ticker" + }); + if let Err(e) = ws_stream.send(Message::Text(unsubscribe_msg.to_string().into())).await { + error!("Failed to unsubscribe: {}", e); + break; + } + info!("Unsubscribed from Coinbase symbols: {:?}", symbols); + } + } + } + Some(msg) = ws_stream.next() => { + match msg { + Ok(Message::Text(text)) => { + if let Ok(message) = serde_json::from_str::(&text) { + if message.channel == "ticker" { + for event in message.events { + for ticker in event.tickers { + let bid: f64 = ticker.best_bid.parse().unwrap_or(0.0); + let ask: f64 = ticker.best_ask.parse().unwrap_or(0.0); + if bid > 0.0 && ask > 0.0 { + let mid_price = (bid + ask) / 2.0; + let update = PriceUpdate { + symbol: ticker.product_id.clone(), + mid_price, + exchange: "coinbase".to_string(), + }; + if let Err(_) = sender.send(update).await { + break; + } + } + } + } + } + } + } + Ok(Message::Ping(payload)) => { + if let Err(_) = ws_stream.send(Message::Pong(payload)).await { + break; + } + } + Err(e) => { + error!("Coinbase WS error: {}", e); + break; + } + _ => {} + } + } + else => break, + } + } + } + Err(e) => { + error!("Failed to connect to Coinbase: {}", e); + } + } + warn!("Coinbase connection lost, reconnecting in 5s..."); + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + Ok(()) + } + + async fn subscribe(&self, symbols: &[String]) -> Result<()> { + if let Some(tx) = &self.command_tx { + tx.send(ExchangeCommand::Subscribe(symbols.to_vec())) + .await?; + } + Ok(()) + } + + async fn unsubscribe(&self, symbols: &[String]) -> Result<()> { + if let Some(tx) = &self.command_tx { + tx.send(ExchangeCommand::Unsubscribe(symbols.to_vec())) + .await?; + } + Ok(()) + } +} diff --git a/src/exchange_connectors/mod.rs b/src/exchange_connectors/mod.rs new file mode 100644 index 0000000..ec54e69 --- /dev/null +++ b/src/exchange_connectors/mod.rs @@ -0,0 +1,25 @@ +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::mpsc::Sender; + +#[derive(Debug, Clone)] +pub struct PriceUpdate { + pub symbol: String, + pub mid_price: f64, + pub exchange: String, +} + +pub enum ExchangeCommand { + Subscribe(Vec), + Unsubscribe(Vec), +} + +#[async_trait] +pub trait Exchange: Send + Sync { + async fn connect(&mut self, sender: Sender) -> Result<()>; + async fn subscribe(&self, symbols: &[String]) -> Result<()>; + async fn unsubscribe(&self, symbols: &[String]) -> Result<()>; +} + +pub mod binance; +pub mod coinbase; diff --git a/src/main.rs b/src/main.rs index f11d8af..77cadc8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,18 @@ //! Rust Keeper Bot use std::sync::Arc; +pub mod exchange_connectors; mod filler; mod http; mod liquidator; +mod mm_oracle_cranker; mod util; use crate::{ filler::FillerBot, http::{health_handler, metrics_handler, Metrics}, liquidator::LiquidatorBot, + mm_oracle_cranker::MmOracleCrankerBot, }; use clap::Parser; @@ -27,6 +30,9 @@ pub struct Config { pub min_collateral: u64, /// Run perp liquidator bot #[clap(long, default_value = "false")] + pub mm_oracle_cranker: bool, + /// Run perp liquidator bot + #[clap(long, default_value = "false")] pub liquidator: bool, /// Run perp filler bot #[clap(long, default_value = "true")] @@ -50,6 +56,12 @@ pub struct Config { pub dry: bool, #[clap(long, default_value = "0")] pub sub_account_id: u16, + /// Comma-separated Binance symbols to subscribe (lowercase) + #[clap(long, env = "BINANCE_SYMBOLS", default_value = "btcusdt,ethusdt")] + pub binance_symbols: String, + /// Comma-separated Coinbase symbols to subscribe + #[clap(long, env = "COINBASE_SYMBOLS", default_value = "USDT-USD")] + pub coinbase_symbols: String, } enum UseMarkets { @@ -75,8 +87,9 @@ impl Config { #[tokio::main] async fn main() { - env_logger::init(); + let _ = rustls::crypto::ring::default_provider().install_default(); let _ = dotenv::dotenv(); + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let config = Config::parse(); let metrics = Arc::new(Metrics::new()); @@ -141,6 +154,9 @@ async fn main() { if config.liquidator { let bot = LiquidatorBot::new(config, drift, metrics).await; bot.run().await; + } else if config.mm_oracle_cranker { + let bot = MmOracleCrankerBot::new(config, drift).await; + bot.run().await; } else if config.filler { let bot = FillerBot::new(config, drift, metrics).await; bot.run().await; diff --git a/src/mm_oracle_cranker.rs b/src/mm_oracle_cranker.rs new file mode 100644 index 0000000..b885b6c --- /dev/null +++ b/src/mm_oracle_cranker.rs @@ -0,0 +1,94 @@ +use crate::exchange_connectors::{ + binance::BinanceExchange, coinbase::CoinbaseExchange, Exchange, PriceUpdate, +}; +use crate::Config; +use anyhow::Result; +use drift_rs::DriftClient; +use log::{error, info}; +use std::collections::HashMap; +use tokio::sync::mpsc; +use tokio::time::{sleep, Duration}; + +pub struct MmOracleCrankerBot { + drift: DriftClient, + config: Config, +} + +impl MmOracleCrankerBot { + pub async fn new(config: Config, drift: DriftClient) -> Self { + Self { drift, config } + } + + pub async fn run(&self) { + info!("Starting MM Oracle Cranker Bot"); + + let (tx, mut rx) = mpsc::channel(100); + + // Keep exchanges alive to maintain connections + let mut _exchanges: Vec> = Vec::new(); + + // Binance + let binance_symbols: Vec = self + .config + .binance_symbols + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + if !binance_symbols.is_empty() { + let mut binance = BinanceExchange::new(); + if let Err(e) = binance.connect(tx.clone()).await { + error!("Failed to connect to Binance: {}", e); + } else { + if let Err(e) = binance.subscribe(&binance_symbols).await { + error!("Failed to subscribe to Binance: {}", e); + } else { + _exchanges.push(Box::new(binance)); + } + } + } + + // Coinbase + let coinbase_symbols: Vec = self + .config + .coinbase_symbols + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + if !coinbase_symbols.is_empty() { + let mut coinbase = CoinbaseExchange::new(); + if let Err(e) = coinbase.connect(tx.clone()).await { + error!("Failed to connect to Coinbase: {}", e); + } else { + if let Err(e) = coinbase.subscribe(&coinbase_symbols).await { + error!("Failed to subscribe to Coinbase: {}", e); + } else { + _exchanges.push(Box::new(coinbase)); + } + } + } + + let mut prices: HashMap = HashMap::new(); + + while let Some(update) = rx.recv().await { + prices.insert(update.symbol.clone(), update.mid_price); + + let btc_usdt = prices.get("BTCUSDT").cloned(); + let usdt_usd = prices.get("USDT-USD").cloned(); + + if let (Some(p1), Some(p2)) = (btc_usdt, usdt_usd) { + let btc_usd = p1 * p2; + info!( + "Calculated BTC/USD: {:.2} (BTC/USDT: {:.2}, USDT/USD: {:.4})", + btc_usd, p1, p2 + ); + } else { + // Just log what we have + info!("Update: {} = {:.4}", update.symbol, update.mid_price); + } + } + } +}