diff --git a/crates/aggserver/Cargo.toml b/crates/aggserver/Cargo.toml index 3097a29..3bcdbd3 100644 --- a/crates/aggserver/Cargo.toml +++ b/crates/aggserver/Cargo.toml @@ -37,3 +37,10 @@ url = "2.5.4" futures-util = "0.3.31" async-stream = "0.3.6" + +[dev-dependencies] +criterion = "0.5" + +[[bench]] +name = "bench" +harness = false \ No newline at end of file diff --git a/crates/aggserver/benches/bench.rs b/crates/aggserver/benches/bench.rs new file mode 100644 index 0000000..38511e8 --- /dev/null +++ b/crates/aggserver/benches/bench.rs @@ -0,0 +1,51 @@ +use aggcommon::sources::SourceId; +use aggserver::service::model::{CombinedSummary, ContributorSnapshot}; +use criterion::{Criterion, criterion_group, criterion_main}; + +fn snapshot_with_levels( + source: SourceId, + time: u64, + bids: &[(f64, f64)], + asks: &[(f64, f64)], +) -> ContributorSnapshot { + let mut s = ContributorSnapshot::new(source, time); + for &(p, a) in bids { + s.push_bid(p, a); + } + for &(p, a) in asks { + s.push_ask(p, a); + } + s +} + +fn bench_combined_summary_to_proto(c: &mut Criterion) { + const N: usize = 10; + + // Define bid and ask levels for two sources + let bids_source1 = (0..N).map(|i| (100.0 + i as f64, 10.0)).collect::>(); + let asks_source1 = (0..N).map(|i| (200.0 + i as f64, 10.0)).collect::>(); + + let bids_source2 = (0..N) + .map(|i| (100.0 + (i / 2) as f64, 5.0)) + .collect::>(); + let asks_source2 = (0..N) + .map(|i| (200.0 + (i / 2) as f64, 5.0)) + .collect::>(); + + // Create snapshots using snapshot_with_levels + let source1 = snapshot_with_levels::(SourceId::Bitstamp, 0, &bids_source1, &asks_source1); + let source2 = snapshot_with_levels::(SourceId::Binance, 0, &bids_source2, &asks_source2); + + let mut combined_summary = CombinedSummary::::new(); + combined_summary.update_snapshot(source1); + combined_summary.update_snapshot(source2); + + c.bench_function("combined_summary_to_proto", |b| { + b.iter(|| { + let _proto = combined_summary.to_proto(); + }); + }); +} + +criterion_group!(benches, bench_combined_summary_to_proto,); +criterion_main!(benches); diff --git a/crates/aggserver/src/lib.rs b/crates/aggserver/src/lib.rs new file mode 100644 index 0000000..178a204 --- /dev/null +++ b/crates/aggserver/src/lib.rs @@ -0,0 +1,4 @@ +pub mod config; +pub mod server; +pub mod service; +pub mod stream; diff --git a/crates/aggserver/src/service/model.rs b/crates/aggserver/src/service/model.rs index 807e5f5..fc708fd 100644 --- a/crates/aggserver/src/service/model.rs +++ b/crates/aggserver/src/service/model.rs @@ -4,9 +4,8 @@ use aggcommon::proto::orderbook::{Level, Summary}; use aggcommon::shutdown::{ShutdownReceiver, ShutdownSignaller}; use aggcommon::sources::{NUM_SOURCES, SourceId}; use anyhow::{Context, Result}; -use arrayvec::ArrayVec; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; +use std::collections::BinaryHeap; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::watch; /// Trait for order book summary sources. @@ -98,85 +97,6 @@ pub struct PriceLevel { } use std::cmp::Ordering; -/// Represents a price used for ordering ask levels in an order book. -/// -/// This struct guarantees a total ordering over `f64` by sanitizing any `NaN` input -/// to `0.0` at construction. `AskKeyedPrice` is sorted in ascending order, which is -/// typical for asks (lowest price first). -#[derive(Debug, Clone, Copy)] -struct AskKeyedPrice(pub f64); - -impl AskKeyedPrice { - /// Constructs a new `AskKeyedPrice`, converting `NaN` to `0.0`. - pub fn new(value: f64) -> Self { - if value.is_nan() { - Self(0.0) - } else { - Self(value) - } - } -} - -impl PartialEq for AskKeyedPrice { - fn eq(&self, other: &Self) -> bool { - self.0 == other.0 - } -} - -impl Eq for AskKeyedPrice {} - -impl Ord for AskKeyedPrice { - fn cmp(&self, other: &Self) -> Ordering { - self.0.partial_cmp(&other.0).unwrap() - } -} - -impl PartialOrd for AskKeyedPrice { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// Represents a price used for ordering bid levels in an order book. -/// -/// This struct also guarantees a total ordering over `f64` by converting `NaN` -/// to `0.0`. Unlike `AskKeyedPrice`, bids are ordered in **descending** order, -/// meaning the highest bid price comes first. -#[derive(Debug, Clone, Copy)] -struct BidKeyedPrice(pub f64); - -impl BidKeyedPrice { - /// Constructs a new `BidKeyedPrice`, converting `NaN` to `0.0`. - pub fn new(value: f64) -> Self { - if value.is_nan() { - Self(0.0) - } else { - Self(value) - } - } -} - -impl PartialEq for BidKeyedPrice { - fn eq(&self, other: &Self) -> bool { - self.0 == other.0 - } -} - -impl Eq for BidKeyedPrice {} - -impl Ord for BidKeyedPrice { - fn cmp(&self, other: &Self) -> Ordering { - // Reverse for descending sort (highest bid first) - self.0.partial_cmp(&other.0).unwrap().reverse() - } -} - -impl PartialOrd for BidKeyedPrice { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - /// Represents top-N levels from a single contributor. /// Assumes input is already sorted correctly (descending for bids, ascending for asks). #[derive(Debug, Clone, Copy)] @@ -323,66 +243,136 @@ impl CombinedSummary { /// Converts the combined summary to a protobuf-compatible format. pub fn to_proto(&self) -> Summary { - type SourceVec = ArrayVec; - let mut bid_map: BTreeMap> = BTreeMap::new(); - let mut ask_map: BTreeMap> = BTreeMap::new(); + #[derive(Debug)] + struct DequeueLevels<'a, const N: usize, const IS_BID: bool> { + index_pos: usize, + cl: &'a ContributorLevels, + source_id: SourceId, + } + + impl<'a, const N: usize, const IS_BID: bool> DequeueLevels<'a, N, IS_BID> { + fn peek(&self) -> Option { + if self.index_pos < self.cl.depth() { + let level = self.cl.as_slice()[self.index_pos]; + Some(Level { + price: level.price, + amount: level.amount, + exchange: self.source_id.to_string(), + }) + } else { + None + } + } - let mut latest_time: NanoTime = 0; + fn pop(&mut self) -> Option { + let res = self.peek(); + if res.is_some() { + self.index_pos += 1; + } + res + } - for snapshot in &self.sources { - if snapshot.is_empty() { - continue; + fn is_exhausted(&self) -> bool { + self.index_pos >= self.cl.depth() } + } - latest_time = latest_time.max(snapshot.arrival_time); + impl<'a, const N: usize, const IS_BID: bool> Ord for DequeueLevels<'a, N, IS_BID> { + fn cmp(&self, other: &Self) -> Ordering { + let a = self.cl.as_slice()[self.index_pos].price; + let b = other.cl.as_slice()[other.index_pos].price; + let cmp = a.partial_cmp(&b).unwrap(); + if IS_BID { + cmp // Descending for bids (max-heap) + } else { + cmp.reverse() // Ascending for asks (min-heap via max-heap reversal) + } + } + } - for level in snapshot.bids.as_slice() { - let level_proto = Level { - exchange: snapshot.source_id.to_string(), - price: level.price, - amount: level.amount, - }; - bid_map - .entry(BidKeyedPrice::new(level.price)) - .or_default() - .push(level_proto); + impl<'a, const N: usize, const IS_BID: bool> PartialOrd for DequeueLevels<'a, N, IS_BID> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } + } + + impl<'a, const N: usize, const IS_BID: bool> PartialEq for DequeueLevels<'a, N, IS_BID> { + fn eq(&self, other: &Self) -> bool { + self.cl.as_slice()[self.index_pos].price + == other.cl.as_slice()[other.index_pos].price + } + } - for level in snapshot.asks.as_slice() { - let level_proto = Level { - exchange: snapshot.source_id.to_string(), - price: level.price, - amount: level.amount, + impl<'a, const N: usize, const IS_BID: bool> Eq for DequeueLevels<'a, N, IS_BID> {} + + fn merge_top_n( + snapshots: &[ContributorSnapshot], + ) -> Vec { + let mut heap: BinaryHeap> = BinaryHeap::new(); + + for snapshot in snapshots { + if snapshot.is_empty() { + continue; + } + let cl = if IS_BID { + &snapshot.bids + } else { + &snapshot.asks }; - ask_map - .entry(AskKeyedPrice::new(level.price)) - .or_default() - .push(level_proto); + + if cl.depth() > 0 { + heap.push(DequeueLevels { + index_pos: 0, + cl, + source_id: snapshot.source_id, + }); + } + } + + let mut result = Vec::new(); + while result.len() < N { + if let Some(mut top) = heap.pop() { + let level = top.pop().unwrap(); + let level_price = level.price; + result.push(level); + + // Gather all with same price + while let Some(peeked) = heap.peek() { + if peeked.cl.as_slice()[peeked.index_pos].price == level_price { + let mut equal = heap.pop().unwrap(); + result.push(equal.pop().unwrap()); + if !equal.is_exhausted() { + heap.push(equal); + } + } else { + break; + } + } + + if !top.is_exhausted() { + heap.push(top); + } + } else { + break; + } } + + result } - let bids: Vec = bid_map - .into_iter() - .take(N) - .flat_map(|(_, mut levels)| { - levels.sort_unstable_by(|a, b| b.amount.partial_cmp(&a.amount).unwrap()); - levels - }) - .collect(); - - let asks: Vec = ask_map - .into_iter() - .take(N) - .flat_map(|(_, mut levels)| { - levels.sort_unstable_by(|a, b| b.amount.partial_cmp(&a.amount).unwrap()); - levels - }) - .collect(); - - let spread = spread_from(bids.first(), asks.first()); + let latest_time = self + .sources + .iter() + .filter(|s| !s.is_empty()) + .map(|s| s.arrival_time) + .max() + .unwrap_or(0); + + let bids = merge_top_n::(&self.sources); + let asks = merge_top_n::(&self.sources); Summary { - spread, + spread: spread_from(bids.first(), asks.first()), bids, asks, arrival_time: latest_time, @@ -664,4 +654,28 @@ mod tests { let spread = spread_from(None, None); assert!(spread.is_nan()); } + + #[test] + fn test_multiple_contributors_same_price_are_preserved() { + let mut combined = CombinedSummary::<5>::new(); + + let mut snap1 = ContributorSnapshot::new(SourceId::Binance, 1); + snap1.push_ask(101.0, 1.0); + combined.update_snapshot(snap1); + + let mut snap2 = ContributorSnapshot::new(SourceId::Bitstamp, 2); + snap2.push_ask(101.0, 2.0); + combined.update_snapshot(snap2); + + let summary = combined.to_proto(); + + // Both levels should appear since they are at the same price + assert_eq!(summary.asks.len(), 2); + assert_eq!(summary.asks[0].price, 101.0); + assert_eq!(summary.asks[1].price, 101.0); + + // Sorted by amount descending + assert_eq!(summary.asks[0].amount, 2.0); + assert_eq!(summary.asks[1].amount, 1.0); + } }