Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/aggserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ url = "2.5.4"
futures-util = "0.3.31"
async-stream = "0.3.6"


[dev-dependencies]
criterion = "0.5"

[[bench]]
name = "bench"
harness = false
51 changes: 51 additions & 0 deletions crates/aggserver/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use aggcommon::sources::SourceId;
use aggserver::service::model::{CombinedSummary, ContributorSnapshot};
use criterion::{Criterion, criterion_group, criterion_main};

fn snapshot_with_levels<const N: usize>(
source: SourceId,
time: u64,
bids: &[(f64, f64)],
asks: &[(f64, f64)],
) -> ContributorSnapshot<N> {
let mut s = ContributorSnapshot::new(source, time);
for &(p, a) in bids {
s.push_bid(p, a);
}
for &(p, a) in asks {
s.push_ask(p, a);
}
s
}

fn bench_combined_summary_to_proto(c: &mut Criterion) {
const N: usize = 10;

// Define bid and ask levels for two sources
let bids_source1 = (0..N).map(|i| (100.0 + i as f64, 10.0)).collect::<Vec<_>>();
let asks_source1 = (0..N).map(|i| (200.0 + i as f64, 10.0)).collect::<Vec<_>>();

let bids_source2 = (0..N)
.map(|i| (100.0 + (i / 2) as f64, 5.0))
.collect::<Vec<_>>();
let asks_source2 = (0..N)
.map(|i| (200.0 + (i / 2) as f64, 5.0))
.collect::<Vec<_>>();

// Create snapshots using snapshot_with_levels
let source1 = snapshot_with_levels::<N>(SourceId::Bitstamp, 0, &bids_source1, &asks_source1);
let source2 = snapshot_with_levels::<N>(SourceId::Binance, 0, &bids_source2, &asks_source2);

let mut combined_summary = CombinedSummary::<N>::new();
combined_summary.update_snapshot(source1);
combined_summary.update_snapshot(source2);

c.bench_function("combined_summary_to_proto", |b| {
b.iter(|| {
let _proto = combined_summary.to_proto();
});
});
}

criterion_group!(benches, bench_combined_summary_to_proto,);
criterion_main!(benches);
4 changes: 4 additions & 0 deletions crates/aggserver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod config;
pub mod server;
pub mod service;
pub mod stream;
272 changes: 143 additions & 129 deletions crates/aggserver/src/service/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use aggcommon::proto::orderbook::{Level, Summary};
use aggcommon::shutdown::{ShutdownReceiver, ShutdownSignaller};
use aggcommon::sources::{NUM_SOURCES, SourceId};
use anyhow::{Context, Result};
use arrayvec::ArrayVec;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::collections::BinaryHeap;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::watch;

/// Trait for order book summary sources.
Expand Down Expand Up @@ -98,85 +97,6 @@ pub struct PriceLevel {
}
use std::cmp::Ordering;

/// Represents a price used for ordering ask levels in an order book.
///
/// This struct guarantees a total ordering over `f64` by sanitizing any `NaN` input
/// to `0.0` at construction. `AskKeyedPrice` is sorted in ascending order, which is
/// typical for asks (lowest price first).
#[derive(Debug, Clone, Copy)]
struct AskKeyedPrice(pub f64);

impl AskKeyedPrice {
/// Constructs a new `AskKeyedPrice`, converting `NaN` to `0.0`.
pub fn new(value: f64) -> Self {
if value.is_nan() {
Self(0.0)
} else {
Self(value)
}
}
}

