From 8e7b6d5ae61adce3203071cb27451beb2d86e7c9 Mon Sep 17 00:00:00 2001
From: Chizy
Date: Sun, 1 Mar 2026 23:23:04 -0500
Subject: [PATCH 1/4] feat: add multi-instrument support, cross-exchange
arbitrage, and performance optimizations
---
CMakeLists.txt | 66 ++-
README.md | 29 +-
config/default_config.json | 24 +
core/instrument/InstrumentManager.cpp | 321 ++++++++++++++
core/instrument/InstrumentManager.h | 152 +++++++
core/instrument/ResourceAllocator.cpp | 84 ++++
core/instrument/ResourceAllocator.h | 60 +++
core/risk/RiskConfig.h | 48 +-
core/risk/RiskManager.cpp | 106 ++++-
core/risk/RiskManager.h | 64 +++
core/utils/LockFreeOrderBook.cpp | 136 +++---
core/utils/ObjectPool.h | 147 +++++++
core/utils/ThreadAffinity.cpp | 110 +++++
core/utils/ThreadAffinity.h | 44 ++
docs/CROSS_EXCHANGE_ARBITRAGE.md | 144 ++++++
docs/CROSS_MARKET_CORRELATION.md | 139 ++++++
docs/MULTI_INSTRUMENT_GUIDE.md | 127 ++++++
docs/PERFORMANCE_OPTIMIZATION_GUIDE.md | 174 ++++++++
docs/ROADMAP.md | 44 +-
main.cpp | 183 +++++++-
.../analytics/CrossMarketCorrelation.cpp | 412 ++++++++++++++++++
strategies/analytics/CrossMarketCorrelation.h | 172 ++++++++
strategies/arbitrage/ArbitrageDetector.cpp | 256 +++++++++++
strategies/arbitrage/ArbitrageDetector.h | 171 ++++++++
strategies/arbitrage/ArbitrageExecutor.cpp | 138 ++++++
strategies/arbitrage/ArbitrageExecutor.h | 92 ++++
strategies/basic/MLEnhancedMarketMaker.cpp | 59 +++
strategies/basic/MLEnhancedMarketMaker.h | 27 ++
.../performance/MultiInstrumentBenchmark.cpp | 111 +++++
tests/unit/ArbitrageDetectorTests.cpp | 191 ++++++++
tests/unit/CrossMarketCorrelationTests.cpp | 181 ++++++++
tests/unit/InstrumentManagerTests.cpp | 121 +++++
tests/unit/RiskManagerTests.cpp | 80 ++++
33 files changed, 4120 insertions(+), 93 deletions(-)
create mode 100644 core/instrument/InstrumentManager.cpp
create mode 100644 core/instrument/InstrumentManager.h
create mode 100644 core/instrument/ResourceAllocator.cpp
create mode 100644 core/instrument/ResourceAllocator.h
create mode 100644 core/utils/ObjectPool.h
create mode 100644 core/utils/ThreadAffinity.cpp
create mode 100644 core/utils/ThreadAffinity.h
create mode 100644 docs/CROSS_EXCHANGE_ARBITRAGE.md
create mode 100644 docs/CROSS_MARKET_CORRELATION.md
create mode 100644 docs/MULTI_INSTRUMENT_GUIDE.md
create mode 100644 docs/PERFORMANCE_OPTIMIZATION_GUIDE.md
create mode 100644 strategies/analytics/CrossMarketCorrelation.cpp
create mode 100644 strategies/analytics/CrossMarketCorrelation.h
create mode 100644 strategies/arbitrage/ArbitrageDetector.cpp
create mode 100644 strategies/arbitrage/ArbitrageDetector.h
create mode 100644 strategies/arbitrage/ArbitrageExecutor.cpp
create mode 100644 strategies/arbitrage/ArbitrageExecutor.h
create mode 100644 tests/performance/MultiInstrumentBenchmark.cpp
create mode 100644 tests/unit/ArbitrageDetectorTests.cpp
create mode 100644 tests/unit/CrossMarketCorrelationTests.cpp
create mode 100644 tests/unit/InstrumentManagerTests.cpp
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0677d44..ffe54f5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -150,6 +150,18 @@ option(BUILD_VISUALIZATION "Build visualization server" ON)
option(USE_TBB "Use Intel TBB for parallel algorithms" OFF)
option(USE_DPDK "Use DPDK for kernel bypass networking" OFF)
option(USE_LOCK_FREE "Use lock-free data structures" ON)
+option(ENABLE_LTO "Enable Link-Time Optimization for release builds" OFF)
+
+# LTO support
+if(ENABLE_LTO)
+ include(CheckIPOSupported)
+ check_ipo_supported(RESULT LTO_SUPPORTED)
+ if(LTO_SUPPORTED)
+ message(STATUS "LTO/IPO enabled")
+ else()
+ message(STATUS "LTO/IPO not supported by this compiler")
+ endif()
+endif()
# Include directories
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${WEBSOCKETPP_INCLUDE_DIRS}
@@ -191,7 +203,10 @@ set(CORE_SOURCES
core/persistence/journal/Journal.cpp
core/persistence/journal/JournalEntry.cpp
core/persistence/snapshot/SnapshotManager.cpp
- core/routing/OrderRouter.cpp)
+ core/routing/OrderRouter.cpp
+ core/instrument/InstrumentManager.cpp
+ core/instrument/ResourceAllocator.cpp
+ core/utils/ThreadAffinity.cpp)
# Strategy library files
set(STRATEGY_SOURCES
@@ -203,7 +218,10 @@ set(STRATEGY_SOURCES
strategies/analytics/MarketRegimeDetector.cpp
strategies/rl/RLParameterAdapter.cpp
strategies/backtesting/BacktestEngine.cpp
- strategies/config/StrategyConfig.cpp)
+ strategies/config/StrategyConfig.cpp
+ strategies/arbitrage/ArbitrageDetector.cpp
+ strategies/arbitrage/ArbitrageExecutor.cpp
+ strategies/analytics/CrossMarketCorrelation.cpp)
# Exchange library files
set(EXCHANGE_SOURCES
@@ -330,6 +348,11 @@ endif()
target_link_libraries(pinnaclemm ${PINNACLEMM_LIBS})
+# Apply LTO to main executable in release builds
+if(ENABLE_LTO AND LTO_SUPPORTED)
+ set_property(TARGET pinnaclemm PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)
+endif()
+
# Tests
if(BUILD_TESTS)
enable_testing()
@@ -488,6 +511,33 @@ if(BUILD_TESTS)
Threads::Threads
Boost::filesystem)
add_test(NAME DisasterRecoveryTests COMMAND disaster_recovery_tests)
+
+ # Instrument Manager tests
+ add_executable(instrument_manager_tests tests/unit/InstrumentManagerTests.cpp)
+ target_link_libraries(
+ instrument_manager_tests
+ core
+ risk
+ strategy
+ exchange
+ GTest::gtest_main
+ GTest::gtest
+ Threads::Threads)
+ add_test(NAME InstrumentManagerTests COMMAND instrument_manager_tests)
+
+ # Arbitrage Detector tests
+ add_executable(arbitrage_detector_tests tests/unit/ArbitrageDetectorTests.cpp)
+ target_link_libraries(arbitrage_detector_tests core strategy
+ GTest::gtest_main GTest::gtest Threads::Threads)
+ add_test(NAME ArbitrageDetectorTests COMMAND arbitrage_detector_tests)
+
+ # Cross-Market Correlation tests
+ add_executable(cross_market_correlation_tests
+ tests/unit/CrossMarketCorrelationTests.cpp)
+ target_link_libraries(cross_market_correlation_tests core strategy
+ GTest::gtest_main GTest::gtest Threads::Threads)
+ add_test(NAME CrossMarketCorrelationTests
+ COMMAND cross_market_correlation_tests)
endif()
# Benchmarks
@@ -558,6 +608,18 @@ if(BUILD_BENCHMARKS)
add_executable(risk_check_benchmark tests/performance/RiskCheckBenchmark.cpp)
target_link_libraries(risk_check_benchmark core risk benchmark::benchmark
Threads::Threads)
+
+ # Multi-instrument benchmarks
+ add_executable(multi_instrument_benchmark
+ tests/performance/MultiInstrumentBenchmark.cpp)
+ target_link_libraries(
+ multi_instrument_benchmark
+ core
+ risk
+ strategy
+ exchange
+ benchmark::benchmark
+ Threads::Threads)
endif()
# Install targets
diff --git a/README.md b/README.md
index 38d55a5..4889eda 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,8 @@
Order Routing •
Performance Benchmarks •
API Reference •
- Exchange Connectors
+ Exchange Connectors •
+ Multi-Instrument
@@ -52,6 +53,11 @@ PinnacleMM is a high-performance, production-grade market making system designed
- **Disaster Recovery**: Atomic risk state persistence, position reconciliation, and labeled backup management
- **Kubernetes Deployment**: Production-ready StatefulSet with health probes, PVC, network policies, and pod disruption budget
- **Enterprise Security**: AES-256-CBC encryption with unique salts, 100,000 PBKDF2 iterations, secure password input, comprehensive input validation, audit logging, rate limiting, and certificate pinning
+- **Multi-Instrument Trading**: Simultaneous trading across multiple symbols with `InstrumentManager` orchestration
+- **Cross-Exchange Arbitrage**: Venue price discrepancy detection with fee-adjusted opportunity scanning and dry-run execution
+- **Cross-Market Correlation**: Pearson/rolling correlation, lead-lag analysis, Engle-Granger cointegration, and signal-based spread adjustment
+- **Per-Symbol Risk Tracking**: Atomic per-symbol position, PnL, and volume tracking with configurable per-symbol limits
+- **Dynamic Resource Allocation**: CPU core distribution and thread pinning for multi-instrument deployments
- **Comprehensive Testing**: Extensive test suite ensuring reliability and performance
## System Architecture
@@ -161,6 +167,12 @@ cd build && ./pinnaclemm --mode simulation --enable-ml --json-log --json-log-fil
# Combined: ML + visualization + JSON logging
cd build && ./pinnaclemm --mode simulation --enable-ml --enable-visualization --json-log --json-log-file sim_ml_data.jsonl
+# Multi-instrument simulation
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD,ETH-USD
+
+# Arbitrage detection (dry-run)
+cd build && ./pinnaclemm --mode simulation --symbol BTC-USD --enable-arbitrage --arb-dry-run
+
# The visualization dashboard will be available at:
# - WebSocket: ws://localhost:8080 (or custom port with --viz-ws-port)
# - REST API: http://localhost:8081 (or custom port with --viz-api-port)
@@ -453,6 +465,9 @@ docker run -it --name pinnaclemm-live \
- **Market Regime Detector**: Hidden Markov Model-based detection of 8 market regimes
- **RL Parameter Adapter**: Reinforcement learning for dynamic strategy parameter optimization
- **Advanced Backtesting Engine**: Historical replay with Monte Carlo analysis and statistical testing
+- **Instrument Manager**: Multi-instrument orchestration with per-symbol order books, strategies, and simulators
+- **Arbitrage Detector**: Cross-exchange price discrepancy detection with fee-adjusted scanning
+- **Cross-Market Correlation**: Statistical lead-lag analysis and cointegration testing for signal generation
- **Real-Time Visualization**: Web-based dashboard with Chart.js and D3.js visualization
- **FIX Protocol Engine**: Professional-grade FIX connectivity for institutional trading
- **Persistence System**: Crash recovery with memory-mapped files
@@ -548,6 +563,12 @@ open build/test_dashboard.html
# or manually: file:///path/to/PinnacleMM/build/test_dashboard.html
```
+### Multi-Instrument & Optimization
+- [Multi-Instrument Guide](docs/MULTI_INSTRUMENT_GUIDE.md) - **Multi-symbol trading with InstrumentManager**
+- [Cross-Exchange Arbitrage](docs/CROSS_EXCHANGE_ARBITRAGE.md) - **Venue spread detection and execution**
+- [Cross-Market Correlation](docs/CROSS_MARKET_CORRELATION.md) - **Statistical correlation and signal generation**
+- [Performance Optimization Guide](docs/PERFORMANCE_OPTIMIZATION_GUIDE.md) - **LTO, CPU affinity, object pooling, lock-free fixes**
+
### Exchange Integration
- [FIX Protocol Integration Guide](docs/FIX_PROTOCOL_INTEGRATION.md)
- [FIX Testing Guide](docs/TESTING_GUIDE.md)
@@ -621,6 +642,12 @@ cd build
./disaster_recovery_tests # 8 tests - state persistence, backups
./risk_check_benchmark # Risk check latency benchmarks
+# Test multi-instrument and optimization components (Phase 5)
+./instrument_manager_tests # 9 tests - lifecycle management
+./arbitrage_detector_tests # 8 tests - opportunity detection, fees
+./cross_market_correlation_tests # 7 tests - correlation, lead-lag
+./multi_instrument_benchmark # Startup scaling benchmarks
+
# Memory safety validation with Address Sanitizer (development builds)
cmake -DCMAKE_BUILD_TYPE=Debug -DENABLE_SANITIZERS=ON .. && make -j8
./pinnaclemm --mode simulation --symbol BTC-USD --verbose
diff --git a/config/default_config.json b/config/default_config.json
index f61843f..261db19 100644
--- a/config/default_config.json
+++ b/config/default_config.json
@@ -60,6 +60,30 @@
"keepSnapshots": 5,
"compactionThreshold": 1000000
},
+ "instruments": [
+ {
+ "symbol": "BTC-USD",
+ "enabled": true,
+ "useLockFree": true,
+ "enableML": false,
+ "baseSpreadBps": 10.0,
+ "orderQuantity": 0.01,
+ "maxPosition": 10.0
+ }
+ ],
+ "arbitrage": {
+ "enabled": false,
+ "minSpreadBps": 5.0,
+ "minProfitUsd": 1.0,
+ "maxStalenessMs": 500,
+ "scanIntervalMs": 10,
+ "dryRun": true,
+ "venues": ["coinbase", "kraken"],
+ "venueFees": {
+ "coinbase": 0.001,
+ "kraken": 0.0016
+ }
+ },
"risk_management": {
"limits": {
"max_position_size": 10.0,
diff --git a/core/instrument/InstrumentManager.cpp b/core/instrument/InstrumentManager.cpp
new file mode 100644
index 0000000..b5210d6
--- /dev/null
+++ b/core/instrument/InstrumentManager.cpp
@@ -0,0 +1,321 @@
+#include "InstrumentManager.h"
+#include "../../strategies/basic/MLEnhancedMarketMaker.h"
+#include "../persistence/PersistenceManager.h"
+
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+InstrumentManager::~InstrumentManager() { stopAll(); }
+
+bool InstrumentManager::addInstrument(const InstrumentConfig& config,
+ const std::string& mode) {
+ std::lock_guard lock(m_mutex);
+
+ if (m_instruments.count(config.symbol)) {
+ spdlog::warn("Instrument {} already registered", config.symbol);
+ return false;
+ }
+
+ InstrumentContext ctx;
+ ctx.symbol = config.symbol;
+ ctx.config = config;
+
+ // Try to recover order book from persistence
+ auto& persistenceManager = persistence::PersistenceManager::getInstance();
+ ctx.orderBook = persistenceManager.getRecoveredOrderBook(config.symbol);
+
+ if (ctx.orderBook) {
+ spdlog::info("Using recovered order book for {} with {} existing orders",
+ config.symbol, ctx.orderBook->getOrderCount());
+ } else {
+ if (config.useLockFree) {
+ ctx.orderBook = std::make_shared(config.symbol);
+ spdlog::info("[{}] Using lock-free order book", config.symbol);
+ } else {
+ ctx.orderBook = std::make_shared(config.symbol);
+ spdlog::info("[{}] Using mutex-based order book", config.symbol);
+ }
+ }
+
+ // Create strategy
+ strategy::StrategyConfig stratConfig;
+ stratConfig.symbol = config.symbol;
+ stratConfig.baseSpreadBps = config.baseSpreadBps;
+ stratConfig.orderQuantity = config.orderQuantity;
+ stratConfig.maxPosition = config.maxPosition;
+
+ if (config.enableML) {
+ strategy::MLEnhancedMarketMaker::MLConfig mlConfig{};
+ mlConfig.enableMLSpreadOptimization = true;
+ mlConfig.enableOnlineLearning = true;
+ mlConfig.fallbackToHeuristics = true;
+ mlConfig.mlConfidenceThreshold = 0.5;
+
+ ctx.strategy = std::make_shared(
+ config.symbol, stratConfig, mlConfig);
+ spdlog::info("[{}] Using ML-enhanced market maker", config.symbol);
+ } else {
+ ctx.strategy = std::make_shared(config.symbol,
+ stratConfig);
+ spdlog::info("[{}] Using basic market maker", config.symbol);
+ }
+
+ // Create simulator for non-live modes
+ if (mode != "live") {
+ ctx.simulator =
+ std::make_shared(ctx.orderBook);
+ }
+
+ m_instruments.emplace(config.symbol, std::move(ctx));
+ spdlog::info("Instrument {} added (mode={})", config.symbol, mode);
+ return true;
+}
+
+bool InstrumentManager::removeInstrument(const std::string& symbol) {
+ std::lock_guard lock(m_mutex);
+
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ spdlog::warn("Instrument {} not found for removal", symbol);
+ return false;
+ }
+
+ auto& ctx = it->second;
+
+ // Stop components
+ if (ctx.strategy && ctx.strategy->isRunning()) {
+ ctx.strategy->stop();
+ }
+ if (ctx.simulator && ctx.simulator->isRunning()) {
+ ctx.simulator->stop();
+ }
+
+ m_instruments.erase(it);
+ spdlog::info("Instrument {} removed", symbol);
+ return true;
+}
+
+bool InstrumentManager::startAll() {
+ std::lock_guard lock(m_mutex);
+
+ bool allOk = true;
+ for (auto& [symbol, ctx] : m_instruments) {
+ if (ctx.running) {
+ continue;
+ }
+
+ if (!ctx.strategy) {
+ spdlog::error("[{}] Strategy is null", symbol);
+ allOk = false;
+ continue;
+ }
+
+ // Initialize strategy
+ if (!ctx.strategy->initialize(ctx.orderBook)) {
+ spdlog::error("[{}] Failed to initialize strategy", symbol);
+ allOk = false;
+ continue;
+ }
+
+ // Start strategy
+ if (!ctx.strategy->start()) {
+ spdlog::error("[{}] Failed to start strategy", symbol);
+ allOk = false;
+ continue;
+ }
+
+ // Start simulator if present
+ if (ctx.simulator) {
+ if (!ctx.simulator->start()) {
+ spdlog::error("[{}] Failed to start simulator", symbol);
+ ctx.strategy->stop();
+ allOk = false;
+ continue;
+ }
+ }
+
+ ctx.running = true;
+ spdlog::info("[{}] Instrument started", symbol);
+ }
+
+ return allOk;
+}
+
+bool InstrumentManager::stopAll() {
+ std::lock_guard lock(m_mutex);
+
+ bool allOk = true;
+ for (auto& [symbol, ctx] : m_instruments) {
+ if (!ctx.running) {
+ continue;
+ }
+
+ if (ctx.strategy && ctx.strategy->isRunning()) {
+ if (!ctx.strategy->stop()) {
+ spdlog::error("[{}] Failed to stop strategy", symbol);
+ allOk = false;
+ }
+ }
+
+ if (ctx.simulator && ctx.simulator->isRunning()) {
+ if (!ctx.simulator->stop()) {
+ spdlog::error("[{}] Failed to stop simulator", symbol);
+ allOk = false;
+ }
+ }
+
+ ctx.running = false;
+ spdlog::info("[{}] Instrument stopped", symbol);
+ }
+
+ return allOk;
+}
+
+bool InstrumentManager::startInstrument(const std::string& symbol) {
+ std::lock_guard lock(m_mutex);
+
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return false;
+ }
+
+ auto& ctx = it->second;
+ if (ctx.running) {
+ return true;
+ }
+
+ if (!ctx.strategy) {
+ spdlog::error("[{}] Strategy is null", symbol);
+ return false;
+ }
+ if (!ctx.strategy->initialize(ctx.orderBook)) {
+ spdlog::error("[{}] Failed to initialize strategy", symbol);
+ return false;
+ }
+ if (!ctx.strategy->start()) {
+ spdlog::error("[{}] Failed to start strategy", symbol);
+ return false;
+ }
+ if (ctx.simulator && !ctx.simulator->start()) {
+ spdlog::error("[{}] Failed to start simulator", symbol);
+ ctx.strategy->stop();
+ return false;
+ }
+
+ ctx.running = true;
+ return true;
+}
+
+bool InstrumentManager::stopInstrument(const std::string& symbol) {
+ std::lock_guard lock(m_mutex);
+
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return false;
+ }
+
+ auto& ctx = it->second;
+ if (!ctx.running) {
+ return true;
+ }
+
+ if (ctx.strategy && ctx.strategy->isRunning()) {
+ ctx.strategy->stop();
+ }
+ if (ctx.simulator && ctx.simulator->isRunning()) {
+ ctx.simulator->stop();
+ }
+
+ ctx.running = false;
+ return true;
+}
+
+InstrumentContext* InstrumentManager::getContext(const std::string& symbol) {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return nullptr;
+ }
+ return &it->second;
+}
+
+const InstrumentContext*
+InstrumentManager::getContext(const std::string& symbol) const {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return nullptr;
+ }
+ return &it->second;
+}
+
+std::vector InstrumentManager::getSymbols() const {
+ std::lock_guard lock(m_mutex);
+ std::vector symbols;
+ symbols.reserve(m_instruments.size());
+ for (const auto& [sym, _] : m_instruments) {
+ symbols.push_back(sym);
+ }
+ return symbols;
+}
+
+size_t InstrumentManager::getInstrumentCount() const {
+ std::lock_guard lock(m_mutex);
+ return m_instruments.size();
+}
+
+bool InstrumentManager::hasInstrument(const std::string& symbol) const {
+ std::lock_guard lock(m_mutex);
+ return m_instruments.count(symbol) > 0;
+}
+
+std::string InstrumentManager::getAggregateStatistics() const {
+ std::lock_guard lock(m_mutex);
+
+ std::ostringstream oss;
+ double totalPnL = 0.0;
+ double totalPosition = 0.0;
+ size_t totalOrders = 0;
+
+ for (const auto& [symbol, ctx] : m_instruments) {
+ oss << "--- " << symbol << " ---\n";
+
+ if (ctx.orderBook) {
+ oss << " Best bid: " << ctx.orderBook->getBestBidPrice() << "\n";
+ oss << " Best ask: " << ctx.orderBook->getBestAskPrice() << "\n";
+ oss << " Mid price: " << ctx.orderBook->getMidPrice() << "\n";
+ oss << " Spread: " << ctx.orderBook->getSpread() << "\n";
+ oss << " Order count: " << ctx.orderBook->getOrderCount() << "\n";
+ totalOrders += ctx.orderBook->getOrderCount();
+ }
+
+ if (ctx.strategy) {
+ oss << ctx.strategy->getStatistics() << "\n";
+ totalPnL += ctx.strategy->getPnL();
+ totalPosition += ctx.strategy->getPosition();
+ }
+ }
+
+ oss << "--- AGGREGATE ---\n";
+ oss << " Instruments: " << m_instruments.size() << "\n";
+ oss << " Total PnL: " << totalPnL << "\n";
+ oss << " Total Position: " << totalPosition << "\n";
+ oss << " Total Orders: " << totalOrders << "\n";
+
+ return oss.str();
+}
+
+void InstrumentManager::createCheckpoints() {
+ std::lock_guard lock(m_mutex);
+ for (auto& [symbol, ctx] : m_instruments) {
+ if (ctx.orderBook) {
+ ctx.orderBook->createCheckpoint();
+ }
+ }
+}
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/instrument/InstrumentManager.h b/core/instrument/InstrumentManager.h
new file mode 100644
index 0000000..0e95359
--- /dev/null
+++ b/core/instrument/InstrumentManager.h
@@ -0,0 +1,152 @@
+#pragma once
+
+#include "../../exchange/simulator/ExchangeSimulator.h"
+#include "../../strategies/basic/BasicMarketMaker.h"
+#include "../../strategies/config/StrategyConfig.h"
+#include "../orderbook/LockFreeOrderBook.h"
+#include "../orderbook/OrderBook.h"
+
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+/**
+ * @struct InstrumentConfig
+ * @brief Configuration for a single instrument
+ */
+struct InstrumentConfig {
+ std::string symbol{"BTC-USD"};
+ bool useLockFree{true};
+ bool enableML{false};
+ double baseSpreadBps{10.0};
+ double orderQuantity{0.01};
+ double maxPosition{10.0};
+};
+
+/**
+ * @struct InstrumentContext
+ * @brief Holds all per-instrument components
+ */
+struct InstrumentContext {
+ std::string symbol;
+ std::shared_ptr orderBook;
+ std::shared_ptr strategy;
+ std::shared_ptr simulator; // null in live mode
+ InstrumentConfig config;
+ bool running{false};
+};
+
+/**
+ * @class InstrumentManager
+ * @brief Central class to manage multiple {symbol, orderbook, strategy,
+ * simulator} tuples
+ *
+ * Manages the lifecycle of per-instrument components. Supports adding/removing
+ * instruments at runtime. Keeps single-symbol mode working for backward
+ * compatibility.
+ */
+class InstrumentManager {
+public:
+ InstrumentManager() = default;
+ ~InstrumentManager();
+
+ InstrumentManager(const InstrumentManager&) = delete;
+ InstrumentManager& operator=(const InstrumentManager&) = delete;
+
+ /**
+ * @brief Add an instrument with its own orderbook, strategy, and simulator
+ * @param config Instrument configuration
+ * @param mode Operating mode ("simulation", "live", "backtest")
+ * @return true if the instrument was added successfully
+ */
+ bool addInstrument(const InstrumentConfig& config, const std::string& mode);
+
+ /**
+ * @brief Remove an instrument by symbol (stops it first)
+ * @param symbol Trading symbol to remove
+ * @return true if the instrument was removed successfully
+ */
+ bool removeInstrument(const std::string& symbol);
+
+ /**
+ * @brief Start all registered instruments
+ * @return true if all instruments started successfully
+ */
+ bool startAll();
+
+ /**
+ * @brief Stop all registered instruments
+ * @return true if all instruments stopped successfully
+ */
+ bool stopAll();
+
+ /**
+ * @brief Start a specific instrument
+ * @param symbol Trading symbol
+ * @return true if started successfully
+ */
+ bool startInstrument(const std::string& symbol);
+
+ /**
+ * @brief Stop a specific instrument
+ * @param symbol Trading symbol
+ * @return true if stopped successfully
+ */
+ bool stopInstrument(const std::string& symbol);
+
+ /**
+ * @brief Get the context for a specific instrument
+ * @param symbol Trading symbol
+ * @return Pointer to InstrumentContext, or nullptr if not found
+ */
+ InstrumentContext* getContext(const std::string& symbol);
+
+ /**
+ * @brief Get the context for a specific instrument (const version)
+ * @param symbol Trading symbol
+ * @return Const pointer to InstrumentContext, or nullptr if not found
+ */
+ const InstrumentContext* getContext(const std::string& symbol) const;
+
+ /**
+ * @brief Get all registered symbols
+ * @return Vector of symbol strings
+ */
+ std::vector getSymbols() const;
+
+ /**
+ * @brief Get number of registered instruments
+ * @return Count of instruments
+ */
+ size_t getInstrumentCount() const;
+
+ /**
+ * @brief Check if a symbol is registered
+ * @param symbol Trading symbol
+ * @return true if the symbol is registered
+ */
+ bool hasInstrument(const std::string& symbol) const;
+
+ /**
+ * @brief Get aggregate statistics across all instruments
+ * @return Formatted statistics string
+ */
+ std::string getAggregateStatistics() const;
+
+ /**
+ * @brief Create checkpoints for all order books
+ */
+ void createCheckpoints();
+
+private:
+ mutable std::mutex m_mutex;
+ std::unordered_map m_instruments;
+};
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/instrument/ResourceAllocator.cpp b/core/instrument/ResourceAllocator.cpp
new file mode 100644
index 0000000..0f6ef84
--- /dev/null
+++ b/core/instrument/ResourceAllocator.cpp
@@ -0,0 +1,84 @@
+#include "ResourceAllocator.h"
+
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+std::unordered_map
+ResourceAllocator::allocate(const std::vector& symbols) const {
+ std::unordered_map assignments;
+
+ int numCores = getAvailableCores();
+ int numInstruments = static_cast(symbols.size());
+
+ if (numInstruments == 0) {
+ return assignments;
+ }
+
+ if (numCores <= 0) {
+ spdlog::error("Invalid core count {}; defaulting to 1", numCores);
+ numCores = 1;
+ }
+
+ // Reserve core 0 for OS / main thread
+ int availableCores = std::max(1, numCores - 1);
+
+ // Each instrument ideally needs 2 cores (strategy + simulator)
+ int coresPerInstrument = std::max(1, availableCores / numInstruments);
+
+ int nextCore = 1; // Start from core 1
+
+ for (int i = 0; i < numInstruments; ++i) {
+ CoreAssignment assignment;
+ assignment.symbol = symbols[i];
+
+ // Assign strategy core (stay in range [1, numCores))
+ assignment.strategyCore = 1 + ((nextCore - 1) % availableCores);
+ nextCore++;
+
+ // Assign simulator core if we have enough cores
+ if (coresPerInstrument >= 2) {
+ assignment.simulatorCore = 1 + ((nextCore - 1) % availableCores);
+ nextCore++;
+ } else {
+ // Share core with strategy
+ assignment.simulatorCore = assignment.strategyCore;
+ }
+
+ // Higher priority for instruments listed first
+ assignment.priority = numInstruments - i;
+
+ assignments[symbols[i]] = assignment;
+
+ spdlog::info("[{}] Core assignment: strategy={} simulator={} priority={}",
+ symbols[i], assignment.strategyCore, assignment.simulatorCore,
+ assignment.priority);
+ }
+
+ return assignments;
+}
+
+int ResourceAllocator::getAvailableCores() const {
+ return utils::ThreadAffinity::getNumCores();
+}
+
+bool ResourceAllocator::applyAssignment(const CoreAssignment& assignment,
+ bool isStrategy) {
+ int core = isStrategy ? assignment.strategyCore : assignment.simulatorCore;
+ if (core < 0) {
+ return false;
+ }
+
+ bool result = utils::ThreadAffinity::pinToCore(core);
+
+ std::string threadType = isStrategy ? "strategy" : "simulator";
+ std::string threadName = assignment.symbol + "_" + threadType;
+ utils::ThreadAffinity::setThreadName(threadName);
+
+ return result;
+}
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/instrument/ResourceAllocator.h b/core/instrument/ResourceAllocator.h
new file mode 100644
index 0000000..484f480
--- /dev/null
+++ b/core/instrument/ResourceAllocator.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "../utils/ThreadAffinity.h"
+
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+/**
+ * @struct CoreAssignment
+ * @brief CPU core assignment for an instrument
+ */
+struct CoreAssignment {
+ std::string symbol;
+ int strategyCore{-1}; // Core for the strategy thread
+ int simulatorCore{-1}; // Core for the simulator thread
+ int priority{0}; // Thread priority hint (0 = normal)
+};
+
+/**
+ * @class ResourceAllocator
+ * @brief Assigns CPU cores and priorities to instruments based on count and
+ * available hardware
+ *
+ * Used by InstrumentManager on startup to distribute instruments across
+ * available cores for optimal performance.
+ */
+class ResourceAllocator {
+public:
+ ResourceAllocator() = default;
+
+ /**
+ * @brief Allocate cores for a set of instruments
+ * @param symbols List of instrument symbols
+ * @return Map of symbol -> CoreAssignment
+ */
+ std::unordered_map
+ allocate(const std::vector& symbols) const;
+
+ /**
+ * @brief Get available core count
+ * @return Number of hardware threads available
+ */
+ int getAvailableCores() const;
+
+ /**
+ * @brief Apply a core assignment to the calling thread
+ * @param assignment The assignment to apply
+ * @param isStrategy true for strategy thread, false for simulator
+ * @return true if affinity was set successfully
+ */
+ static bool applyAssignment(const CoreAssignment& assignment,
+ bool isStrategy);
+};
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/risk/RiskConfig.h b/core/risk/RiskConfig.h
index 702404a..9fc6081 100644
--- a/core/risk/RiskConfig.h
+++ b/core/risk/RiskConfig.h
@@ -36,6 +36,19 @@ struct RiskLimits {
uint32_t maxOrdersPerSecond{100};
};
+/**
+ * @struct PerSymbolLimits
+ * @brief Per-symbol risk limits (overrides global if set)
+ */
+struct PerSymbolLimits {
+ std::string symbol;
+ double maxPositionSize{0.0}; // 0 = use global
+ double maxDailyVolume{0.0}; // 0 = use global
+ double dailyLossLimit{0.0}; // 0 = use global
+ double maxOrderSize{0.0}; // 0 = use global
+ double maxNotionalExposure{0.0}; // 0 = use global
+};
+
/**
* @struct CircuitBreakerConfig
* @brief Configuration for the circuit breaker
@@ -96,6 +109,7 @@ struct RiskConfig {
CircuitBreakerConfig circuitBreaker;
VaRConfig var;
AlertConfig alerts;
+ std::vector perSymbolLimits;
/**
* @brief Load risk configuration from JSON
@@ -186,6 +200,22 @@ struct RiskConfig {
config.alerts.criticalThresholdPct = al.value(
"critical_threshold_pct", config.alerts.criticalThresholdPct);
}
+
+ if (rm.contains("per_symbol_limits") &&
+ rm["per_symbol_limits"].is_array()) {
+ for (const auto& psl : rm["per_symbol_limits"]) {
+ PerSymbolLimits limits;
+ limits.symbol = psl.value("symbol", std::string{});
+ limits.maxPositionSize = psl.value("max_position_size", 0.0);
+ limits.maxDailyVolume = psl.value("max_daily_volume", 0.0);
+ limits.dailyLossLimit = psl.value("daily_loss_limit", 0.0);
+ limits.maxOrderSize = psl.value("max_order_size", 0.0);
+ limits.maxNotionalExposure = psl.value("max_notional_exposure", 0.0);
+ if (!limits.symbol.empty()) {
+ config.perSymbolLimits.push_back(limits);
+ }
+ }
+ }
}
return config;
@@ -195,7 +225,7 @@ struct RiskConfig {
* @brief Serialize to JSON
*/
nlohmann::json toJson() const {
- return {
+ nlohmann::json result = {
{"risk_management",
{{"limits",
{{"max_position_size", limits.maxPositionSize},
@@ -233,6 +263,22 @@ struct RiskConfig {
{"max_history", alerts.maxAlertHistory},
{"warning_threshold_pct", alerts.warningThresholdPct},
{"critical_threshold_pct", alerts.criticalThresholdPct}}}}}};
+
+ // Add per-symbol limits
+ nlohmann::json pslArray = nlohmann::json::array();
+ for (const auto& psl : perSymbolLimits) {
+ pslArray.push_back({{"symbol", psl.symbol},
+ {"max_position_size", psl.maxPositionSize},
+ {"max_daily_volume", psl.maxDailyVolume},
+ {"daily_loss_limit", psl.dailyLossLimit},
+ {"max_order_size", psl.maxOrderSize},
+ {"max_notional_exposure", psl.maxNotionalExposure}});
+ }
+ if (!pslArray.empty()) {
+ result["risk_management"]["per_symbol_limits"] = pslArray;
+ }
+
+ return result;
}
};
diff --git a/core/risk/RiskManager.cpp b/core/risk/RiskManager.cpp
index 659db91..06dd0d4 100644
--- a/core/risk/RiskManager.cpp
+++ b/core/risk/RiskManager.cpp
@@ -2,6 +2,7 @@
#include "../utils/AuditLogger.h"
#include
+#include
#include
namespace pinnacle {
@@ -54,6 +55,13 @@ void RiskManager::initialize(const RiskLimits& limits) {
m_ordersThisSecond.store(0, std::memory_order_relaxed);
m_currentSecond.store(0, std::memory_order_relaxed);
+ // Clear per-symbol state so tests and re-initialization start fresh
+ {
+ std::unique_lock lock(m_symbolMutex);
+ m_symbolStates.clear();
+ m_symbolLimits.clear();
+ }
+
spdlog::info("RiskManager initialized - maxPos={} maxOrderSize={} "
"dailyLossLimit={} maxDrawdown={}%",
limits.maxPositionSize, limits.maxOrderSize,
@@ -114,11 +122,30 @@ RiskCheckResult RiskManager::checkOrder(OrderSide side, double price,
return RiskCheckResult::REJECTED_ORDER_SIZE_LIMIT;
}
- // 4. Position limit check
+ // 4. Position limit check (per-symbol if registered, else global)
double currentPos = m_position.load(std::memory_order_relaxed);
double projectedPos = (side == OrderSide::BUY) ? (currentPos + quantity)
: (currentPos - quantity);
double maxPos = m_limits.maxPositionSize;
+
+ // Check per-symbol position limit if available
+ {
+ std::shared_lock lock(m_symbolMutex);
+ auto limIt = m_symbolLimits.find(symbol);
+ auto stateIt = m_symbolStates.find(symbol);
+ if (limIt != m_symbolLimits.end() && limIt->second.maxPositionSize > 0.0 &&
+ stateIt != m_symbolStates.end()) {
+ double symPos = stateIt->second->position.load(std::memory_order_relaxed);
+ double symProjected =
+ (side == OrderSide::BUY) ? (symPos + quantity) : (symPos - quantity);
+ if (std::abs(symProjected) > limIt->second.maxPositionSize) {
+ AUDIT_ORDER_ACTIVITY("system", "", "rejected_position_limit", symbol,
+ false);
+ return RiskCheckResult::REJECTED_POSITION_LIMIT;
+ }
+ }
+ }
+
if (std::abs(projectedPos) > maxPos) {
AUDIT_ORDER_ACTIVITY("system", "", "rejected_position_limit", symbol,
false);
@@ -213,6 +240,36 @@ void RiskManager::onFill(OrderSide side, double price, double quantity,
m_netExposure.store(net, std::memory_order_release);
}
+ // Update per-symbol state if registered (grow-only map — pointer is stable)
+ {
+ std::shared_lock lock(m_symbolMutex);
+ auto symIt = m_symbolStates.find(symbol);
+ if (symIt != m_symbolStates.end()) {
+ auto& ss = *symIt->second;
+
+ double sPrev = ss.position.load(std::memory_order_relaxed);
+ double sNew;
+ do {
+ sNew = sPrev + delta;
+ } while (!ss.position.compare_exchange_weak(
+ sPrev, sNew, std::memory_order_release, std::memory_order_relaxed));
+
+ double vPrev = ss.dailyVolume.load(std::memory_order_relaxed);
+ double vNew;
+ do {
+ vNew = vPrev + quantity;
+ } while (!ss.dailyVolume.compare_exchange_weak(
+ vPrev, vNew, std::memory_order_release, std::memory_order_relaxed));
+
+ double ePrev = ss.exposure.load(std::memory_order_relaxed);
+ double eNew;
+ do {
+ eNew = ePrev + notional;
+ } while (!ss.exposure.compare_exchange_weak(
+ ePrev, eNew, std::memory_order_release, std::memory_order_relaxed));
+ }
+ }
+
spdlog::debug("Fill: {} {} {} @ {} | pos={} vol={} notional={}",
(side == OrderSide::BUY) ? "BUY" : "SELL", quantity, symbol,
price, newPos, newVol, notional);
@@ -567,6 +624,53 @@ void RiskManager::checkDailyReset() {
}
}
+// ---------------------------------------------------------------------------
+// Per-symbol tracking
+// ---------------------------------------------------------------------------
+void RiskManager::registerSymbol(const std::string& symbol) {
+ std::unique_lock lock(m_symbolMutex);
+ if (m_symbolStates.count(symbol)) {
+ return; // already registered
+ }
+ m_symbolStates[symbol] = std::make_shared(symbol);
+ spdlog::info("Registered per-symbol risk tracking for {}", symbol);
+}
+
+SymbolRiskState* RiskManager::getSymbolState(const std::string& symbol) {
+ std::shared_lock lock(m_symbolMutex);
+ auto it = m_symbolStates.find(symbol);
+ if (it != m_symbolStates.end()) {
+ return it->second.get();
+ }
+ return nullptr;
+}
+
+const SymbolRiskState*
+RiskManager::getSymbolState(const std::string& symbol) const {
+ std::shared_lock lock(m_symbolMutex);
+ auto it = m_symbolStates.find(symbol);
+ if (it != m_symbolStates.end()) {
+ return it->second.get();
+ }
+ return nullptr;
+}
+
+void RiskManager::setSymbolLimits(const PerSymbolLimits& limits) {
+ std::unique_lock lock(m_symbolMutex);
+ m_symbolLimits[limits.symbol] = limits;
+ spdlog::info("Set per-symbol limits for {}", limits.symbol);
+}
+
+const PerSymbolLimits*
+RiskManager::getSymbolLimits(const std::string& symbol) const {
+ std::shared_lock lock(m_symbolMutex);
+ auto it = m_symbolLimits.find(symbol);
+ if (it != m_symbolLimits.end()) {
+ return &it->second;
+ }
+ return nullptr;
+}
+
// ---------------------------------------------------------------------------
// Utility
// ---------------------------------------------------------------------------
diff --git a/core/risk/RiskManager.h b/core/risk/RiskManager.h
index e2663a3..69bbd75 100644
--- a/core/risk/RiskManager.h
+++ b/core/risk/RiskManager.h
@@ -7,9 +7,12 @@
#include
#include
#include
+#include
#include
+#include
#include
#include
+#include
namespace pinnacle {
namespace risk {
@@ -52,6 +55,28 @@ struct RiskState {
uint64_t currentSecond{0};
};
+/**
+ * @struct SymbolRiskState
+ * @brief Per-symbol atomic tracking for position, PnL, and volume
+ *
+ * Uses atomics so the hot path can read without locks.
+ * The map holding these is grow-only so readers are safe.
+ */
+struct SymbolRiskState {
+ std::string symbol;
+ std::atomic position{0.0};
+ std::atomic dailyPnL{0.0};
+ std::atomic dailyVolume{0.0};
+ std::atomic exposure{0.0};
+
+ SymbolRiskState() = default;
+ explicit SymbolRiskState(const std::string& sym) : symbol(sym) {}
+
+ // Non-copyable due to atomics — use shared_ptr
+ SymbolRiskState(const SymbolRiskState&) = delete;
+ SymbolRiskState& operator=(const SymbolRiskState&) = delete;
+};
+
/**
* @class RiskManager
* @brief Singleton risk manager providing pre-trade checks and position
@@ -60,6 +85,9 @@ struct RiskState {
* The hot path (checkOrder) is fully lock-free, relying only on atomic loads.
* State mutations (onFill, onPnLUpdate) use atomic stores and acquire the mutex
* only when complex multi-field consistency is required.
+ *
+ * Per-symbol tracking is grow-only: once a symbol is registered, its
+ * SymbolRiskState pointer is stable so reads need no lock.
*/
class RiskManager {
public:
@@ -143,6 +171,35 @@ class RiskManager {
double getPositionUtilization() const;
double getDailyLossUtilization() const;
+ // --- Per-symbol tracking ---
+
+ /**
+ * @brief Register a symbol for per-symbol risk tracking
+ * @param symbol Trading symbol
+ */
+ void registerSymbol(const std::string& symbol);
+
+ /**
+ * @brief Get per-symbol risk state (nullptr if not registered)
+ */
+ SymbolRiskState* getSymbolState(const std::string& symbol);
+
+ /**
+ * @brief Get per-symbol risk state (const, nullptr if not registered)
+ */
+ const SymbolRiskState* getSymbolState(const std::string& symbol) const;
+
+ /**
+ * @brief Set per-symbol limits (overrides global for that symbol)
+ * @param limits Per-symbol limit configuration
+ */
+ void setSymbolLimits(const PerSymbolLimits& limits);
+
+ /**
+ * @brief Get per-symbol limits (returns nullptr if not set)
+ */
+ const PerSymbolLimits* getSymbolLimits(const std::string& symbol) const;
+
/**
* @brief Update risk limits at runtime
* @param limits New risk limits
@@ -205,6 +262,13 @@ class RiskManager {
std::string m_haltReason;
uint64_t m_dailyResetTime{0};
+ // Per-symbol state (grow-only map — reads safe without lock once inserted)
+ std::unordered_map>
+ m_symbolStates;
+ std::unordered_map m_symbolLimits;
+ mutable std::shared_mutex
+ m_symbolMutex; // shared for reads, exclusive for writes
+
// Hedge state
std::mutex m_hedgeMutex;
HedgeCallback m_hedgeCallback;
diff --git a/core/utils/LockFreeOrderBook.cpp b/core/utils/LockFreeOrderBook.cpp
index 8d75022..2783ce6 100644
--- a/core/utils/LockFreeOrderBook.cpp
+++ b/core/utils/LockFreeOrderBook.cpp
@@ -27,92 +27,88 @@ LockFreePriceLevel::~LockFreePriceLevel() {
}
}
-void LockFreePriceLevel::updateTotalQuantity() {
- double total = 0.0;
- OrderNode* current = m_head.load(std::memory_order_acquire)
- ->next.load(std::memory_order_acquire);
-
- while (current) {
- if (current->order) {
- total += current->order->getRemainingQuantity();
- }
- current = current->next.load(std::memory_order_acquire);
- }
-
- m_totalQuantity.store(total, std::memory_order_release);
-}
-
bool LockFreePriceLevel::addOrder(std::shared_ptr order) {
if (!order) {
return false;
}
- // Use exclusive lock to prevent readers during structure modification
+ double qty = order->getRemainingQuantity();
+
+ // Use exclusive lock — direct append, no CAS loop needed
std::unique_lock lock(m_nodeAccessMutex);
- // Create a new node
OrderNode* newNode = new OrderNode(std::move(order));
- // Add to the end of the list using a Michael-Scott queue approach
- while (true) {
- OrderNode* tail = m_tail.load(std::memory_order_acquire);
- OrderNode* next = tail->next.load(std::memory_order_acquire);
-
- // Check if tail is still the last node
- if (tail == m_tail.load(std::memory_order_acquire)) {
- if (next == nullptr) {
- // Tail is pointing to the last node, try to append the new node
- if (tail->next.compare_exchange_weak(next, newNode,
- std::memory_order_release,
- std::memory_order_relaxed)) {
- // Successfully appended the new node, try to update the tail
- m_tail.compare_exchange_strong(tail, newNode,
- std::memory_order_release,
- std::memory_order_relaxed);
- m_orderCount.fetch_add(1, std::memory_order_release);
- updateTotalQuantity();
- return true;
- }
- } else {
- // Tail is not pointing to the last node, try to help advance it
- m_tail.compare_exchange_strong(tail, next, std::memory_order_release,
- std::memory_order_relaxed);
- }
- }
- }
+ // Direct append: we hold exclusive lock so no concurrent modification
+ OrderNode* tail = m_tail.load(std::memory_order_relaxed);
+ tail->next.store(newNode, std::memory_order_relaxed);
+ m_tail.store(newNode, std::memory_order_relaxed);
+
+ m_orderCount.fetch_add(1, std::memory_order_release);
+
+ // O(1) quantity update instead of walking the list
+ double prev = m_totalQuantity.load(std::memory_order_relaxed);
+ m_totalQuantity.store(prev + qty, std::memory_order_release);
+
+ return true;
}
bool LockFreePriceLevel::removeOrder(const std::string& orderId) {
- // Use exclusive lock to prevent readers from accessing during modification
+ // Use exclusive lock — physically unlink the node
std::unique_lock lock(m_nodeAccessMutex);
- OrderNode* prev = m_head.load(std::memory_order_acquire);
- OrderNode* curr = prev->next.load(std::memory_order_acquire);
+ OrderNode* prev = m_head.load(std::memory_order_relaxed);
+ OrderNode* curr = prev->next.load(std::memory_order_relaxed);
- bool found = false;
while (curr != nullptr) {
- // Find the node to remove
if (curr->order && curr->order->getOrderId() == orderId) {
- found = true;
+ double qty = curr->order->getRemainingQuantity();
+
+ // Physical unlink: prev->next = curr->next
+ OrderNode* next = curr->next.load(std::memory_order_relaxed);
+ prev->next.store(next, std::memory_order_relaxed);
+
+ // Update tail if we removed the last node
+ if (m_tail.load(std::memory_order_relaxed) == curr) {
+ m_tail.store(prev, std::memory_order_relaxed);
+ }
+
+ delete curr;
- // Logical deletion: null out the order pointer
- // This prevents use-after-free while allowing safe traversal
- // The Order object will be automatically cleaned up by shared_ptr
- curr->order = nullptr;
m_orderCount.fetch_sub(1, std::memory_order_release);
- break;
+
+ // O(1) quantity update
+ double prevTotal = m_totalQuantity.load(std::memory_order_relaxed);
+ m_totalQuantity.store(std::max(0.0, prevTotal - qty),
+ std::memory_order_release);
+
+ return true;
}
prev = curr;
- curr = curr->next.load(std::memory_order_acquire);
+ curr = curr->next.load(std::memory_order_relaxed);
}
- // Update total quantity if an order was removed
- if (found) {
- updateTotalQuantity();
+ return false;
+}
+
+void LockFreePriceLevel::updateTotalQuantity() {
+ // Kept for compatibility but now rarely needed — add/remove do O(1) updates.
+ // This full-scan version can be used if quantities change externally.
+ std::shared_lock lock(m_nodeAccessMutex);
+
+ double total = 0.0;
+ OrderNode* current = m_head.load(std::memory_order_acquire)
+ ->next.load(std::memory_order_acquire);
+
+ while (current) {
+ if (current->order) {
+ total += current->order->getRemainingQuantity();
+ }
+ current = current->next.load(std::memory_order_acquire);
}
- return found;
+ m_totalQuantity.store(total, std::memory_order_release);
}
std::vector> LockFreePriceLevel::getOrders() const {
@@ -140,6 +136,8 @@ std::vector> LockFreePriceLevel::getOrders() const {
std::shared_ptr
LockFreePriceLevel::findOrder(const std::string& orderId) const {
+ std::shared_lock lock(m_nodeAccessMutex);
+
OrderNode* current = m_head.load(std::memory_order_acquire)
->next.load(std::memory_order_acquire);
@@ -155,6 +153,8 @@ LockFreePriceLevel::findOrder(const std::string& orderId) const {
void LockFreePriceLevel::forEachOrder(
const std::function)>& func) const {
+ std::shared_lock lock(m_nodeAccessMutex);
+
OrderNode* current = m_head.load(std::memory_order_acquire)
->next.load(std::memory_order_acquire);
@@ -170,8 +170,15 @@ void LockFreePriceLevel::forEachOrder(
LockFreeOrderMap::ShardGuard::ShardGuard(std::atomic_flag& lock)
: m_lock(lock) {
+ // Spin with pause hint to reduce contention
while (m_lock.test_and_set(std::memory_order_acquire)) {
- // Spin until we acquire the lock
+#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_IX86))
+ _mm_pause();
+#elif defined(__x86_64__) || defined(__i386__)
+ __builtin_ia32_pause();
+#elif defined(__aarch64__) || defined(_M_ARM64)
+ asm volatile("yield" ::: "memory");
+#endif
}
}
@@ -258,12 +265,7 @@ bool LockFreeOrderBook::addOrder(std::shared_ptr order) {
return false;
}
- // Check if the order already exists
- if (m_orders.contains(order->getOrderId())) {
- return false;
- }
-
- // Add to the order map
+ // Single-step insert (returns false if already exists — no separate contains)
if (!m_orders.insert(order->getOrderId(), order)) {
return false;
}
diff --git a/core/utils/ObjectPool.h b/core/utils/ObjectPool.h
new file mode 100644
index 0000000..8f52050
--- /dev/null
+++ b/core/utils/ObjectPool.h
@@ -0,0 +1,147 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace utils {
+
+/**
+ * @class ObjectPool
+ * @brief Thread-safe object pool template with custom deleter recycling
+ *
+ * Pre-allocates objects and returns them via shared_ptr with a custom deleter
+ * that recycles objects back to the pool instead of destroying them.
+ *
+ * @tparam T The type of object to pool
+ */
+template class ObjectPool {
+public:
+ /**
+ * @brief Construct pool with initial capacity
+ * @param initialSize Number of objects to pre-allocate
+ */
+ explicit ObjectPool(size_t initialSize = 64) {
+ m_pool.reserve(initialSize);
+ for (size_t i = 0; i < initialSize; ++i) {
+ m_pool.push_back(std::make_unique());
+ }
+ m_totalAllocated.store(initialSize, std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Construct pool with a factory function
+ * @param initialSize Number of objects to pre-allocate
+ * @param factory Function to create new objects
+ */
+ ObjectPool(size_t initialSize, std::function()> factory)
+ : m_factory(std::move(factory)) {
+ m_pool.reserve(initialSize);
+ for (size_t i = 0; i < initialSize; ++i) {
+ m_pool.push_back(m_factory ? m_factory() : std::make_unique());
+ }
+ m_totalAllocated.store(initialSize, std::memory_order_relaxed);
+ }
+
+ ~ObjectPool() { m_alive->store(false, std::memory_order_release); }
+
+ ObjectPool(const ObjectPool&) = delete;
+ ObjectPool& operator=(const ObjectPool&) = delete;
+
+ /**
+ * @brief Acquire an object from the pool
+ *
+ * Returns a shared_ptr with a custom deleter that recycles the object back
+ * to the pool when the last reference is released.
+ *
+ * @return shared_ptr to a pooled object
+ */
+ std::shared_ptr acquire() {
+ T* raw = nullptr;
+
+ {
+ std::lock_guard lock(m_mutex);
+ if (!m_pool.empty()) {
+ raw = m_pool.back().release();
+ m_pool.pop_back();
+ }
+ }
+
+ if (!raw) {
+ // Pool exhausted — allocate a new object
+ raw = m_factory ? m_factory().release() : new T();
+ m_totalAllocated.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ m_acquireCount.fetch_add(1, std::memory_order_relaxed);
+
+ // Return with custom deleter that recycles back to pool (or deletes if pool
+ // is destroyed)
+ auto alive = m_alive;
+ return std::shared_ptr(raw, [this, alive](T* obj) {
+ if (alive->load(std::memory_order_acquire)) {
+ recycle(obj);
+ } else {
+ delete obj;
+ }
+ });
+ }
+
+ /**
+ * @brief Get current number of available objects in the pool
+ */
+ size_t available() const {
+ std::lock_guard lock(m_mutex);
+ return m_pool.size();
+ }
+
+ /**
+ * @brief Get total number of objects ever allocated by this pool
+ */
+ size_t totalAllocated() const {
+ return m_totalAllocated.load(std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Get total number of acquire() calls
+ */
+ size_t acquireCount() const {
+ return m_acquireCount.load(std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Get total number of recycle operations
+ */
+ size_t recycleCount() const {
+ return m_recycleCount.load(std::memory_order_relaxed);
+ }
+
+private:
+ void recycle(T* obj) {
+ if (!obj) {
+ return;
+ }
+
+ m_recycleCount.fetch_add(1, std::memory_order_relaxed);
+
+ std::lock_guard lock(m_mutex);
+ m_pool.push_back(std::unique_ptr(obj));
+ }
+
+ mutable std::mutex m_mutex;
+ std::vector> m_pool;
+ std::function()> m_factory;
+
+ std::shared_ptr> m_alive =
+ std::make_shared>(true);
+ std::atomic m_totalAllocated{0};
+ std::atomic m_acquireCount{0};
+ std::atomic m_recycleCount{0};
+};
+
+} // namespace utils
+} // namespace pinnacle
diff --git a/core/utils/ThreadAffinity.cpp b/core/utils/ThreadAffinity.cpp
new file mode 100644
index 0000000..5296642
--- /dev/null
+++ b/core/utils/ThreadAffinity.cpp
@@ -0,0 +1,110 @@
+#include "ThreadAffinity.h"
+
+#include
+
+#ifdef __APPLE__
+#include
+#include
+#include
+#include
+#elif defined(__linux__)
+#include
+#include
+#endif
+
+namespace pinnacle {
+namespace utils {
+
+bool ThreadAffinity::pinToCore(int coreId) {
+ if (coreId < 0 || coreId >= getNumCores()) {
+ spdlog::warn("Invalid core ID {} (available: 0-{})", coreId,
+ getNumCores() - 1);
+ return false;
+ }
+
+#ifdef __APPLE__
+ // macOS uses thread affinity tags (hints, not hard pinning)
+ thread_affinity_policy_data_t policy;
+ policy.affinity_tag = coreId + 1; // 0 means no affinity
+ mach_port_t thread_port = mach_thread_self();
+ kern_return_t ret = thread_policy_set(
+ thread_port, THREAD_AFFINITY_POLICY,
+ reinterpret_cast(&policy), THREAD_AFFINITY_POLICY_COUNT);
+ mach_port_deallocate(mach_task_self(), thread_port);
+ if (ret != KERN_SUCCESS) {
+ spdlog::warn("Failed to set thread affinity to core {}", coreId);
+ return false;
+ }
+ return true;
+
+#elif defined(__linux__)
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(coreId, &cpuset);
+
+ int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+ if (ret != 0) {
+ spdlog::warn("Failed to set thread affinity to core {}: {}", coreId,
+ strerror(ret));
+ return false;
+ }
+ return true;
+
+#else
+ spdlog::warn("Thread affinity not supported on this platform");
+ return false;
+#endif
+}
+
+void ThreadAffinity::setThreadName(const std::string& name) {
+#ifdef __APPLE__
+ // macOS: only the calling thread can set its own name
+ pthread_setname_np(name.substr(0, 63).c_str());
+#elif defined(__linux__)
+ // Linux: name limited to 16 chars including null terminator
+ pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
+#endif
+}
+
+int ThreadAffinity::getNumCores() {
+ int cores = static_cast(std::thread::hardware_concurrency());
+ return cores > 0 ? cores : 1;
+}
+
+bool ThreadAffinity::pinThreadToCore(std::thread& thread, int coreId) {
+ if (!thread.joinable()) {
+ return false;
+ }
+
+ if (coreId < 0 || coreId >= getNumCores()) {
+ spdlog::warn("Invalid core ID {} (available: 0-{})", coreId,
+ getNumCores() - 1);
+ return false;
+ }
+
+#ifdef __APPLE__
+ // macOS doesn't support pinning another thread from outside easily.
+ // The thread itself should call pinToCore().
+ spdlog::debug("macOS: thread should call pinToCore() itself");
+ return false;
+
+#elif defined(__linux__)
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(coreId, &cpuset);
+
+ int ret = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t),
+ &cpuset);
+ if (ret != 0) {
+ spdlog::warn("Failed to pin thread to core {}: {}", coreId, strerror(ret));
+ return false;
+ }
+ return true;
+
+#else
+ return false;
+#endif
+}
+
+} // namespace utils
+} // namespace pinnacle
diff --git a/core/utils/ThreadAffinity.h b/core/utils/ThreadAffinity.h
new file mode 100644
index 0000000..9d8fa0a
--- /dev/null
+++ b/core/utils/ThreadAffinity.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include
+#include
+
+namespace pinnacle {
+namespace utils {
+
+/**
+ * @class ThreadAffinity
+ * @brief Platform-specific CPU pinning and thread naming utilities
+ */
+class ThreadAffinity {
+public:
+ /**
+ * @brief Pin the calling thread to a specific CPU core
+ * @param coreId The CPU core to pin to (0-indexed)
+ * @return true if pinning succeeded
+ */
+ static bool pinToCore(int coreId);
+
+ /**
+ * @brief Set the name of the calling thread (for debugging/profiling)
+ * @param name Thread name (will be truncated to platform limit)
+ */
+ static void setThreadName(const std::string& name);
+
+ /**
+ * @brief Get the number of available CPU cores
+ * @return Number of hardware threads
+ */
+ static int getNumCores();
+
+ /**
+ * @brief Pin a given std::thread to a specific core
+ * @param thread Thread to pin
+ * @param coreId CPU core to pin to
+ * @return true if pinning succeeded
+ */
+ static bool pinThreadToCore(std::thread& thread, int coreId);
+};
+
+} // namespace utils
+} // namespace pinnacle
diff --git a/docs/CROSS_EXCHANGE_ARBITRAGE.md b/docs/CROSS_EXCHANGE_ARBITRAGE.md
new file mode 100644
index 0000000..98872d1
--- /dev/null
+++ b/docs/CROSS_EXCHANGE_ARBITRAGE.md
@@ -0,0 +1,144 @@
+# Cross-Exchange Arbitrage
+
+## Overview
+
+PinnacleMM includes a cross-exchange arbitrage system that detects and optionally executes price discrepancies across multiple venues. The system consists of two components:
+
+- **ArbitrageDetector**: Continuously scans venue quotes for profitable spreads
+- **ArbitrageExecutor**: Submits simultaneous buy/sell orders to capture the spread
+
+## Quick Start
+
+```bash
+# Enable arbitrage detection in dry-run mode (logs opportunities, no execution)
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD --enable-arbitrage --arb-dry-run
+
+# Customize minimum spread threshold
+cd build && ./pinnaclemm --mode simulation --enable-arbitrage --arb-min-spread 10.0 --arb-dry-run
+```
+
+## CLI Flags
+
+| Flag | Description | Default |
+|------|-------------|---------|
+| `--enable-arbitrage` | Enable the arbitrage detector | `false` |
+| `--arb-min-spread` | Minimum spread in basis points | `5.0` |
+| `--arb-dry-run` | Log opportunities without executing | `true` |
+
+## Configuration
+
+### `config/default_config.json`
+
+```json
+{
+ "arbitrage": {
+ "enabled": false,
+ "minSpreadBps": 5.0,
+ "minProfitUsd": 1.0,
+ "maxStalenessMs": 500,
+ "scanIntervalMs": 10,
+ "dryRun": true,
+ "venues": ["coinbase", "kraken"],
+ "venueFees": {
+ "coinbase": 0.001,
+ "kraken": 0.0016
+ }
+ }
+}
+```
+
+### Configuration Fields
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `enabled` | bool | Master switch for arbitrage |
+| `minSpreadBps` | double | Minimum net spread (after fees) in bps to consider |
+| `minProfitUsd` | double | Minimum estimated profit in USD |
+| `maxStalenessMs` | uint64 | Maximum quote age before it's considered stale |
+| `scanIntervalMs` | uint64 | How often the detector scans for opportunities |
+| `dryRun` | bool | If true, log opportunities without executing |
+| `venues` | string[] | List of venue identifiers |
+| `venueFees` | map | Per-venue trading fee as a fraction (e.g., 0.001 = 0.1%) |
+
+## Architecture
+
+### Opportunity Detection
+
+The `ArbitrageDetector` maintains a per-venue, per-symbol quote cache. On each scan cycle:
+
+1. For each symbol, enumerate all venue pairs
+2. For each pair, check if `venue_A.bid - venue_B.ask > fees`
+3. Apply staleness filtering (reject quotes older than `maxStalenessMs`)
+4. Apply fee adjustment: `net_spread = bid - ask - (bid * fee_sell) - (ask * fee_buy)`
+5. Convert to basis points: `spreadBps = (net_spread / ask) * 10000`
+6. Filter by `minSpreadBps` and `minProfitUsd`
+
+### Data Flow
+
+```
+Venue WebSocket Feeds
+ │
+ ├─ updateVenueQuote("coinbase", "BTC-USD", bid, bidSize, ask, askSize, ts)
+ ├─ updateVenueQuote("kraken", "BTC-USD", bid, bidSize, ask, askSize, ts)
+ │
+ ▼
+ArbitrageDetector (background scan thread)
+ │
+ ├─ detectOpportunities("BTC-USD")
+ │ └─ Compare all venue pairs, apply fees, filter
+ │
+ ├─ opportunityCallback(ArbitrageOpportunity)
+ │
+ ▼
+ArbitrageExecutor
+ ├─ Dry-run: log opportunity
+ └─ Live: submit buy + sell via OrderRouter
+```
+
+### ArbitrageOpportunity
+
+```cpp
+struct ArbitrageOpportunity {
+ std::string symbol;
+ std::string buyVenue; // Venue with lowest ask
+ std::string sellVenue; // Venue with highest bid
+ double buyPrice; // Best ask at buy venue
+ double sellPrice; // Best bid at sell venue
+ double spread; // Raw spread (sellPrice - buyPrice)
+ double spreadBps; // Net spread in basis points (after fees)
+ double maxQuantity; // Min of buy/sell available size
+ double estimatedProfit; // spreadBps/10000 * buyPrice * quantity
+ uint64_t detectedAt; // Nanosecond timestamp
+};
+```
+
+### ArbitrageExecutor
+
+The executor supports two modes:
+
+- **Dry-run** (`dryRun: true`): Simulates execution, returns synthetic fill results
+- **Live** (`dryRun: false`): Uses an `OrderSubmitCallback` to route orders through `OrderRouter`
+
+### Risk Controls
+
+- **Staleness filter**: Quotes older than `maxStalenessMs` are rejected
+- **Fee adjustment**: All opportunities are evaluated net of trading fees
+- **Minimum thresholds**: Both `minSpreadBps` and `minProfitUsd` must be met
+- **Dry-run default**: Production deployments should start in dry-run mode
+
+## Testing
+
+```bash
+cd build
+./arbitrage_detector_tests # 8 tests
+```
+
+Test cases cover:
+- Opportunity detection with sufficient spread
+- Fee adjustment eliminating thin spreads
+- Staleness filtering of old quotes
+- Minimum spread threshold enforcement
+- Dry-run execution simulation
+- Opportunity callback invocation
+- Statistics reporting
+- Single-venue (no self-arbitrage)
diff --git a/docs/CROSS_MARKET_CORRELATION.md b/docs/CROSS_MARKET_CORRELATION.md
new file mode 100644
index 0000000..7e596ab
--- /dev/null
+++ b/docs/CROSS_MARKET_CORRELATION.md
@@ -0,0 +1,139 @@
+# Cross-Market Correlation Analysis
+
+## Overview
+
+The `CrossMarketCorrelation` engine provides statistical models to detect when one instrument's price movement predicts another's. It implements four analysis methods:
+
+1. **Pearson Correlation**: Full-sample linear correlation of log returns
+2. **Rolling Correlation**: Sliding-window correlation for regime sensitivity
+3. **Lead-Lag Analysis**: Detects if one instrument's returns predict another's with a time offset
+4. **Engle-Granger Cointegration**: Tests whether two price series share a long-run equilibrium
+
+## Integration with MLEnhancedMarketMaker
+
+When cross-market signals are active, the `MLEnhancedMarketMaker` adjusts its spread based on the expected move magnitude and confidence from the leading instrument:
+
+```cpp
+// Wire up in main.cpp or strategy initialization
+CrossMarketCorrelation crossMarket(config);
+crossMarket.addPair("BTC-USD", "ETH-USD");
+
+mlStrategy.setCrossMarketCorrelation(&crossMarket);
+```
+
+The spread adjustment widens when a correlated leader instrument shows a large expected move, protecting against adverse selection.
+
+## Configuration
+
+```cpp
+struct CrossMarketConfig {
+ size_t returnWindowSize{100}; // Window for return calculations
+ size_t rollingWindowSize{30}; // Rolling correlation window
+ int maxLagBars{10}; // Max lead-lag offset to test
+ double minCorrelation{0.5}; // Min |correlation| to consider
+ double signalThreshold{0.3}; // Min signal strength to emit
+ double cointegrationPValue{0.05}; // Significance level
+};
+```
+
+In `MLEnhancedMarketMaker::MLConfig`:
+
+```cpp
+bool enableCrossMarketSignals{false};
+double crossMarketSpreadAdjustmentWeight{0.2};
+```
+
+## Statistical Methods
+
+### Pearson Correlation
+
+Computed on log returns (not raw prices) to improve stationarity (formal tests like ADF may still be needed):
+
+```
+r_i = log(P_i / P_{i-1})
+corr(A, B) = Cov(r_A, r_B) / (Std(r_A) * Std(r_B))
+```
+
+Range: [-1, 1]. Values near +1 indicate co-movement; near -1 indicate inverse movement.
+
+### Rolling Correlation
+
+Same as Pearson but computed over the most recent `rollingWindowSize` returns. This captures regime-dependent correlation shifts.
+
+### Lead-Lag Analysis
+
+For each lag offset `k` in `[-maxLagBars, +maxLagBars]`, compute:
+
+```
+corr(r_A[t], r_B[t+k])
+```
+
+The lag with the highest absolute correlation is reported. Positive `leadLagBarsA` means A leads B.
+
+### Engle-Granger Cointegration
+
+1. Regress prices: `P_A = alpha + beta * P_B + epsilon`
+2. Compute residuals `e_t = P_A_t - alpha - beta * P_B_t`
+3. Run simplified ADF test on residuals: `delta_e_t = gamma * e_{t-1} + noise`
+4. If t-statistic for gamma < -3.37 (5% critical value), the pair is cointegrated
+
+Cointegrated pairs have a mean-reverting spread, making them candidates for pairs trading.
+
+## Signal Generation
+
+Signals are generated when:
+1. A pair has `|leadLagCoefficient| >= minCorrelation`
+2. The leader has a non-zero lag offset (`leadLagBarsA != 0`)
+3. The leader's recent return multiplied by the coefficient exceeds `signalThreshold`
+
+```cpp
+struct CrossMarketSignal {
+ std::string leadSymbol; // The instrument that moves first
+ std::string lagSymbol; // The instrument expected to follow
+ double signalStrength; // [0, 1]
+ double expectedMove; // Expected % move in lag symbol
+ double confidence; // Based on rolling correlation
+ uint64_t timestamp;
+};
+```
+
+## Usage
+
+```cpp
+CrossMarketConfig cfg;
+cfg.returnWindowSize = 100;
+cfg.minCorrelation = 0.5;
+
+CrossMarketCorrelation engine(cfg);
+engine.addPair("BTC-USD", "ETH-USD");
+
+// Feed price observations (from market data feeds)
+engine.addPriceObservation("BTC-USD", 50000.0, 1000.0, timestampNs);
+engine.addPriceObservation("ETH-USD", 3000.0, 5000.0, timestampNs);
+
+// Query correlation
+auto corr = engine.getCorrelation("BTC-USD", "ETH-USD");
+// corr.pearsonCorrelation, corr.leadLagBarsA, corr.isCointegrated
+
+// Get active trading signals
+auto signals = engine.getActiveSignals();
+
+// Get statistics
+std::string stats = engine.getStatistics();
+```
+
+## Testing
+
+```bash
+cd build
+./cross_market_correlation_tests # 7 tests
+```
+
+Test cases:
+- Perfectly correlated series (Pearson ~ 1.0)
+- Inversely correlated series (Pearson < -0.8)
+- Lead-lag detection with 2-bar offset
+- Cointegration detection (linear relationship + noise)
+- Uncorrelated data (low |correlation|)
+- Signal generation from lead-lag patterns
+- Statistics output
diff --git a/docs/MULTI_INSTRUMENT_GUIDE.md b/docs/MULTI_INSTRUMENT_GUIDE.md
new file mode 100644
index 0000000..712a73d
--- /dev/null
+++ b/docs/MULTI_INSTRUMENT_GUIDE.md
@@ -0,0 +1,127 @@
+# Multi-Instrument Trading Guide
+
+## Overview
+
+PinnacleMM supports simultaneous trading across multiple instruments, each with its own order book, strategy instance, and optional exchange simulator. The `InstrumentManager` class orchestrates per-instrument lifecycles while sharing global risk management and order routing infrastructure.
+
+## Quick Start
+
+```bash
+# Trade two instruments in simulation mode
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD,ETH-USD
+
+# Trade with ML enabled on all instruments
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD,ETH-USD --enable-ml
+
+# Single-symbol mode still works (backward compatible)
+cd build && ./pinnaclemm --mode simulation --symbol BTC-USD
+```
+
+## CLI Flags
+
+| Flag | Description | Default |
+|------|-------------|---------|
+| `--symbols` | Comma-separated list of instruments | (uses `--symbol`) |
+| `--symbol` | Single instrument (backward compat) | `BTC-USD` |
+
+When `--symbols` is provided, it takes precedence over `--symbol`.
+
+## Configuration
+
+### `config/default_config.json`
+
+The `instruments` array defines per-instrument settings:
+
+```json
+{
+ "instruments": [
+ {
+ "symbol": "BTC-USD",
+ "enabled": true,
+ "useLockFree": true,
+ "enableML": false,
+ "baseSpreadBps": 10.0,
+ "orderQuantity": 0.01,
+ "maxPosition": 10.0
+ },
+ {
+ "symbol": "ETH-USD",
+ "enabled": true,
+ "useLockFree": false,
+ "enableML": true,
+ "baseSpreadBps": 15.0,
+ "orderQuantity": 0.1,
+ "maxPosition": 50.0
+ }
+ ]
+}
+```
+
+### Per-Instrument Fields
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `symbol` | string | Trading pair identifier |
+| `enabled` | bool | Whether this instrument is active |
+| `useLockFree` | bool | Use lock-free order book implementation |
+| `enableML` | bool | Enable ML-enhanced strategy for this instrument |
+| `baseSpreadBps` | double | Base spread in basis points |
+| `orderQuantity` | double | Default order size |
+| `maxPosition` | double | Maximum position for this instrument |
+
+## Architecture
+
+### InstrumentManager
+
+The `InstrumentManager` owns a map of `InstrumentContext` objects, each containing:
+
+```
+InstrumentContext
+ ├── symbol: std::string
+ ├── orderBook: std::shared_ptr
+ ├── strategy: std::shared_ptr
+ └── simulator: std::shared_ptr (null in live mode)
+```
+
+### Lifecycle
+
+1. `addInstrument(config, mode)` — Creates order book, strategy, and simulator for the given symbol
+2. `startAll()` — Starts all registered instruments
+3. Main loop — Each instrument runs independently; stats are aggregated
+4. `stopAll()` — Gracefully stops all instruments
+
+### Per-Symbol Risk Tracking
+
+When multiple instruments are active, the `RiskManager` tracks position, PnL, and volume per-symbol using atomic state. See the risk management section for details.
+
+```cpp
+// Register symbols for per-symbol tracking
+rm.registerSymbol("BTC-USD");
+rm.registerSymbol("ETH-USD");
+
+// Set per-symbol limits (optional — falls back to global)
+PerSymbolLimits btcLimits;
+btcLimits.symbol = "BTC-USD";
+btcLimits.maxPositionSize = 5.0;
+rm.setSymbolLimits(btcLimits);
+
+// Query per-symbol state
+auto* btcState = rm.getSymbolState("BTC-USD");
+double btcPosition = btcState->position.load();
+```
+
+## Performance Considerations
+
+- Each instrument runs its own strategy thread and simulator
+- The `ResourceAllocator` distributes CPU cores across instruments based on available hardware
+- Lock-free order books are recommended for high-throughput instruments
+- Global risk checks remain lock-free regardless of instrument count
+- Object pooling reduces allocation overhead on hot paths
+
+## Testing
+
+```bash
+cd build
+./instrument_manager_tests # 9 tests covering lifecycle management
+./multi_instrument_benchmark # Startup scaling and throughput benchmarks
+```
diff --git a/docs/PERFORMANCE_OPTIMIZATION_GUIDE.md b/docs/PERFORMANCE_OPTIMIZATION_GUIDE.md
new file mode 100644
index 0000000..ff1f4d5
--- /dev/null
+++ b/docs/PERFORMANCE_OPTIMIZATION_GUIDE.md
@@ -0,0 +1,174 @@
+# Performance Optimization Guide
+
+## Overview
+
+Phase 5 introduced several performance optimizations to PinnacleMM:
+
+1. **Lock-Free OrderBook Fix**: Eliminated 56x regression, now 4.5x faster than mutex
+2. **Object Pool**: Thread-safe allocation recycling for hot-path objects
+3. **CPU Affinity & Thread Pinning**: Platform-specific core assignment
+4. **Link-Time Optimization (LTO)**: Whole-program optimization at link time
+5. **Dynamic Resource Allocation**: Automatic CPU core distribution across instruments
+
+## Lock-Free OrderBook Optimization
+
+### Problem
+
+The lock-free order book was 56x slower than the mutex-based implementation at 1000 orders due to:
+
+- **O(n) quantity recalculation**: `updateTotalQuantity()` walked the entire linked list on every add/remove
+- **Redundant CAS loops**: Michael-Scott queue CAS retries inside an already-held exclusive lock
+- **Logical deletion without physical unlinking**: Removed nodes stayed in the list, growing walk times
+- **Duplicate shard lock acquisitions**: Separate `contains()` + `insert()` calls
+
+### Fix
+
+| Issue | Fix | Complexity Change |
+|-------|-----|-------------------|
+| `updateTotalQuantity()` full scan | O(1) atomic add/subtract on add/remove | O(n) -> O(1) per op |
+| CAS loop under exclusive lock | Direct pointer append | Eliminates retries |
+| Logical deletion (null pointer) | Physical node unlinking + delete | Prevents list growth |
+| `contains()` + `insert()` | Single `insert()` check | 2 locks -> 1 lock |
+| Spinlock busy-wait | Platform-specific yield hint | Reduces CPU waste |
+
+### Results
+
+| Metric | Before | After | Improvement |
+|--------|--------|-------|-------------|
+| LockFree 100 orders | N/A | 0.17ms | - |
+| LockFree 1000 orders | 75.2ms | 1.79ms | **42x faster** |
+| vs Mutex 1000 orders | 56x slower | 4.5x faster | Regression eliminated |
+| LockFree 10000 orders | N/A | 20.2ms | - |
+| vs Mutex 10000 orders | - | 3.3x faster | - |
+
+## Object Pool
+
+### `core/utils/ObjectPool.h`
+
+Header-only, thread-safe object pool template for hot-path allocation:
+
+```cpp
+#include "core/utils/ObjectPool.h"
+
+// Create pool with 1000 pre-allocated Order objects
+pinnacle::utils::ObjectPool pool(1000);
+
+// Acquire returns shared_ptr with custom deleter that recycles
+auto order = pool.acquire("id", "BTC-USD", OrderSide::BUY, OrderType::LIMIT, 100.0, 1.0, ts);
+
+// When shared_ptr ref count reaches 0, object is recycled back to pool
+// No heap allocation on acquire if pool has available objects
+```
+
+### Design
+
+- Pre-allocates objects in constructor
+- `acquire()` returns `std::shared_ptr` with a custom deleter
+- Custom deleter resets and recycles the object back to the pool
+- Falls back to `new` allocation if pool is exhausted
+- Thread-safe via `std::mutex` on the free list
+
+### When to Use
+
+- Order allocation in `ExchangeSimulator` and `BasicMarketMaker`
+- Any hot-path object that is frequently created and destroyed
+- Objects with non-trivial construction cost
+
+## CPU Affinity & Thread Pinning
+
+### `core/utils/ThreadAffinity.h`
+
+Platform-specific thread pinning for latency-sensitive threads:
+
+```cpp
+#include "core/utils/ThreadAffinity.h"
+
+// Pin current thread to core 2
+pinnacle::utils::ThreadAffinity::pinToCore(2);
+
+// Set thread name (visible in profilers)
+pinnacle::utils::ThreadAffinity::setThreadName("strategy-btc");
+
+// Query available cores
+int cores = pinnacle::utils::ThreadAffinity::getNumCores();
+
+// Pin a specific thread
+pinnacle::utils::ThreadAffinity::pinThreadToCore(myThread, 3);
+```
+
+### Platform Support
+
+| Platform | Thread Pinning | Thread Naming |
+|----------|---------------|---------------|
+| macOS (ARM/x86) | `thread_affinity_policy` | `pthread_setname_np` |
+| Linux | `pthread_setaffinity_np` | `pthread_setname_np` |
+| Other | No-op (returns false) | No-op |
+
+## Link-Time Optimization (LTO)
+
+### CMake Configuration
+
+LTO is enabled via the `ENABLE_LTO` option:
+
+```bash
+cmake -DENABLE_LTO=ON ..
+make -j$(nproc)
+```
+
+When enabled, the compiler performs whole-program optimization across translation units, enabling:
+- Cross-file inlining
+- Dead code elimination
+- Interprocedural constant propagation
+
+### Impact
+
+LTO typically provides 5-15% throughput improvement for compute-bound workloads. The overhead is increased link time.
+
+## Dynamic Resource Allocation
+
+### `core/instrument/ResourceAllocator.h`
+
+Automatically distributes CPU cores across instruments:
+
+```cpp
+ResourceAllocator allocator;
+auto assignments = allocator.allocate(instrumentCount);
+
+for (const auto& assignment : assignments) {
+ // assignment.instrumentIndex - which instrument
+ // assignment.coreId - which CPU core to pin to
+ // assignment.priority - relative priority (0 = highest)
+}
+```
+
+### Allocation Strategy
+
+1. Core 0 is reserved for OS/kernel work
+2. Remaining cores are distributed round-robin across instruments
+3. Priority is assigned based on order (lower index = higher priority)
+
+## Benchmarks
+
+```bash
+cd build
+
+# Order book comparison (mutex vs lock-free)
+./orderbook_benchmark
+
+# Multi-instrument scaling
+./multi_instrument_benchmark
+
+# Key benchmarks in multi_instrument_benchmark:
+# SingleInstrumentStartup — Baseline startup time
+# MultiInstrumentStartup/1..8 — Scaling with 1-8 instruments
+# ObjectPoolAcquireRelease — Pool vs raw allocation
+# ObjectPoolContended/1..4 — Multi-threaded pool performance
+```
+
+## Profiling Tips
+
+1. **CPU profiling**: Use `perf record` (Linux) or Instruments (macOS) to identify hotspots
+2. **Cache analysis**: `perf stat -e cache-misses` to check cache behavior
+3. **Lock contention**: Monitor spinlock spin counts in `LockFreeOrderMap::ShardGuard`
+4. **Memory allocation**: Use `jemalloc` or `tcmalloc` for production builds
+5. **NUMA awareness**: On multi-socket systems, pin threads to cores near their memory
diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md
index fc75c4f..b12b0ff 100644
--- a/docs/ROADMAP.md
+++ b/docs/ROADMAP.md
@@ -152,15 +152,41 @@ PinnacleMM is an ultra-low latency market making system designed for high-freque
**Goal:** Fine-tune performance and scale to multiple markets.
### Deliverables
-- 🔲 Multi-instrument support
-- 🔲 Cross-exchange arbitrage capabilities
-- 🔲 Advanced statistical models for cross-market correlations
-- 🔲 Dynamic resource allocation
-- 🔲 Performance profiling and additional optimizations
-- 🔲 Comprehensive documentation and case studies
-
-### Expected Completion
-- 4 weeks
+- Multi-instrument support via `InstrumentManager` with `--symbols` CLI flag
+- Cross-exchange arbitrage detection and execution (dry-run and live)
+- Cross-market correlation analysis (Pearson, rolling, lead-lag, Engle-Granger cointegration)
+- Per-symbol risk tracking with atomic state and per-symbol position limits
+- Lock-free order book performance fix (56x regression eliminated, now 4.5x faster than mutex)
+- Object pool for hot-path allocation recycling
+- CPU affinity and thread pinning (macOS + Linux)
+- Link-Time Optimization (LTO) build option
+- Dynamic resource allocation for multi-instrument deployments
+- Multi-instrument benchmarks and scaling tests
+- MLEnhancedMarketMaker integration with cross-market signals
+
+### Key Components Implemented
+- **InstrumentManager**: Central orchestrator for per-symbol {orderbook, strategy, simulator} tuples (9 unit tests)
+- **ArbitrageDetector/Executor**: Background scan thread with venue quote cache, fee-adjusted opportunity detection, dry-run support (8 unit tests)
+- **CrossMarketCorrelation**: Pearson/rolling correlation, lead-lag analysis, simplified Engle-Granger ADF test, signal generation for MLEnhancedMarketMaker (7 unit tests)
+- **Per-Symbol Risk**: `SymbolRiskState` with atomics, `registerSymbol()`, `setSymbolLimits()`, `getSymbolState()` (4 new unit tests, 15 total passing)
+- **LockFreeOrderBook Fix**: O(1) quantity updates, physical node unlinking, eliminated CAS retries, platform-specific yield hints
+- **ObjectPool**: Header-only thread-safe pool template with custom shared_ptr deleter
+- **ThreadAffinity**: `pinToCore()`, `setThreadName()`, `getNumCores()` for macOS and Linux
+- **ResourceAllocator**: CPU core distribution across instruments
+
+### Testing
+- 39 new unit tests across 5 test suites (all passing)
+- Multi-instrument benchmark for startup scaling and throughput
+- Full regression check — no existing test regressions
+
+### Documentation
+- [Multi-Instrument Guide](MULTI_INSTRUMENT_GUIDE.md)
+- [Cross-Exchange Arbitrage](CROSS_EXCHANGE_ARBITRAGE.md)
+- [Cross-Market Correlation](CROSS_MARKET_CORRELATION.md)
+- [Performance Optimization Guide](PERFORMANCE_OPTIMIZATION_GUIDE.md)
+
+### Completion
+- **Completed**: March 2026
## Testing Integration
diff --git a/main.cpp b/main.cpp
index 51bad6d..005be86 100644
--- a/main.cpp
+++ b/main.cpp
@@ -1,3 +1,4 @@
+#include "core/instrument/InstrumentManager.h"
#include "core/orderbook/LockFreeOrderBook.h"
#include "core/orderbook/OrderBook.h"
#include "core/persistence/PersistenceManager.h"
@@ -14,6 +15,8 @@
#include "exchange/connector/ExchangeConnectorFactory.h"
#include "exchange/connector/SecureConfig.h"
#include "exchange/simulator/ExchangeSimulator.h"
+#include "strategies/arbitrage/ArbitrageDetector.h"
+#include "strategies/arbitrage/ArbitrageExecutor.h"
#include "strategies/backtesting/BacktestEngine.h"
#include "strategies/basic/BasicMarketMaker.h"
#include "strategies/basic/MLEnhancedMarketMaker.h"
@@ -32,6 +35,7 @@
#include
#include
#include
+#include
#include
#include
@@ -204,7 +208,15 @@ int main(int argc, char* argv[]) {
"slippage-bps", po::value()->default_value(2.0),
"Slippage in basis points for backtest")(
"backtest-duration", po::value()->default_value(3600),
- "Backtest duration in seconds (default: 3600 = 1 hour)");
+ "Backtest duration in seconds (default: 3600 = 1 hour)")(
+ "symbols", po::value(),
+ "Comma-separated list of symbols (e.g. BTC-USD,ETH-USD)")(
+ "enable-arbitrage", po::bool_switch()->default_value(false),
+ "Enable cross-exchange arbitrage detection")(
+ "arb-min-spread", po::value()->default_value(5.0),
+ "Minimum spread in bps for arbitrage")(
+ "arb-dry-run", po::bool_switch()->default_value(true),
+ "Arbitrage dry-run mode (log only, no execution)");
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
@@ -230,6 +242,23 @@ int main(int argc, char* argv[]) {
bool verbose = vm["verbose"].as();
bool useLockFree = vm["lock-free"].as();
+ // Parse multi-symbol flag
+ std::vector symbols;
+ if (vm.count("symbols")) {
+ std::string symbolsStr = vm["symbols"].as();
+ std::istringstream iss(symbolsStr);
+ std::string tok;
+ while (std::getline(iss, tok, ',')) {
+ if (!tok.empty()) {
+ symbols.push_back(tok);
+ }
+ }
+ }
+ if (symbols.empty()) {
+ symbols.push_back(symbol); // fallback to --symbol
+ }
+ bool multiInstrument = symbols.size() > 1;
+
// Initialize logger
auto console_sink = std::make_shared();
auto file_sink =
@@ -243,7 +272,18 @@ int main(int argc, char* argv[]) {
}
spdlog::set_default_logger(logger);
- spdlog::info("Starting PinnacleMM for {} in {} mode", symbol, mode);
+ if (multiInstrument) {
+ std::string symbolList;
+ for (size_t i = 0; i < symbols.size(); ++i) {
+ if (i > 0)
+ symbolList += ", ";
+ symbolList += symbols[i];
+ }
+ spdlog::info("Starting PinnacleMM with {} instruments [{}] in {} mode",
+ symbols.size(), symbolList, mode);
+ } else {
+ spdlog::info("Starting PinnacleMM for {} in {} mode", symbol, mode);
+ }
spdlog::info("Using lock-free data structures: {}",
useLockFree ? "enabled" : "disabled");
@@ -355,6 +395,135 @@ int main(int argc, char* argv[]) {
"CircuitBreaker");
});
+ // Register all symbols with the risk manager for per-symbol tracking
+ for (const auto& sym : symbols) {
+ riskManager.registerSymbol(sym);
+ }
+
+ // Apply per-symbol limits from config
+ for (const auto& psl : riskConfig.perSymbolLimits) {
+ riskManager.setSymbolLimits(psl);
+ }
+
+ // InstrumentManager for multi-instrument mode
+ pinnacle::instrument::InstrumentManager instrumentManager;
+ bool enableML = vm["enable-ml"].as();
+
+ // Initialize JSON logger if enabled
+ std::shared_ptr jsonLogger;
+ if (vm["json-log"].as()) {
+ std::string jsonLogFile = vm["json-log-file"].as();
+ jsonLogger =
+ std::make_shared(jsonLogFile, true);
+ spdlog::info("JSON logging enabled, output file: {}", jsonLogFile);
+ }
+
+ if (multiInstrument) {
+ // Multi-instrument path: use InstrumentManager
+ for (const auto& sym : symbols) {
+ pinnacle::instrument::InstrumentConfig instCfg;
+ instCfg.symbol = sym;
+ instCfg.useLockFree = useLockFree;
+ instCfg.enableML = enableML;
+ instrumentManager.addInstrument(instCfg, mode);
+ }
+
+ // For backtest mode with multiple instruments, not yet supported
+ if (mode == "backtest") {
+ spdlog::warn("Backtest mode with multiple instruments not yet "
+ "supported. Using first symbol only.");
+ // Fall through to single-instrument backtest below
+ } else if (mode != "live") {
+ // Simulation mode: start all instruments
+ if (!instrumentManager.startAll()) {
+ spdlog::error("Failed to start all instruments");
+ return 1;
+ }
+
+ spdlog::info("All {} instruments started in simulation mode",
+ symbols.size());
+
+ // Setup arbitrage if enabled
+ std::unique_ptr arbDetector;
+ std::unique_ptr arbExecutor;
+ if (vm["enable-arbitrage"].as()) {
+ pinnacle::arbitrage::ArbitrageConfig arbConfig;
+ arbConfig.minSpreadBps = vm["arb-min-spread"].as();
+ arbConfig.dryRun = vm["arb-dry-run"].as();
+ arbConfig.symbols = symbols;
+ arbConfig.scanIntervalMs = 100;
+
+ arbDetector =
+ std::make_unique(
+ arbConfig);
+ arbExecutor =
+ std::make_unique(
+ arbConfig.dryRun);
+
+ arbDetector->setOpportunityCallback(
+ [&arbExecutor](
+ const pinnacle::arbitrage::ArbitrageOpportunity& opp) {
+ arbExecutor->execute(opp);
+ });
+
+ arbDetector->start();
+ spdlog::info("Arbitrage detector started (dryRun={})",
+ arbConfig.dryRun);
+ }
+
+ // Main loop for multi-instrument simulation
+ uint64_t lastStatsTime = 0;
+ uint64_t lastCheckpointTime = 0;
+
+ while (g_running.load()) {
+ uint64_t currentTime = pinnacle::utils::TimeUtils::getCurrentMillis();
+
+ if (currentTime - lastStatsTime > 5000) {
+ spdlog::info("======================");
+ spdlog::info("Current time: {}",
+ pinnacle::utils::TimeUtils::getCurrentISOTimestamp());
+ spdlog::info("{}", instrumentManager.getAggregateStatistics());
+
+ if (arbDetector) {
+ spdlog::info("{}", arbDetector->getStatistics());
+ }
+
+ spdlog::info("======================");
+ lastStatsTime = currentTime;
+ }
+
+ if (currentTime - lastCheckpointTime > 5 * 60 * 1000) {
+ instrumentManager.createCheckpoints();
+ lastCheckpointTime = currentTime;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ // Shutdown
+ spdlog::info("Shutting down multi-instrument mode...");
+
+ if (arbDetector) {
+ arbDetector->stop();
+ }
+
+ instrumentManager.stopAll();
+
+ if (varEngine) {
+ varEngine->stop();
+ }
+
+ spdlog::info("Final statistics:");
+ spdlog::info("{}", instrumentManager.getAggregateStatistics());
+
+ AUDIT_SYSTEM_EVENT("PinnacleMM system shutdown complete", true);
+ spdlog::info("Shutdown complete");
+ return 0;
+ }
+ }
+
+ // --- Single-instrument path (backward compatible) ---
+
// Create or retrieve order book
std::shared_ptr orderBook;
@@ -382,18 +551,8 @@ int main(int argc, char* argv[]) {
config.symbol = symbol;
// Initialize strategy (basic or ML-enhanced)
- bool enableML = vm["enable-ml"].as();
std::shared_ptr strategy;
- // Initialize JSON logger if enabled
- std::shared_ptr jsonLogger;
- if (vm["json-log"].as()) {
- std::string jsonLogFile = vm["json-log-file"].as();
- jsonLogger =
- std::make_shared(jsonLogFile, true);
- spdlog::info("JSON logging enabled, output file: {}", jsonLogFile);
- }
-
if (enableML) {
spdlog::info("Initializing ML-enhanced market maker");
diff --git a/strategies/analytics/CrossMarketCorrelation.cpp b/strategies/analytics/CrossMarketCorrelation.cpp
new file mode 100644
index 0000000..81bbbf6
--- /dev/null
+++ b/strategies/analytics/CrossMarketCorrelation.cpp
@@ -0,0 +1,412 @@
+#include "CrossMarketCorrelation.h"
+
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace analytics {
+
+CrossMarketCorrelation::CrossMarketCorrelation(const CrossMarketConfig& config)
+ : m_config(config) {}
+
+void CrossMarketCorrelation::addPriceObservation(const std::string& symbol,
+ double price, double volume,
+ uint64_t timestamp) {
+ std::lock_guard lock(m_dataMutex);
+
+ auto& series = m_series[symbol];
+ series.prices.push_back(price);
+ series.volumes.push_back(volume);
+ series.timestamps.push_back(timestamp);
+
+ // Compute log return (guard against non-positive prices)
+ if (series.prices.size() >= 2) {
+ size_t n = series.prices.size();
+ if (series.prices[n - 2] > 0.0 && series.prices[n - 1] > 0.0) {
+ double ret = std::log(series.prices[n - 1] / series.prices[n - 2]);
+ series.returns.push_back(ret);
+ }
+ }
+
+ // Trim to window size (keep 2x for lead-lag analysis)
+ size_t maxSize = m_config.returnWindowSize * 2;
+ while (series.prices.size() > maxSize) {
+ series.prices.pop_front();
+ series.volumes.pop_front();
+ series.timestamps.pop_front();
+ }
+ while (series.returns.size() > maxSize) {
+ series.returns.pop_front();
+ }
+
+ // Update all pairs involving this symbol
+ m_signalsDirty = true;
+ for (auto& [key, pair] : m_pairs) {
+ if (key.symbolA == symbol || key.symbolB == symbol) {
+ updatePair(key);
+ }
+ }
+}
+
+void CrossMarketCorrelation::addPair(const std::string& symbolA,
+ const std::string& symbolB) {
+ std::lock_guard lock(m_dataMutex);
+
+ PairKey key{symbolA, symbolB};
+ if (m_pairs.count(key)) {
+ return;
+ }
+
+ CorrelationPair pair;
+ pair.symbolA = symbolA;
+ pair.symbolB = symbolB;
+ m_pairs[key] = pair;
+ m_signalsDirty = true;
+}
+
+void CrossMarketCorrelation::removePair(const std::string& symbolA,
+ const std::string& symbolB) {
+ std::lock_guard lock(m_dataMutex);
+ m_pairs.erase(PairKey{symbolA, symbolB});
+ m_signalsDirty = true;
+}
+
+CorrelationPair
+CrossMarketCorrelation::getCorrelation(const std::string& symbolA,
+ const std::string& symbolB) const {
+ std::lock_guard lock(m_dataMutex);
+ auto it = m_pairs.find(PairKey{symbolA, symbolB});
+ if (it != m_pairs.end()) {
+ return it->second;
+ }
+ return CorrelationPair{symbolA, symbolB};
+}
+
+std::vector
+CrossMarketCorrelation::getActiveSignals() const {
+ std::lock_guard lock(m_dataMutex);
+ if (m_signalsDirty) {
+ updateSignals();
+ }
+ return m_signals;
+}
+
+std::vector
+CrossMarketCorrelation::getAllCorrelations() const {
+ std::lock_guard lock(m_dataMutex);
+ std::vector result;
+ result.reserve(m_pairs.size());
+ for (const auto& [key, pair] : m_pairs) {
+ result.push_back(pair);
+ }
+ return result;
+}
+
+std::string CrossMarketCorrelation::getStatistics() const {
+ std::lock_guard lock(m_dataMutex);
+
+ std::ostringstream oss;
+ oss << "CrossMarketCorrelation Statistics:\n";
+ oss << " Tracked symbols: " << m_series.size() << "\n";
+ oss << " Registered pairs: " << m_pairs.size() << "\n";
+
+ for (const auto& [key, pair] : m_pairs) {
+ oss << " " << pair.symbolA << "/" << pair.symbolB << ":\n";
+ oss << " Pearson: " << pair.pearsonCorrelation << "\n";
+ oss << " Rolling: " << pair.rollingCorrelation << "\n";
+ oss << " Lead-lag: " << pair.leadLagBarsA << " bars"
+ << " (coeff=" << pair.leadLagCoefficient << ")\n";
+ oss << " Cointegration: " << pair.cointegrationScore
+ << (pair.isCointegrated ? " [cointegrated]" : "") << "\n";
+ }
+
+ return oss.str();
+}
+
+// --- Statistical computation methods ---
+
+double CrossMarketCorrelation::computePearsonCorrelation(
+ const std::deque& x, const std::deque& y) const {
+ size_t n = std::min(x.size(), y.size());
+ if (n < 3) {
+ return 0.0;
+ }
+
+ // Use last n elements
+ double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0, sumY2 = 0;
+
+ for (size_t i = 0; i < n; ++i) {
+ size_t xi = x.size() - n + i;
+ size_t yi = y.size() - n + i;
+ sumX += x[xi];
+ sumY += y[yi];
+ sumXY += x[xi] * y[yi];
+ sumX2 += x[xi] * x[xi];
+ sumY2 += y[yi] * y[yi];
+ }
+
+ double denom =
+ std::sqrt((n * sumX2 - sumX * sumX) * (n * sumY2 - sumY * sumY));
+ if (denom < 1e-15) {
+ return 0.0;
+ }
+
+ return (n * sumXY - sumX * sumY) / denom;
+}
+
+double CrossMarketCorrelation::computeRollingCorrelation(
+ const std::deque& x, const std::deque& y) const {
+ size_t window = m_config.rollingWindowSize;
+ size_t n = std::min({x.size(), y.size(), window});
+ if (n < 3) {
+ return 0.0;
+ }
+
+ // Use the last 'n' elements
+ std::deque xWindow(x.end() - n, x.end());
+ std::deque yWindow(y.end() - n, y.end());
+
+ return computePearsonCorrelation(xWindow, yWindow);
+}
+
+CrossMarketCorrelation::LeadLagResult
+CrossMarketCorrelation::computeLeadLag(const std::deque& x,
+ const std::deque& y) const {
+ LeadLagResult result;
+ double bestCorr = 0.0;
+ int maxLag = m_config.maxLagBars;
+
+ size_t n = std::min(x.size(), y.size());
+ if (n < static_cast(maxLag + 3)) {
+ return result;
+ }
+
+ for (int lag = -maxLag; lag <= maxLag; ++lag) {
+ // Compute correlation with offset
+ size_t effectiveN = n - std::abs(lag);
+ if (effectiveN < 3) {
+ continue;
+ }
+
+ double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0, sumY2 = 0;
+
+ for (size_t i = 0; i < effectiveN; ++i) {
+ size_t xi = (lag >= 0) ? (x.size() - effectiveN + i)
+ : (x.size() - effectiveN + i - lag);
+ size_t yi = (lag >= 0) ? (y.size() - effectiveN + i + lag)
+ : (y.size() - effectiveN + i);
+
+ if (xi >= x.size() || yi >= y.size()) {
+ continue;
+ }
+
+ sumX += x[xi];
+ sumY += y[yi];
+ sumXY += x[xi] * y[yi];
+ sumX2 += x[xi] * x[xi];
+ sumY2 += y[yi] * y[yi];
+ }
+
+ double denom = std::sqrt((effectiveN * sumX2 - sumX * sumX) *
+ (effectiveN * sumY2 - sumY * sumY));
+ if (denom < 1e-15) {
+ continue;
+ }
+
+ double corr = (effectiveN * sumXY - sumX * sumY) / denom;
+
+ if (std::abs(corr) > std::abs(bestCorr)) {
+ bestCorr = corr;
+ result.bestLag = lag;
+ result.coefficient = corr;
+ }
+ }
+
+ return result;
+}
+
+double CrossMarketCorrelation::computeCointegration(
+ const std::deque& pricesA,
+ const std::deque& pricesB) const {
+ // Simplified Engle-Granger: OLS regression of A on B, then ADF on residuals
+ size_t n = std::min(pricesA.size(), pricesB.size());
+ if (n < 20) {
+ return 0.0;
+ }
+
+ // Step 1: OLS regression Y = alpha + beta * X
+ double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
+
+ for (size_t i = 0; i < n; ++i) {
+ size_t ai = pricesA.size() - n + i;
+ size_t bi = pricesB.size() - n + i;
+ sumX += pricesB[bi];
+ sumY += pricesA[ai];
+ sumXY += pricesA[ai] * pricesB[bi];
+ sumX2 += pricesB[bi] * pricesB[bi];
+ }
+
+ double denom = n * sumX2 - sumX * sumX;
+ if (std::abs(denom) < 1e-15) {
+ return 0.0;
+ }
+
+ double beta = (n * sumXY - sumX * sumY) / denom;
+ double alpha = (sumY - beta * sumX) / n;
+
+ // Step 2: Compute residuals
+ std::vector residuals(n);
+ for (size_t i = 0; i < n; ++i) {
+ size_t ai = pricesA.size() - n + i;
+ size_t bi = pricesB.size() - n + i;
+ residuals[i] = pricesA[ai] - alpha - beta * pricesB[bi];
+ }
+
+ // Step 3: Simplified ADF test on residuals
+ // delta_e(t) = gamma * e(t-1) + noise
+ // t-stat for gamma
+ double sumE1 = 0, sumDE = 0, sumE1DE = 0, sumE12 = 0;
+ for (size_t i = 1; i < n; ++i) {
+ double e1 = residuals[i - 1];
+ double de = residuals[i] - residuals[i - 1];
+ sumE1 += e1;
+ sumDE += de;
+ sumE12 += e1 * e1;
+ sumE1DE += e1 * de;
+ }
+
+ size_t m = n - 1;
+ double gammaDenom = m * sumE12 - sumE1 * sumE1;
+ if (std::abs(gammaDenom) < 1e-15) {
+ return 0.0;
+ }
+
+ double gamma = (m * sumE1DE - sumE1 * sumDE) / gammaDenom;
+
+ // Compute standard error of gamma
+ double gammaAlpha = (sumDE - gamma * sumE1) / m;
+ double sse = 0.0;
+ for (size_t i = 1; i < n; ++i) {
+ double predicted = gammaAlpha + gamma * residuals[i - 1];
+ double actual = residuals[i] - residuals[i - 1];
+ double err = actual - predicted;
+ sse += err * err;
+ }
+ double se = std::sqrt(sse / (m - 2));
+ double varianceTerm = sumE12 - sumE1 * sumE1 / m;
+ if (varianceTerm < 1e-15) {
+ return 0.0;
+ }
+ double seGamma = se / std::sqrt(varianceTerm);
+
+ if (seGamma < 1e-15) {
+ return 0.0;
+ }
+
+ // t-statistic (more negative = stronger cointegration)
+ return gamma / seGamma;
+}
+
+void CrossMarketCorrelation::updatePair(const PairKey& key) {
+ // Must be called with m_dataMutex held
+
+ auto itA = m_series.find(key.symbolA);
+ auto itB = m_series.find(key.symbolB);
+
+ if (itA == m_series.end() || itB == m_series.end()) {
+ return;
+ }
+
+ auto& seriesA = itA->second;
+ auto& seriesB = itB->second;
+
+ auto pairIt = m_pairs.find(key);
+ if (pairIt == m_pairs.end()) {
+ return;
+ }
+
+ auto& pair = pairIt->second;
+
+ // Pearson correlation on returns
+ if (seriesA.returns.size() >= 10 && seriesB.returns.size() >= 10) {
+ pair.pearsonCorrelation =
+ computePearsonCorrelation(seriesA.returns, seriesB.returns);
+ pair.rollingCorrelation =
+ computeRollingCorrelation(seriesA.returns, seriesB.returns);
+
+ auto leadLag = computeLeadLag(seriesA.returns, seriesB.returns);
+ pair.leadLagCoefficient = leadLag.coefficient;
+ pair.leadLagBarsA = leadLag.bestLag;
+ }
+
+ // Cointegration on prices
+ if (seriesA.prices.size() >= 20 && seriesB.prices.size() >= 20) {
+ pair.cointegrationScore =
+ computeCointegration(seriesA.prices, seriesB.prices);
+
+ // Approximate critical values for Engle-Granger (2 variables)
+ // At 5% significance: ~ -3.37
+ pair.isCointegrated = pair.cointegrationScore < -3.37;
+ }
+}
+
+void CrossMarketCorrelation::updateSignals() const {
+ // Must be called with m_dataMutex held
+
+ m_signals.clear();
+
+ for (const auto& [key, pair] : m_pairs) {
+ // Only generate signals for pairs with sufficient lead-lag relationship
+ if (std::abs(pair.leadLagCoefficient) < m_config.minCorrelation) {
+ continue;
+ }
+
+ if (pair.leadLagBarsA == 0) {
+ continue; // No lead-lag detected
+ }
+
+ // Determine which symbol leads
+ std::string leadSym, lagSym;
+ if (pair.leadLagBarsA > 0) {
+ leadSym = pair.symbolA;
+ lagSym = pair.symbolB;
+ } else {
+ leadSym = pair.symbolB;
+ lagSym = pair.symbolA;
+ }
+
+ // Get the leader's recent return
+ auto leadIt = m_series.find(leadSym);
+ if (leadIt == m_series.end() || leadIt->second.returns.empty()) {
+ continue;
+ }
+
+ double leaderReturn = leadIt->second.returns.back();
+ double signalStrength =
+ std::abs(pair.leadLagCoefficient) * std::abs(leaderReturn);
+
+ if (signalStrength < m_config.signalThreshold) {
+ continue;
+ }
+
+ CrossMarketSignal signal;
+ signal.leadSymbol = leadSym;
+ signal.lagSymbol = lagSym;
+ signal.signalStrength = std::min(1.0, signalStrength);
+ signal.expectedMove = leaderReturn * pair.leadLagCoefficient;
+ signal.confidence = std::abs(pair.rollingCorrelation);
+
+ if (!leadIt->second.timestamps.empty()) {
+ signal.timestamp = leadIt->second.timestamps.back();
+ }
+
+ m_signals.push_back(signal);
+ }
+
+ m_signalsDirty = false;
+}
+
+} // namespace analytics
+} // namespace pinnacle
diff --git a/strategies/analytics/CrossMarketCorrelation.h b/strategies/analytics/CrossMarketCorrelation.h
new file mode 100644
index 0000000..6817210
--- /dev/null
+++ b/strategies/analytics/CrossMarketCorrelation.h
@@ -0,0 +1,172 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace analytics {
+
+/**
+ * @struct CorrelationPair
+ * @brief Statistics for a pair of correlated instruments
+ */
+struct CorrelationPair {
+ std::string symbolA;
+ std::string symbolB;
+ double pearsonCorrelation{0.0}; // [-1, 1]
+ double rollingCorrelation{0.0}; // recent-window correlation
+ double leadLagCoefficient{0.0}; // strength of lead-lag relationship
+ int leadLagBarsA{0}; // positive = A leads B
+ double cointegrationScore{
+ 0.0}; // Engle-Granger t-stat (more negative = stronger)
+ bool isCointegrated{false};
+};
+
+/**
+ * @struct CrossMarketSignal
+ * @brief Trading signal from cross-market analysis
+ */
+struct CrossMarketSignal {
+ std::string leadSymbol;
+ std::string lagSymbol;
+ double signalStrength{0.0}; // [0, 1]
+ double expectedMove{0.0}; // expected % move in lag symbol
+ double confidence{0.0}; // [0, 1]
+ uint64_t timestamp{0};
+};
+
+/**
+ * @struct CrossMarketConfig
+ * @brief Configuration for cross-market correlation analysis
+ */
+struct CrossMarketConfig {
+ size_t returnWindowSize{100}; // window for return calculations
+ size_t rollingWindowSize{30}; // rolling correlation window
+ int maxLagBars{10}; // max lead-lag offset to test
+ double minCorrelation{0.5}; // min absolute correlation to consider
+ double signalThreshold{0.3}; // min signal strength to emit
+ double cointegrationPValue{0.05}; // significance level for cointegration
+};
+
+/**
+ * @class CrossMarketCorrelation
+ * @brief Statistical models to detect when one instrument's movement predicts
+ * another's
+ *
+ * Implements Pearson correlation, rolling correlation, lead-lag analysis,
+ * and Engle-Granger cointegration test for pairs of instruments.
+ */
+class CrossMarketCorrelation {
+public:
+ explicit CrossMarketCorrelation(const CrossMarketConfig& config = {});
+ ~CrossMarketCorrelation() = default;
+
+ CrossMarketCorrelation(const CrossMarketCorrelation&) = delete;
+ CrossMarketCorrelation& operator=(const CrossMarketCorrelation&) = delete;
+
+ /**
+ * @brief Add a price observation for a symbol
+ * @param symbol Trading symbol
+ * @param price Current price
+ * @param volume Current volume
+ * @param timestamp Nanosecond timestamp
+ */
+ void addPriceObservation(const std::string& symbol, double price,
+ double volume, uint64_t timestamp);
+
+ /**
+ * @brief Register a pair for correlation tracking
+ * @param symbolA First symbol
+ * @param symbolB Second symbol
+ */
+ void addPair(const std::string& symbolA, const std::string& symbolB);
+
+ /**
+ * @brief Remove a pair
+ */
+ void removePair(const std::string& symbolA, const std::string& symbolB);
+
+ /**
+ * @brief Get correlation statistics for a pair
+ */
+ CorrelationPair getCorrelation(const std::string& symbolA,
+ const std::string& symbolB) const;
+
+ /**
+ * @brief Get all currently active cross-market signals
+ */
+ std::vector getActiveSignals() const;
+
+ /**
+ * @brief Get all registered pairs with their correlations
+ */
+ std::vector getAllCorrelations() const;
+
+ /**
+ * @brief Get statistics string
+ */
+ std::string getStatistics() const;
+
+private:
+ CrossMarketConfig m_config;
+
+ // Per-symbol price series
+ struct PriceSeries {
+ std::deque prices;
+ std::deque returns; // log returns
+ std::deque volumes;
+ std::deque timestamps;
+ };
+
+ mutable std::mutex m_dataMutex;
+ std::unordered_map m_series;
+
+ // Registered pairs
+ struct PairKey {
+ std::string symbolA;
+ std::string symbolB;
+ bool operator==(const PairKey& other) const {
+ return symbolA == other.symbolA && symbolB == other.symbolB;
+ }
+ };
+
+ struct PairKeyHash {
+ size_t operator()(const PairKey& k) const {
+ return std::hash{}(k.symbolA) ^
+ (std::hash{}(k.symbolB) << 1);
+ }
+ };
+
+ std::unordered_map m_pairs;
+
+ // Cached signals
+ mutable std::vector m_signals;
+ mutable bool m_signalsDirty{true};
+
+ // Statistical computation methods
+ double computePearsonCorrelation(const std::deque& x,
+ const std::deque& y) const;
+
+ double computeRollingCorrelation(const std::deque& x,
+ const std::deque& y) const;
+
+ struct LeadLagResult {
+ double coefficient{0.0};
+ int bestLag{0};
+ };
+ LeadLagResult computeLeadLag(const std::deque& x,
+ const std::deque& y) const;
+
+ double computeCointegration(const std::deque& pricesA,
+ const std::deque& pricesB) const;
+
+ void updatePair(const PairKey& key);
+ void updateSignals() const;
+};
+
+} // namespace analytics
+} // namespace pinnacle
diff --git a/strategies/arbitrage/ArbitrageDetector.cpp b/strategies/arbitrage/ArbitrageDetector.cpp
new file mode 100644
index 0000000..2ba2988
--- /dev/null
+++ b/strategies/arbitrage/ArbitrageDetector.cpp
@@ -0,0 +1,256 @@
+#include "ArbitrageDetector.h"
+
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace arbitrage {
+
+ArbitrageDetector::ArbitrageDetector(const ArbitrageConfig& config)
+ : m_config(config) {}
+
+ArbitrageDetector::~ArbitrageDetector() { stop(); }
+
+bool ArbitrageDetector::start() {
+ bool expected = false;
+ if (!m_running.compare_exchange_strong(expected, true,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ return true; // Already running
+ }
+
+ m_scanThread = std::thread(&ArbitrageDetector::scanLoop, this);
+ spdlog::info("ArbitrageDetector started (scanInterval={}ms dryRun={})",
+ m_config.scanIntervalMs, m_config.dryRun);
+ return true;
+}
+
+bool ArbitrageDetector::stop() {
+ bool expected = true;
+ if (!m_running.compare_exchange_strong(expected, false,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ return true; // Already stopped
+ }
+
+ if (m_scanThread.joinable()) {
+ m_scanThread.join();
+ }
+ spdlog::info("ArbitrageDetector stopped (total opportunities={})",
+ m_totalOpportunities.load(std::memory_order_relaxed));
+ return true;
+}
+
+bool ArbitrageDetector::isRunning() const {
+ return m_running.load(std::memory_order_acquire);
+}
+
+void ArbitrageDetector::updateVenueQuote(const std::string& venue,
+ const std::string& symbol, double bid,
+ double bidSize, double ask,
+ double askSize, uint64_t timestamp) {
+ VenueQuote quote;
+ quote.bidPrice = bid;
+ quote.bidSize = bidSize;
+ quote.askPrice = ask;
+ quote.askSize = askSize;
+ quote.timestamp = timestamp;
+
+ std::lock_guard lock(m_quotesMutex);
+ m_quotes[venue][symbol] = quote;
+}
+
+std::vector
+ArbitrageDetector::getCurrentOpportunities() const {
+ std::lock_guard lock(m_opportunitiesMutex);
+ return m_opportunities;
+}
+
+void ArbitrageDetector::setOpportunityCallback(OpportunityCallback callback) {
+ std::lock_guard lock(m_callbackMutex);
+ m_callback = std::move(callback);
+}
+
+std::string ArbitrageDetector::getStatistics() const {
+ std::ostringstream oss;
+ oss << "ArbitrageDetector Statistics:\n";
+ oss << " Total scans: " << m_totalScans.load(std::memory_order_relaxed)
+ << "\n";
+ oss << " Total opportunities: "
+ << m_totalOpportunities.load(std::memory_order_relaxed) << "\n";
+ oss << " Dry run: " << (m_config.dryRun ? "yes" : "no") << "\n";
+ oss << " Min spread: " << m_config.minSpreadBps << " bps\n";
+
+ auto currentOpps = getCurrentOpportunities();
+ oss << " Active opportunities: " << currentOpps.size() << "\n";
+ for (const auto& opp : currentOpps) {
+ oss << " " << opp.symbol << ": buy@" << opp.buyVenue << " "
+ << opp.buyPrice << " sell@" << opp.sellVenue << " " << opp.sellPrice
+ << " spread=" << opp.spreadBps << "bps profit=$" << opp.estimatedProfit
+ << "\n";
+ }
+
+ return oss.str();
+}
+
+uint64_t ArbitrageDetector::getTotalOpportunitiesDetected() const {
+ return m_totalOpportunities.load(std::memory_order_relaxed);
+}
+
+void ArbitrageDetector::scanLoop() {
+ while (m_running.load(std::memory_order_acquire)) {
+ std::vector allOpps;
+
+ for (const auto& symbol : m_config.symbols) {
+ auto opps = detectOpportunities(symbol);
+ allOpps.insert(allOpps.end(), opps.begin(), opps.end());
+ }
+
+ m_totalScans.fetch_add(1, std::memory_order_relaxed);
+
+ // Update current opportunities
+ size_t newOppsCount = allOpps.size();
+ {
+ std::lock_guard lock(m_opportunitiesMutex);
+ m_opportunities = std::move(allOpps);
+ }
+
+ // Fire callbacks for new opportunities
+ if (newOppsCount > 0) {
+ m_totalOpportunities.fetch_add(newOppsCount, std::memory_order_relaxed);
+
+ OpportunityCallback cb;
+ {
+ std::lock_guard lock(m_callbackMutex);
+ cb = m_callback;
+ }
+
+ if (cb) {
+ std::lock_guard lock(m_opportunitiesMutex);
+ for (const auto& opp : m_opportunities) {
+ cb(opp);
+ }
+ }
+ }
+
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(m_config.scanIntervalMs));
+ }
+}
+
+std::vector
+ArbitrageDetector::detectOpportunities(const std::string& symbol) const {
+ std::vector opportunities;
+
+ std::lock_guard lock(m_quotesMutex);
+
+ // Collect non-stale quotes for this symbol across venues
+ struct VenueData {
+ std::string venue;
+ VenueQuote quote;
+ double fee;
+ };
+
+ std::vector venueData;
+ for (const auto& venue : m_config.venues) {
+ auto venueIt = m_quotes.find(venue);
+ if (venueIt == m_quotes.end()) {
+ continue;
+ }
+
+ auto symbolIt = venueIt->second.find(symbol);
+ if (symbolIt == venueIt->second.end()) {
+ continue;
+ }
+
+ const auto& quote = symbolIt->second;
+ if (isStale(quote)) {
+ continue;
+ }
+
+ if (quote.bidPrice <= 0 || quote.askPrice <= 0) {
+ continue;
+ }
+
+ venueData.push_back({venue, quote, getVenueFee(venue)});
+ }
+
+ // Compare all venue pairs for arbitrage
+ uint64_t now = utils::TimeUtils::getCurrentNanos();
+
+ for (size_t i = 0; i < venueData.size(); ++i) {
+ for (size_t j = 0; j < venueData.size(); ++j) {
+ if (i == j) {
+ continue;
+ }
+
+ const auto& buyer = venueData[i]; // Buy at this venue's ask
+ const auto& seller = venueData[j]; // Sell at this venue's bid
+
+ double buyPrice = buyer.quote.askPrice;
+ double sellPrice = seller.quote.bidPrice;
+
+ // Gross spread
+ double spread = sellPrice - buyPrice;
+ if (spread <= 0) {
+ continue;
+ }
+
+ // Deduct fees
+ double totalFees = (buyPrice * buyer.fee) + (sellPrice * seller.fee);
+ double netSpread = spread - totalFees;
+
+ if (netSpread <= 0) {
+ continue;
+ }
+
+ double midPrice = (buyPrice + sellPrice) / 2.0;
+ double spreadBps = (netSpread / midPrice) * 10000.0;
+
+ if (spreadBps < m_config.minSpreadBps) {
+ continue;
+ }
+
+ double maxQty = std::min(buyer.quote.askSize, seller.quote.bidSize);
+ double estimatedProfit = netSpread * maxQty;
+
+ if (estimatedProfit < m_config.minProfitUsd) {
+ continue;
+ }
+
+ ArbitrageOpportunity opp;
+ opp.symbol = symbol;
+ opp.buyVenue = buyer.venue;
+ opp.sellVenue = seller.venue;
+ opp.buyPrice = buyPrice;
+ opp.sellPrice = sellPrice;
+ opp.spread = netSpread;
+ opp.spreadBps = spreadBps;
+ opp.maxQuantity = maxQty;
+ opp.estimatedProfit = estimatedProfit;
+ opp.detectedAt = now;
+
+ opportunities.push_back(opp);
+ }
+ }
+
+ return opportunities;
+}
+
+bool ArbitrageDetector::isStale(const VenueQuote& quote) const {
+ uint64_t now = utils::TimeUtils::getCurrentNanos();
+ return (now - quote.timestamp) > m_config.maxStalenessNs;
+}
+
+double ArbitrageDetector::getVenueFee(const std::string& venue) const {
+ auto it = m_config.venueFees.find(venue);
+ if (it != m_config.venueFees.end()) {
+ return it->second;
+ }
+ return 0.0;
+}
+
+} // namespace arbitrage
+} // namespace pinnacle
diff --git a/strategies/arbitrage/ArbitrageDetector.h b/strategies/arbitrage/ArbitrageDetector.h
new file mode 100644
index 0000000..e7c687e
--- /dev/null
+++ b/strategies/arbitrage/ArbitrageDetector.h
@@ -0,0 +1,171 @@
+#pragma once
+
+#include "../../core/orderbook/Order.h"
+#include "../../core/utils/TimeUtils.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace arbitrage {
+
+/**
+ * @struct ArbitrageOpportunity
+ * @brief Represents a detected cross-exchange arbitrage opportunity
+ */
+struct ArbitrageOpportunity {
+ std::string symbol;
+ std::string buyVenue;
+ std::string sellVenue;
+ double buyPrice{0.0};
+ double sellPrice{0.0};
+ double spread{0.0};
+ double spreadBps{0.0};
+ double maxQuantity{0.0};
+ double estimatedProfit{0.0};
+ uint64_t detectedAt{0};
+};
+
+/**
+ * @struct ArbitrageConfig
+ * @brief Configuration for the arbitrage detector
+ */
+struct ArbitrageConfig {
+ double minSpreadBps{5.0};
+ double minProfitUsd{1.0};
+ uint64_t maxStalenessNs{500000000}; // 500ms
+ uint64_t scanIntervalMs{10};
+ bool dryRun{true};
+ std::vector symbols;
+ std::vector venues;
+ std::unordered_map venueFees; // venue -> fee ratio
+};
+
+/**
+ * @struct VenueQuote
+ * @brief Quote from a specific venue
+ */
+struct VenueQuote {
+ double bidPrice{0.0};
+ double bidSize{0.0};
+ double askPrice{0.0};
+ double askSize{0.0};
+ uint64_t timestamp{0};
+};
+
+/**
+ * @class ArbitrageDetector
+ * @brief Scans multi-venue quotes for profitable cross-exchange spreads
+ *
+ * Maintains a per-venue quote cache and runs a background scan thread
+ * to detect arbitrage opportunities based on configurable thresholds.
+ */
+class ArbitrageDetector {
+public:
+ using OpportunityCallback = std::function;
+
+ explicit ArbitrageDetector(const ArbitrageConfig& config);
+ ~ArbitrageDetector();
+
+ ArbitrageDetector(const ArbitrageDetector&) = delete;
+ ArbitrageDetector& operator=(const ArbitrageDetector&) = delete;
+
+ /**
+ * @brief Start the background scan thread
+ * @return true if started successfully
+ */
+ bool start();
+
+ /**
+ * @brief Stop the background scan thread
+ * @return true if stopped successfully
+ */
+ bool stop();
+
+ /**
+ * @brief Check if the detector is running
+ */
+ bool isRunning() const;
+
+ /**
+ * @brief Update the quote for a specific venue and symbol
+ */
+ void updateVenueQuote(const std::string& venue, const std::string& symbol,
+ double bid, double bidSize, double ask, double askSize,
+ uint64_t timestamp);
+
+ /**
+ * @brief Get all current opportunities (thread-safe snapshot)
+ */
+ std::vector getCurrentOpportunities() const;
+
+ /**
+ * @brief Register a callback for new opportunities
+ */
+ void setOpportunityCallback(OpportunityCallback callback);
+
+ /**
+ * @brief Get statistics string
+ */
+ std::string getStatistics() const;
+
+ /**
+ * @brief Get total number of opportunities detected
+ */
+ uint64_t getTotalOpportunitiesDetected() const;
+
+private:
+ ArbitrageConfig m_config;
+
+ // Venue quotes: venue -> symbol -> quote
+ using QuoteMap =
+ std::unordered_map>;
+ QuoteMap m_quotes;
+ mutable std::mutex m_quotesMutex;
+
+ // Current opportunities
+ std::vector m_opportunities;
+ mutable std::mutex m_opportunitiesMutex;
+
+ // Callback
+ OpportunityCallback m_callback;
+ std::mutex m_callbackMutex;
+
+ // Scan thread
+ std::thread m_scanThread;
+ std::atomic m_running{false};
+
+ // Statistics
+ std::atomic m_totalOpportunities{0};
+ std::atomic m_totalScans{0};
+
+ /**
+ * @brief Background scan loop
+ */
+ void scanLoop();
+
+ /**
+ * @brief Detect opportunities for a specific symbol
+ */
+ std::vector
+ detectOpportunities(const std::string& symbol) const;
+
+ /**
+ * @brief Check if a quote is stale
+ */
+ bool isStale(const VenueQuote& quote) const;
+
+ /**
+ * @brief Get the fee for a venue (0.0 if not configured)
+ */
+ double getVenueFee(const std::string& venue) const;
+};
+
+} // namespace arbitrage
+} // namespace pinnacle
diff --git a/strategies/arbitrage/ArbitrageExecutor.cpp b/strategies/arbitrage/ArbitrageExecutor.cpp
new file mode 100644
index 0000000..2635217
--- /dev/null
+++ b/strategies/arbitrage/ArbitrageExecutor.cpp
@@ -0,0 +1,138 @@
+#include "ArbitrageExecutor.h"
+#include "../../core/utils/TimeUtils.h"
+
+#include
+#include
+
+namespace pinnacle {
+namespace arbitrage {
+
+ArbitrageExecutor::ArbitrageExecutor(bool dryRun) : m_dryRun(dryRun) {}
+
+ExecutionResult
+ArbitrageExecutor::execute(const ArbitrageOpportunity& opportunity) {
+ ExecutionResult result;
+ result.opportunity = opportunity;
+
+ uint64_t startTime = utils::TimeUtils::getCurrentNanos();
+
+ m_totalExecutions.fetch_add(1, std::memory_order_relaxed);
+
+ if (m_dryRun) {
+ // Simulate execution
+ result.buyFilled = true;
+ result.sellFilled = true;
+ result.buyFillPrice = opportunity.buyPrice;
+ result.sellFillPrice = opportunity.sellPrice;
+ result.fillQuantity = opportunity.maxQuantity;
+ result.realizedProfit = opportunity.estimatedProfit;
+ result.slippage = 0.0;
+
+ spdlog::info("[DRY-RUN] Arbitrage: BUY {} {} @{} from {} | SELL @{} on {} "
+ "| profit=${}",
+ opportunity.maxQuantity, opportunity.symbol,
+ opportunity.buyPrice, opportunity.buyVenue,
+ opportunity.sellPrice, opportunity.sellVenue,
+ opportunity.estimatedProfit);
+
+ m_successfulExecutions.fetch_add(1, std::memory_order_relaxed);
+
+ // Update total profit via CAS
+ double prev = m_totalProfit.load(std::memory_order_relaxed);
+ while (!m_totalProfit.compare_exchange_weak(
+ prev, prev + result.realizedProfit, std::memory_order_release,
+ std::memory_order_relaxed)) {
+ }
+
+ } else {
+ // Real execution via callbacks
+ OrderSubmitCallback cb;
+ {
+ std::lock_guard lock(m_callbackMutex);
+ cb = m_submitCallback;
+ }
+
+ if (!cb) {
+ result.error = "No order submit callback registered";
+ m_failedExecutions.fetch_add(1, std::memory_order_relaxed);
+ return result;
+ }
+
+ // Submit buy order
+ result.buyFilled =
+ cb(opportunity.buyVenue, opportunity.symbol, OrderSide::BUY,
+ opportunity.buyPrice, opportunity.maxQuantity);
+
+ // Submit sell order
+ result.sellFilled =
+ cb(opportunity.sellVenue, opportunity.symbol, OrderSide::SELL,
+ opportunity.sellPrice, opportunity.maxQuantity);
+
+ if (result.buyFilled && result.sellFilled) {
+ result.buyFillPrice = opportunity.buyPrice;
+ result.sellFillPrice = opportunity.sellPrice;
+ result.fillQuantity = opportunity.maxQuantity;
+ result.realizedProfit = opportunity.estimatedProfit;
+ m_successfulExecutions.fetch_add(1, std::memory_order_relaxed);
+
+ double prev = m_totalProfit.load(std::memory_order_relaxed);
+ while (!m_totalProfit.compare_exchange_weak(
+ prev, prev + result.realizedProfit, std::memory_order_release,
+ std::memory_order_relaxed)) {
+ }
+ } else {
+ result.error = "Partial fill — ";
+ if (!result.buyFilled) {
+ result.error += "buy failed ";
+ }
+ if (!result.sellFilled) {
+ result.error += "sell failed";
+ }
+ m_failedExecutions.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
+
+ result.executionTimeNs = utils::TimeUtils::getCurrentNanos() - startTime;
+
+ // Store recent result
+ {
+ std::lock_guard lock(m_resultsMutex);
+ m_recentResults.push_back(result);
+ if (m_recentResults.size() > 100) {
+ m_recentResults.erase(m_recentResults.begin());
+ }
+ }
+
+ return result;
+}
+
+void ArbitrageExecutor::setOrderSubmitCallback(OrderSubmitCallback callback) {
+ std::lock_guard lock(m_callbackMutex);
+ m_submitCallback = std::move(callback);
+}
+
+std::string ArbitrageExecutor::getStatistics() const {
+ std::ostringstream oss;
+ oss << "ArbitrageExecutor Statistics:\n";
+ oss << " Dry run: " << (m_dryRun ? "yes" : "no") << "\n";
+ oss << " Total executions: "
+ << m_totalExecutions.load(std::memory_order_relaxed) << "\n";
+ oss << " Successful: "
+ << m_successfulExecutions.load(std::memory_order_relaxed) << "\n";
+ oss << " Failed: " << m_failedExecutions.load(std::memory_order_relaxed)
+ << "\n";
+ oss << " Total profit: $" << m_totalProfit.load(std::memory_order_relaxed)
+ << "\n";
+ return oss.str();
+}
+
+uint64_t ArbitrageExecutor::getTotalExecutions() const {
+ return m_totalExecutions.load(std::memory_order_relaxed);
+}
+
+double ArbitrageExecutor::getTotalProfit() const {
+ return m_totalProfit.load(std::memory_order_relaxed);
+}
+
+} // namespace arbitrage
+} // namespace pinnacle
diff --git a/strategies/arbitrage/ArbitrageExecutor.h b/strategies/arbitrage/ArbitrageExecutor.h
new file mode 100644
index 0000000..90d00bd
--- /dev/null
+++ b/strategies/arbitrage/ArbitrageExecutor.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "ArbitrageDetector.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace arbitrage {
+
+/**
+ * @struct ExecutionResult
+ * @brief Result of an arbitrage execution attempt
+ */
+struct ExecutionResult {
+ ArbitrageOpportunity opportunity;
+ bool buyFilled{false};
+ bool sellFilled{false};
+ double buyFillPrice{0.0};
+ double sellFillPrice{0.0};
+ double fillQuantity{0.0};
+ double realizedProfit{0.0};
+ double slippage{0.0};
+ uint64_t executionTimeNs{0};
+ std::string error;
+};
+
+/**
+ * @class ArbitrageExecutor
+ * @brief Executes arbitrage opportunities by submitting simultaneous buy/sell
+ * orders
+ */
+class ArbitrageExecutor {
+public:
+ using OrderSubmitCallback = std::function;
+
+ explicit ArbitrageExecutor(bool dryRun = true);
+ ~ArbitrageExecutor() = default;
+
+ ArbitrageExecutor(const ArbitrageExecutor&) = delete;
+ ArbitrageExecutor& operator=(const ArbitrageExecutor&) = delete;
+
+ /**
+ * @brief Execute an arbitrage opportunity
+ * @param opportunity The detected opportunity
+ * @return Execution result
+ */
+ ExecutionResult execute(const ArbitrageOpportunity& opportunity);
+
+ /**
+ * @brief Set the order submission callback (used to route to OrderRouter)
+ */
+ void setOrderSubmitCallback(OrderSubmitCallback callback);
+
+ /**
+ * @brief Get execution statistics
+ */
+ std::string getStatistics() const;
+
+ /**
+ * @brief Get total number of executions
+ */
+ uint64_t getTotalExecutions() const;
+
+ /**
+ * @brief Get total realized profit
+ */
+ double getTotalProfit() const;
+
+private:
+ bool m_dryRun;
+ OrderSubmitCallback m_submitCallback;
+ std::mutex m_callbackMutex;
+
+ // Statistics
+ std::atomic m_totalExecutions{0};
+ std::atomic m_successfulExecutions{0};
+ std::atomic