impl PartialEq for AskKeyedPrice {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl Eq for AskKeyedPrice {}

impl Ord for AskKeyedPrice {
fn cmp(&self, other: &Self) -> Ordering {
self.0.partial_cmp(&other.0).unwrap()
}
}

impl PartialOrd for AskKeyedPrice {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Represents a price used for ordering bid levels in an order book.
///
/// This struct also guarantees a total ordering over `f64` by converting `NaN`
/// to `0.0`. Unlike `AskKeyedPrice`, bids are ordered in **descending** order,
/// meaning the highest bid price comes first.
#[derive(Debug, Clone, Copy)]
struct BidKeyedPrice(pub f64);

impl BidKeyedPrice {
/// Constructs a new `BidKeyedPrice`, converting `NaN` to `0.0`.
pub fn new(value: f64) -> Self {
if value.is_nan() {
Self(0.0)
} else {
Self(value)
}
}
}

impl PartialEq for BidKeyedPrice {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl Eq for BidKeyedPrice {}

impl Ord for BidKeyedPrice {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse for descending sort (highest bid first)
self.0.partial_cmp(&other.0).unwrap().reverse()
}
}

impl PartialOrd for BidKeyedPrice {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Represents top-N levels from a single contributor.
/// Assumes input is already sorted correctly (descending for bids, ascending for asks).
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -323,66 +243,136 @@ impl<const N: usize> CombinedSummary<N> {

/// Converts the combined summary to a protobuf-compatible format.
pub fn to_proto(&self) -> Summary {
type SourceVec<const N: usize> = ArrayVec<Level, N>;
let mut bid_map: BTreeMap<BidKeyedPrice, SourceVec<N>> = BTreeMap::new();
let mut ask_map: BTreeMap<AskKeyedPrice, SourceVec<N>> = BTreeMap::new();
#[derive(Debug)]
struct DequeueLevels<'a, const N: usize, const IS_BID: bool> {
index_pos: usize,
cl: &'a ContributorLevels<N>,
source_id: SourceId,
}

impl<'a, const N: usize, const IS_BID: bool> DequeueLevels<'a, N, IS_BID> {
fn peek(&self) -> Option<Level> {
if self.index_pos < self.cl.depth() {
let level = self.cl.as_slice()[self.index_pos];
Some(Level {
price: level.price,
amount: level.amount,
exchange: self.source_id.to_string(),
})
} else {
None
}
}

let mut latest_time: NanoTime = 0;
fn pop(&mut self) -> Option<Level> {
let res = self.peek();
if res.is_some() {
self.index_pos += 1;
}
res
}

for snapshot in &self.sources {
if snapshot.is_empty() {
continue;
fn is_exhausted(&self) -> bool {
self.index_pos >= self.cl.depth()
}
}

latest_time = latest_time.max(snapshot.arrival_time);
impl<'a, const N: usize, const IS_BID: bool> Ord for DequeueLevels<'a, N, IS_BID> {
fn cmp(&self, other: &Self) -> Ordering {
let a = self.cl.as_slice()[self.index_pos].price;
let b = other.cl.as_slice()[other.index_pos].price;
let cmp = a.partial_cmp(&b).unwrap();
if IS_BID {
cmp // Descending for bids (max-heap)
} else {
cmp.reverse() // Ascending for asks (min-heap via max-heap reversal)
}
}
}

for level in snapshot.bids.as_slice() {
let level_proto = Level {
exchange: snapshot.source_id.to_string(),
price: level.price,
amount: level.amount,
};
bid_map
.entry(BidKeyedPrice::new(level.price))
.or_default()
.push(level_proto);
impl<'a, const N: usize, const IS_BID: bool> PartialOrd for DequeueLevels<'a, N, IS_BID> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<'a, const N: usize, const IS_BID: bool> PartialEq for DequeueLevels<'a, N, IS_BID> {
fn eq(&self, other: &Self) -> bool {
self.cl.as_slice()[self.index_pos].price
== other.cl.as_slice()[other.index_pos].price
}
}

for level in snapshot.asks.as_slice() {
let level_proto = Level {
exchange: snapshot.source_id.to_string(),
price: level.price,
amount: level.amount,
impl<'a, const N: usize, const IS_BID: bool> Eq for DequeueLevels<'a, N, IS_BID> {}

fn merge_top_n<const IS_BID: bool, const N: usize>(
snapshots: &[ContributorSnapshot<N>],
) -> Vec<Level> {
let mut heap: BinaryHeap<DequeueLevels<N, IS_BID>> = BinaryHeap::new();

for snapshot in snapshots {
if snapshot.is_empty() {
continue;
}
let cl = if IS_BID {
&snapshot.bids
} else {
&snapshot.asks
};
ask_map
.entry(AskKeyedPrice::new(level.price))
.or_default()
.push(level_proto);

if cl.depth() > 0 {
heap.push(DequeueLevels {
index_pos: 0,
cl,
source_id: snapshot.source_id,
});
}
}

let mut result = Vec::new();
while result.len() < N {
if let Some(mut top) = heap.pop() {
let level = top.pop().unwrap();
let level_price = level.price;
result.push(level);

// Gather all with same price
while let Some(peeked) = heap.peek() {
if peeked.cl.as_slice()[peeked.index_pos].price == level_price {
let mut equal = heap.pop().unwrap();
result.push(equal.pop().unwrap());
if !equal.is_exhausted() {
heap.push(equal);
}
} else {
break;
}
}

if !top.is_exhausted() {
heap.push(top);
}
} else {
break;
}
}

result
}

let bids: Vec<Level> = bid_map
.into_iter()
.take(N)
.flat_map(|(_, mut levels)| {
levels.sort_unstable_by(|a, b| b.amount.partial_cmp(&a.amount).unwrap());
levels
})
.collect();

let asks: Vec<Level> = ask_map
.into_iter()
.take(N)
.flat_map(|(_, mut levels)| {
levels.sort_unstable_by(|a, b| b.amount.partial_cmp(&a.amount).unwrap());
levels
})
.collect();

let spread = spread_from(bids.first(), asks.first());
let latest_time = self
.sources
.iter()
.filter(|s| !s.is_empty())
.map(|s| s.arrival_time)
.max()
.unwrap_or(0);

let bids = merge_top_n::<true, N>(&self.sources);
let asks = merge_top_n::<false, N>(&self.sources);

Summary {
spread,
spread: spread_from(bids.first(), asks.first()),
bids,
asks,
arrival_time: latest_time,
Expand Down Expand Up @@ -664,4 +654,28 @@ mod tests {
let spread = spread_from(None, None);
assert!(spread.is_nan());
}

#[test]
fn test_multiple_contributors_same_price_are_preserved() {
let mut combined = CombinedSummary::<5>::new();

let mut snap1 = ContributorSnapshot::new(SourceId::Binance, 1);
snap1.push_ask(101.0, 1.0);
combined.update_snapshot(snap1);

let mut snap2 = ContributorSnapshot::new(SourceId::Bitstamp, 2);
snap2.push_ask(101.0, 2.0);
combined.update_snapshot(snap2);

let summary = combined.to_proto();

// Both levels should appear since they are at the same price
assert_eq!(summary.asks.len(), 2);
assert_eq!(summary.asks[0].price, 101.0);
assert_eq!(summary.asks[1].price, 101.0);

// Sorted by amount descending
assert_eq!(summary.asks[0].amount, 2.0);
assert_eq!(summary.asks[1].amount, 1.0);
}
}